plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

rpc.c (3248B)


      1 /*
      2  * Multiplexed Venti client.  It would be nice if we
      3  * could turn this into a generic library routine rather
      4  * than keep it Venti specific.  A user-level 9P client
      5  * could use something like this too.
      6  *
      7  * (Actually it does - this should be replaced with libmux,
      8  * which should be renamed librpcmux.)
      9  *
     10  * This is a little more complicated than it might be
     11  * because we want it to work well within and without libthread.
     12  *
     13  * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel.
     14  */
     15 
     16 #include <u.h>
     17 #include <libc.h>
     18 #include <venti.h>
     19 
     20 typedef struct Rwait Rwait;
     21 struct Rwait
     22 {
     23 	Rendez r;
     24 	Packet *p;
     25 	int done;
     26 	int sleeping;
     27 };
     28 
     29 static int gettag(VtConn*, Rwait*);
     30 static void puttag(VtConn*, Rwait*, int);
     31 static void muxrpc(VtConn*, Packet*);
     32 
     33 Packet*
     34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx)
     35 {
     36 	int i;
     37 	uchar tag, buf[2], *top;
     38 	Rwait *r, *rr;
     39 
     40 	if(z == nil){
     41 		werrstr("not connected");
     42 		packetfree(p);
     43 		return nil;
     44 	}
     45 
     46 	/* must malloc because stack could be private */
     47 	r = vtmallocz(sizeof(Rwait));
     48 
     49 	qlock(&z->lk);
     50 	r->r.l = &z->lk;
     51 	tag = gettag(z, r);
     52 	if(tx){
     53 		/* vtfcallrpc can't print packet because it doesn't have tag */
     54 		tx->tag = tag;
     55 		if(chattyventi)
     56 			fprint(2, "%s -> %F\n", argv0, tx);
     57 	}
     58 
     59 	/* slam tag into packet */
     60 	top = packetpeek(p, buf, 0, 2);
     61 	if(top == nil){
     62 		packetfree(p);
     63 		return nil;
     64 	}
     65 	if(top == buf){
     66 		werrstr("first two bytes must be in same packet fragment");
     67 		packetfree(p);
     68 		vtfree(r);
     69 		return nil;
     70 	}
     71 	top[1] = tag;
     72 	qunlock(&z->lk);
     73 	if(vtsend(z, p) < 0){
     74 		vtfree(r);
     75 		return nil;
     76 	}
     77 
     78 	qlock(&z->lk);
     79 	/* wait for the muxer to give us our packet */
     80 	r->sleeping = 1;
     81 	z->nsleep++;
     82 	while(z->muxer && !r->done)
     83 		rsleep(&r->r);
     84 	z->nsleep--;
     85 	r->sleeping = 0;
     86 
     87 	/* if not done, there's no muxer: start muxing */
     88 	if(!r->done){
     89 		if(z->muxer)
     90 			abort();
     91 		z->muxer = 1;
     92 		while(!r->done){
     93 			qunlock(&z->lk);
     94 			if((p = vtrecv(z)) == nil){
     95 				werrstr("unexpected eof on venti connection");
     96 				z->muxer = 0;
     97 				vtfree(r);
     98 				return nil;
     99 			}
    100 			qlock(&z->lk);
    101 			muxrpc(z, p);
    102 		}
    103 		z->muxer = 0;
    104 		/* if there is anyone else sleeping, wake first unfinished to mux */
    105 		if(z->nsleep)
    106 		for(i=0; i<256; i++){
    107 			rr = z->wait[i];
    108 			if(rr && rr->sleeping && !rr->done){
    109 				rwakeup(&rr->r);
    110 				break;
    111 			}
    112 		}
    113 	}
    114 
    115 	p = r->p;
    116 	puttag(z, r, tag);
    117 	vtfree(r);
    118 	qunlock(&z->lk);
    119 	return p;
    120 }
    121 
    122 Packet*
    123 vtrpc(VtConn *z, Packet *p)
    124 {
    125 	return _vtrpc(z, p, nil);
    126 }
    127 
    128 static int
    129 gettag(VtConn *z, Rwait *r)
    130 {
    131 	int i;
    132 
    133 Again:
    134 	while(z->ntag == 256)
    135 		rsleep(&z->tagrend);
    136 	for(i=0; i<256; i++)
    137 		if(z->wait[i] == 0){
    138 			z->ntag++;
    139 			z->wait[i] = r;
    140 			return i;
    141 		}
    142 	fprint(2, "libventi: ntag botch\n");
    143 	goto Again;
    144 }
    145 
    146 static void
    147 puttag(VtConn *z, Rwait *r, int tag)
    148 {
    149 	assert(z->wait[tag] == r);
    150 	z->wait[tag] = nil;
    151 	z->ntag--;
    152 	rwakeup(&z->tagrend);
    153 }
    154 
    155 static void
    156 muxrpc(VtConn *z, Packet *p)
    157 {
    158 	uchar tag, buf[2], *top;
    159 	Rwait *r;
    160 
    161 	if((top = packetpeek(p, buf, 0, 2)) == nil){
    162 		fprint(2, "libventi: short packet in vtrpc\n");
    163 		packetfree(p);
    164 		return;
    165 	}
    166 
    167 	tag = top[1];
    168 	if((r = z->wait[tag]) == nil){
    169 		fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag);
    170 abort();
    171 		packetfree(p);
    172 		return;
    173 	}
    174 
    175 	r->p = p;
    176 	r->done = 1;
    177 	rwakeup(&r->r);
    178 }