rpc.c (3248B)
1 /* 2 * Multiplexed Venti client. It would be nice if we 3 * could turn this into a generic library routine rather 4 * than keep it Venti specific. A user-level 9P client 5 * could use something like this too. 6 * 7 * (Actually it does - this should be replaced with libmux, 8 * which should be renamed librpcmux.) 9 * 10 * This is a little more complicated than it might be 11 * because we want it to work well within and without libthread. 12 * 13 * The mux code is inspired by tra's, which is inspired by the Plan 9 kernel. 14 */ 15 16 #include <u.h> 17 #include <libc.h> 18 #include <venti.h> 19 20 typedef struct Rwait Rwait; 21 struct Rwait 22 { 23 Rendez r; 24 Packet *p; 25 int done; 26 int sleeping; 27 }; 28 29 static int gettag(VtConn*, Rwait*); 30 static void puttag(VtConn*, Rwait*, int); 31 static void muxrpc(VtConn*, Packet*); 32 33 Packet* 34 _vtrpc(VtConn *z, Packet *p, VtFcall *tx) 35 { 36 int i; 37 uchar tag, buf[2], *top; 38 Rwait *r, *rr; 39 40 if(z == nil){ 41 werrstr("not connected"); 42 packetfree(p); 43 return nil; 44 } 45 46 /* must malloc because stack could be private */ 47 r = vtmallocz(sizeof(Rwait)); 48 49 qlock(&z->lk); 50 r->r.l = &z->lk; 51 tag = gettag(z, r); 52 if(tx){ 53 /* vtfcallrpc can't print packet because it doesn't have tag */ 54 tx->tag = tag; 55 if(chattyventi) 56 fprint(2, "%s -> %F\n", argv0, tx); 57 } 58 59 /* slam tag into packet */ 60 top = packetpeek(p, buf, 0, 2); 61 if(top == nil){ 62 packetfree(p); 63 return nil; 64 } 65 if(top == buf){ 66 werrstr("first two bytes must be in same packet fragment"); 67 packetfree(p); 68 vtfree(r); 69 return nil; 70 } 71 top[1] = tag; 72 qunlock(&z->lk); 73 if(vtsend(z, p) < 0){ 74 vtfree(r); 75 return nil; 76 } 77 78 qlock(&z->lk); 79 /* wait for the muxer to give us our packet */ 80 r->sleeping = 1; 81 z->nsleep++; 82 while(z->muxer && !r->done) 83 rsleep(&r->r); 84 z->nsleep--; 85 r->sleeping = 0; 86 87 /* if not done, there's no muxer: start muxing */ 88 if(!r->done){ 89 if(z->muxer) 90 abort(); 91 z->muxer = 1; 92 while(!r->done){ 93 qunlock(&z->lk); 94 if((p = vtrecv(z)) == nil){ 95 werrstr("unexpected eof on venti connection"); 96 z->muxer = 0; 97 vtfree(r); 98 return nil; 99 } 100 qlock(&z->lk); 101 muxrpc(z, p); 102 } 103 z->muxer = 0; 104 /* if there is anyone else sleeping, wake first unfinished to mux */ 105 if(z->nsleep) 106 for(i=0; i<256; i++){ 107 rr = z->wait[i]; 108 if(rr && rr->sleeping && !rr->done){ 109 rwakeup(&rr->r); 110 break; 111 } 112 } 113 } 114 115 p = r->p; 116 puttag(z, r, tag); 117 vtfree(r); 118 qunlock(&z->lk); 119 return p; 120 } 121 122 Packet* 123 vtrpc(VtConn *z, Packet *p) 124 { 125 return _vtrpc(z, p, nil); 126 } 127 128 static int 129 gettag(VtConn *z, Rwait *r) 130 { 131 int i; 132 133 Again: 134 while(z->ntag == 256) 135 rsleep(&z->tagrend); 136 for(i=0; i<256; i++) 137 if(z->wait[i] == 0){ 138 z->ntag++; 139 z->wait[i] = r; 140 return i; 141 } 142 fprint(2, "libventi: ntag botch\n"); 143 goto Again; 144 } 145 146 static void 147 puttag(VtConn *z, Rwait *r, int tag) 148 { 149 assert(z->wait[tag] == r); 150 z->wait[tag] = nil; 151 z->ntag--; 152 rwakeup(&z->tagrend); 153 } 154 155 static void 156 muxrpc(VtConn *z, Packet *p) 157 { 158 uchar tag, buf[2], *top; 159 Rwait *r; 160 161 if((top = packetpeek(p, buf, 0, 2)) == nil){ 162 fprint(2, "libventi: short packet in vtrpc\n"); 163 packetfree(p); 164 return; 165 } 166 167 tag = top[1]; 168 if((r = z->wait[tag]) == nil){ 169 fprint(2, "libventi: unexpected packet tag %d in vtrpc\n", tag); 170 abort(); 171 packetfree(p); 172 return; 173 } 174 175 r->p = p; 176 r->done = 1; 177 rwakeup(&r->r); 178 }