plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

send.c (4733B)


      1 #include <u.h>
      2 #include <libc.h>
      3 #include <venti.h>
      4 #include "queue.h"
      5 
      6 long ventisendbytes, ventisendpackets;
      7 long ventirecvbytes, ventirecvpackets;
      8 
      9 static int
     10 _vtsend(VtConn *z, Packet *p)
     11 {
     12 	IOchunk ioc;
     13 	int n, tot;
     14 	uchar buf[4];
     15 
     16 	if(z->state != VtStateConnected) {
     17 		werrstr("session not connected");
     18 		return -1;
     19 	}
     20 
     21 	/* add framing */
     22 	n = packetsize(p);
     23 	if(z->version[1] == '2') {
     24 		if(n >= (1<<16)) {
     25 			werrstr("packet too large");
     26 			packetfree(p);
     27 			return -1;
     28 		}
     29 		buf[0] = n>>8;
     30 		buf[1] = n;
     31 		packetprefix(p, buf, 2);
     32 		ventisendbytes += n+2;
     33 	} else {
     34 		buf[0] = n>>24;
     35 		buf[1] = n>>16;
     36 		buf[2] = n>>8;
     37 		buf[3] = n;
     38 		packetprefix(p, buf, 4);
     39 		ventisendbytes += n+4;
     40 	}
     41 	ventisendpackets++;
     42 
     43 	tot = 0;
     44 	for(;;){
     45 		n = packetfragments(p, &ioc, 1, 0);
     46 		if(n == 0)
     47 			break;
     48 		if(write(z->outfd, ioc.addr, ioc.len) < ioc.len){
     49 			vtlog(VtServerLog, "<font size=-1>%T %s:</font> sending packet %p: %r<br>\n", z->addr, p);
     50 			packetfree(p);
     51 			return -1;
     52 		}
     53 		packetconsume(p, nil, ioc.len);
     54 		tot += ioc.len;
     55 	}
     56 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> sent packet %p (%d bytes)<br>\n", z->addr, p, tot);
     57 	packetfree(p);
     58 	return 1;
     59 }
     60 
     61 static int
     62 interrupted(void)
     63 {
     64 	char e[ERRMAX];
     65 
     66 	rerrstr(e, sizeof e);
     67 	return strstr(e, "interrupted") != nil;
     68 }
     69 
     70 
     71 static Packet*
     72 _vtrecv(VtConn *z)
     73 {
     74 	uchar buf[10], *b;
     75 	int n, need;
     76 	Packet *p;
     77 	int size, len;
     78 
     79 	if(z->state != VtStateConnected) {
     80 		werrstr("session not connected");
     81 		return nil;
     82 	}
     83 
     84 	p = z->part;
     85 	/* get enough for head size */
     86 	size = packetsize(p);
     87 	need = z->version[1] - '0';	// 2 or 4
     88 	while(size < need) {
     89 		b = packettrailer(p, need);
     90 		assert(b != nil);
     91 		if(0) fprint(2, "%d read hdr\n", getpid());
     92 		n = read(z->infd, b, need);
     93 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
     94 		if(n==0 || (n<0 && !interrupted()))
     95 			goto Err;
     96 		size += n;
     97 		packettrim(p, 0, size);
     98 	}
     99 
    100 	if(packetconsume(p, buf, need) < 0)
    101 		goto Err;
    102 	if(z->version[1] == '2') {
    103 		len = (buf[0] << 8) | buf[1];
    104 		size -= 2;
    105 	} else {
    106 		len = (buf[0]<<24) | (buf[1]<<16) | (buf[2]<<8) | buf[3];
    107 		size -= 4;
    108 	}
    109 
    110 	while(size < len) {
    111 		n = len - size;
    112 		if(n > MaxFragSize)
    113 			n = MaxFragSize;
    114 		b = packettrailer(p, n);
    115 		if(0) fprint(2, "%d read body %d\n", getpid(), n);
    116 		n = read(z->infd, b, n);
    117 		if(0) fprint(2, "%d got %d (%r)\n", getpid(), n);
    118 		if(n > 0)
    119 			size += n;
    120 		packettrim(p, 0, size);
    121 		if(n==0 || (n<0 && !interrupted()))
    122 			goto Err;
    123 	}
    124 	ventirecvbytes += len;
    125 	ventirecvpackets++;
    126 	p = packetsplit(p, len);
    127 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> read packet %p len %d<br>\n", z->addr, p, len);
    128 	return p;
    129 Err:
    130 	vtlog(VtServerLog, "<font size=-1>%T %s:</font> error reading packet: %r<br>\n", z->addr);
    131 	return nil;
    132 }
    133 
    134 /*
    135  * If you fork off two procs running vtrecvproc and vtsendproc,
    136  * then vtrecv/vtsend (and thus vtrpc) will never block except on
    137  * rendevouses, which is nice when it's running in one thread of many.
    138  */
    139 void
    140 vtrecvproc(void *v)
    141 {
    142 	Packet *p;
    143 	VtConn *z;
    144 	Queue *q;
    145 
    146 	z = v;
    147 	q = _vtqalloc();
    148 
    149 	qlock(&z->lk);
    150 	z->readq = q;
    151 	qlock(&z->inlk);
    152 	rwakeup(&z->rpcfork);
    153 	qunlock(&z->lk);
    154 
    155 	while((p = _vtrecv(z)) != nil)
    156 		if(_vtqsend(q, p) < 0){
    157 			packetfree(p);
    158 			break;
    159 		}
    160 	qunlock(&z->inlk);
    161 	qlock(&z->lk);
    162 	_vtqhangup(q);
    163 	while((p = _vtnbqrecv(q)) != nil)
    164 		packetfree(p);
    165 	_vtqdecref(q);
    166 	z->readq = nil;
    167 	rwakeup(&z->rpcfork);
    168 	qunlock(&z->lk);
    169 	vthangup(z);
    170 }
    171 
    172 void
    173 vtsendproc(void *v)
    174 {
    175 	Queue *q;
    176 	Packet *p;
    177 	VtConn *z;
    178 
    179 	z = v;
    180 	q = _vtqalloc();
    181 
    182 	qlock(&z->lk);
    183 	z->writeq = q;
    184 	qlock(&z->outlk);
    185 	rwakeup(&z->rpcfork);
    186 	qunlock(&z->lk);
    187 
    188 	while((p = _vtqrecv(q)) != nil)
    189 		if(_vtsend(z, p) < 0)
    190 			break;
    191 	qunlock(&z->outlk);
    192 	qlock(&z->lk);
    193 	_vtqhangup(q);
    194 	while((p = _vtnbqrecv(q)) != nil)
    195 		packetfree(p);
    196 	_vtqdecref(q);
    197 	z->writeq = nil;
    198 	rwakeup(&z->rpcfork);
    199 	qunlock(&z->lk);
    200 	return;
    201 }
    202 
    203 Packet*
    204 vtrecv(VtConn *z)
    205 {
    206 	Packet *p;
    207 	Queue *q;
    208 
    209 	qlock(&z->lk);
    210 	if(z->state != VtStateConnected){
    211 		werrstr("not connected");
    212 		qunlock(&z->lk);
    213 		return nil;
    214 	}
    215 	if(z->readq){
    216 		q = _vtqincref(z->readq);
    217 		qunlock(&z->lk);
    218 		p = _vtqrecv(q);
    219 		_vtqdecref(q);
    220 		return p;
    221 	}
    222 
    223 	qlock(&z->inlk);
    224 	qunlock(&z->lk);
    225 	p = _vtrecv(z);
    226 	qunlock(&z->inlk);
    227 	if(!p)
    228 		vthangup(z);
    229 	return p;
    230 }
    231 
    232 int
    233 vtsend(VtConn *z, Packet *p)
    234 {
    235 	Queue *q;
    236 
    237 	qlock(&z->lk);
    238 	if(z->state != VtStateConnected){
    239 		packetfree(p);
    240 		werrstr("not connected");
    241 		qunlock(&z->lk);
    242 		return -1;
    243 	}
    244 	if(z->writeq){
    245 		q = _vtqincref(z->writeq);
    246 		qunlock(&z->lk);
    247 		if(_vtqsend(q, p) < 0){
    248 			_vtqdecref(q);
    249 			packetfree(p);
    250 			return -1;
    251 		}
    252 		_vtqdecref(q);
    253 		return 0;
    254 	}
    255 
    256 	qlock(&z->outlk);
    257 	qunlock(&z->lk);
    258 	if(_vtsend(z, p) < 0){
    259 		qunlock(&z->outlk);
    260 		vthangup(z);
    261 		return -1;
    262 	}
    263 	qunlock(&z->outlk);
    264 	return 0;
    265 }