send.c (4733B)
1 #include <u.h> 2 #include <libc.h> 3 #include <venti.h> 4 #include "queue.h" 5 6 long ventisendbytes, ventisendpackets; 7 long ventirecvbytes, ventirecvpackets; 8 9 static int 10 _vtsend(VtConn *z, Packet *p) 11 { 12 IOchunk ioc; 13 int n, tot; 14 uchar buf[4]; 15 16 if(z->state != VtStateConnected) { 17 werrstr("session not connected"); 18 return -1; 19 } 20 21 /* add framing */ 22 n = packetsize(p); 23 if(z->version[1] == '2') { 24 if(n >= (1<<16)) { 25 werrstr("packet too large"); 26 packetfree(p); 27 return -1; 28 } 29 buf[0] = n>>8; 30 buf[1] = n; 31 packetprefix(p, buf, 2); 32 ventisendbytes += n+2; 33 } else { 34 buf[0] = n>>24; 35 buf[1] = n>>16; 36 buf[2] = n>>8; 37 buf[3] = n; 38 packetprefix(p, buf, 4); 39 ventisendbytes += n+4; 40 } 41 ventisendpackets++; 42 43 tot = 0; 44 for(;;){ 45 n = packetfragments(p, &ioc, 1, 0); 46 if(n == 0) 47 break; 48 if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){ 49 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p); 50 packetfree(p); 51 return -1; 52 } 53 packetconsume(p, nil, ioc.len); 54 tot += ioc.len; 55 } 56 vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot); 57 packetfree(p); 58 return 1; 59 } 60 61 static int 62 interrupted(void) 63 { 64 char e[ERRMAX]; 65 66 rerrstr(e, sizeof e); 67 return strstr(e, "interrupted") != nil; 68 } 69 70 71 static Packet* 72 _vtrecv(VtConn *z) 73 { 74 uchar buf[10], *b; 75 int n, need; 76 Packet *p; 77 int size, len; 78 79 if(z->state != VtStateConnected) { 80 werrstr("session not connected"); 81 return nil; 82 } 83 84 p = z->part; 85 /* get enough for head size */ 86 size = packetsize(p); 87 need = z->version[1] - '0'; // 2 or 4 88 while(size < need) { 89 b = packettrailer(p, need); 90 assert(b != nil); 91 if(0) fprint(2, "%d read hdr\n", getpid()); 92 n = read(z->infd, b, need); 93 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); 94 if(n==0 || (n<0 && !interrupted())) 95 goto Err; 96 size += n; 97 packettrim(p, 0, size); 98 } 99 100 if(packetconsume(p, buf, need) < 0) 101 goto Err; 102 if(z->version[1] == '2') { 103 len = (buf[0] << 8) | buf[1]; 104 size -= 2; 105 } else { 106 len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3]; 107 size -= 4; 108 } 109 110 while(size < len) { 111 n = len - size; 112 if(n > MaxFragSize) 113 n = MaxFragSize; 114 b = packettrailer(p, n); 115 if(0) fprint(2, "%d read body %d\n", getpid(), n); 116 n = read(z->infd, b, n); 117 if(0) fprint(2, "%d got %d (%r)\n", getpid(), n); 118 if(n > 0) 119 size += n; 120 packettrim(p, 0, size); 121 if(n==0 || (n<0 && !interrupted())) 122 goto Err; 123 } 124 ventirecvbytes += len; 125 ventirecvpackets++; 126 p = packetsplit(p, len); 127 vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len); 128 return p; 129 Err: 130 vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr); 131 return nil; 132 } 133 134 /* 135 * If you fork off two procs running vtrecvproc and vtsendproc, 136 * then vtrecv/vtsend (and thus vtrpc) will never block except on 137 * rendevouses, which is nice when it's running in one thread of many. 138 */ 139 void 140 vtrecvproc(void *v) 141 { 142 Packet *p; 143 VtConn *z; 144 Queue *q; 145 146 z = v; 147 q = _vtqalloc(); 148 149 qlock(&z->lk); 150 z->readq = q; 151 qlock(&z->inlk); 152 rwakeup(&z->rpcfork); 153 qunlock(&z->lk); 154 155 while((p = _vtrecv(z)) != nil) 156 if(_vtqsend(q, p) < 0){ 157 packetfree(p); 158 break; 159 } 160 qunlock(&z->inlk); 161 qlock(&z->lk); 162 _vtqhangup(q); 163 while((p = _vtnbqrecv(q)) != nil) 164 packetfree(p); 165 _vtqdecref(q); 166 z->readq = nil; 167 rwakeup(&z->rpcfork); 168 qunlock(&z->lk); 169 vthangup(z); 170 } 171 172 void 173 vtsendproc(void *v) 174 { 175 Queue *q; 176 Packet *p; 177 VtConn *z; 178 179 z = v; 180 q = _vtqalloc(); 181 182 qlock(&z->lk); 183 z->writeq = q; 184 qlock(&z->outlk); 185 rwakeup(&z->rpcfork); 186 qunlock(&z->lk); 187 188 while((p = _vtqrecv(q)) != nil) 189 if(_vtsend(z, p) < 0) 190 break; 191 qunlock(&z->outlk); 192 qlock(&z->lk); 193 _vtqhangup(q); 194 while((p = _vtnbqrecv(q)) != nil) 195 packetfree(p); 196 _vtqdecref(q); 197 z->writeq = nil; 198 rwakeup(&z->rpcfork); 199 qunlock(&z->lk); 200 return; 201 } 202 203 Packet* 204 vtrecv(VtConn *z) 205 { 206 Packet *p; 207 Queue *q; 208 209 qlock(&z->lk); 210 if(z->state != VtStateConnected){ 211 werrstr("not connected"); 212 qunlock(&z->lk); 213 return nil; 214 } 215 if(z->readq){ 216 q = _vtqincref(z->readq); 217 qunlock(&z->lk); 218 p = _vtqrecv(q); 219 _vtqdecref(q); 220 return p; 221 } 222 223 qlock(&z->inlk); 224 qunlock(&z->lk); 225 p = _vtrecv(z); 226 qunlock(&z->inlk); 227 if(!p) 228 vthangup(z); 229 return p; 230 } 231 232 int 233 vtsend(VtConn *z, Packet *p) 234 { 235 Queue *q; 236 237 qlock(&z->lk); 238 if(z->state != VtStateConnected){ 239 packetfree(p); 240 werrstr("not connected"); 241 qunlock(&z->lk); 242 return -1; 243 } 244 if(z->writeq){ 245 q = _vtqincref(z->writeq); 246 qunlock(&z->lk); 247 if(_vtqsend(q, p) < 0){ 248 _vtqdecref(q); 249 packetfree(p); 250 return -1; 251 } 252 _vtqdecref(q); 253 return 0; 254 } 255 256 qlock(&z->outlk); 257 qunlock(&z->lk); 258 if(_vtsend(z, p) < 0){ 259 qunlock(&z->outlk); 260 vthangup(z); 261 return -1; 262 } 263 qunlock(&z->outlk); 264 return 0; 265 }