mux.c (5119B)
1 /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */ 2 /* See COPYRIGHT */ 3 4 /* 5 * Generic RPC packet multiplexor. Inspired by but not derived from 6 * Plan 9 kernel. Originally developed as part of Tra, later used in 7 * libnventi, and then finally split out into a generic library. 8 */ 9 10 #include <u.h> 11 #include <libc.h> 12 #include <mux.h> 13 14 static int gettag(Mux*, Muxrpc*); 15 static void puttag(Mux*, Muxrpc*); 16 static void enqueue(Mux*, Muxrpc*); 17 static void dequeue(Mux*, Muxrpc*); 18 19 void 20 muxinit(Mux *mux) 21 { 22 memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk)); 23 mux->tagrend.l = &mux->lk; 24 mux->rpcfork.l = &mux->lk; 25 mux->sleep.next = &mux->sleep; 26 mux->sleep.prev = &mux->sleep; 27 } 28 29 static Muxrpc* 30 allocmuxrpc(Mux *mux) 31 { 32 Muxrpc *r; 33 34 /* must malloc because stack could be private */ 35 r = mallocz(sizeof(Muxrpc), 1); 36 if(r == nil){ 37 werrstr("mallocz: %r"); 38 return nil; 39 } 40 r->mux = mux; 41 r->r.l = &mux->lk; 42 r->waiting = 1; 43 44 return r; 45 } 46 47 static int 48 tagmuxrpc(Muxrpc *r, void *tx) 49 { 50 int tag; 51 Mux *mux; 52 53 mux = r->mux; 54 /* assign the tag, add selves to response queue */ 55 qlock(&mux->lk); 56 tag = gettag(mux, r); 57 /*print("gettag %p %d\n", r, tag); */ 58 enqueue(mux, r); 59 qunlock(&mux->lk); 60 61 /* actually send the packet */ 62 if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){ 63 werrstr("settag/send tag %d: %r", tag); 64 fprint(2, "%r\n"); 65 qlock(&mux->lk); 66 dequeue(mux, r); 67 puttag(mux, r); 68 qunlock(&mux->lk); 69 return -1; 70 } 71 return 0; 72 } 73 74 void 75 muxmsgandqlock(Mux *mux, void *p) 76 { 77 int tag; 78 Muxrpc *r2; 79 80 tag = mux->gettag(mux, p) - mux->mintag; 81 /*print("mux tag %d\n", tag); */ 82 qlock(&mux->lk); 83 /* hand packet to correct sleeper */ 84 if(tag < 0 || tag >= mux->mwait){ 85 fprint(2, "%s: bad rpc tag %ux\n", argv0, tag); 86 /* must leak packet! don't know how to free it! */ 87 return; 88 } 89 r2 = mux->wait[tag]; 90 if(r2 == nil || r2->prev == nil){ 91 fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag); 92 /* must leak packet! don't know how to free it! */ 93 return; 94 } 95 r2->p = p; 96 dequeue(mux, r2); 97 rwakeup(&r2->r); 98 } 99 100 void 101 electmuxer(Mux *mux) 102 { 103 Muxrpc *rpc; 104 105 /* if there is anyone else sleeping, wake them to mux */ 106 for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){ 107 if(!rpc->async){ 108 mux->muxer = rpc; 109 rwakeup(&rpc->r); 110 return; 111 } 112 } 113 mux->muxer = nil; 114 } 115 116 void* 117 muxrpc(Mux *mux, void *tx) 118 { 119 int tag; 120 Muxrpc *r; 121 void *p; 122 123 if((r = allocmuxrpc(mux)) == nil) 124 return nil; 125 126 if((tag = tagmuxrpc(r, tx)) < 0) 127 return nil; 128 129 qlock(&mux->lk); 130 /* wait for our packet */ 131 while(mux->muxer && mux->muxer != r && !r->p) 132 rsleep(&r->r); 133 134 /* if not done, there's no muxer: start muxing */ 135 if(!r->p){ 136 if(mux->muxer != nil && mux->muxer != r) 137 abort(); 138 mux->muxer = r; 139 while(!r->p){ 140 qunlock(&mux->lk); 141 _muxrecv(mux, 1, &p); 142 if(p == nil){ 143 /* eof -- just give up and pass the buck */ 144 qlock(&mux->lk); 145 dequeue(mux, r); 146 break; 147 } 148 muxmsgandqlock(mux, p); 149 } 150 electmuxer(mux); 151 } 152 p = r->p; 153 puttag(mux, r); 154 qunlock(&mux->lk); 155 if(p == nil) 156 werrstr("unexpected eof"); 157 return p; 158 } 159 160 Muxrpc* 161 muxrpcstart(Mux *mux, void *tx) 162 { 163 int tag; 164 Muxrpc *r; 165 166 if((r = allocmuxrpc(mux)) == nil) 167 return nil; 168 r->async = 1; 169 if((tag = tagmuxrpc(r, tx)) < 0) 170 return nil; 171 return r; 172 } 173 174 int 175 muxrpccanfinish(Muxrpc *r, void **vp) 176 { 177 void *p; 178 Mux *mux; 179 int ret; 180 181 mux = r->mux; 182 qlock(&mux->lk); 183 ret = 1; 184 if(!r->p && !mux->muxer){ 185 mux->muxer = r; 186 while(!r->p){ 187 qunlock(&mux->lk); 188 p = nil; 189 if(!_muxrecv(mux, 0, &p)) 190 ret = 0; 191 if(p == nil){ 192 qlock(&mux->lk); 193 break; 194 } 195 muxmsgandqlock(mux, p); 196 } 197 electmuxer(mux); 198 } 199 p = r->p; 200 if(p) 201 puttag(mux, r); 202 qunlock(&mux->lk); 203 *vp = p; 204 return ret; 205 } 206 207 static void 208 enqueue(Mux *mux, Muxrpc *r) 209 { 210 r->next = mux->sleep.next; 211 r->prev = &mux->sleep; 212 r->next->prev = r; 213 r->prev->next = r; 214 } 215 216 static void 217 dequeue(Mux *mux, Muxrpc *r) 218 { 219 r->next->prev = r->prev; 220 r->prev->next = r->next; 221 r->prev = nil; 222 r->next = nil; 223 } 224 225 static int 226 gettag(Mux *mux, Muxrpc *r) 227 { 228 int i, mw; 229 Muxrpc **w; 230 231 for(;;){ 232 /* wait for a free tag */ 233 while(mux->nwait == mux->mwait){ 234 if(mux->mwait < mux->maxtag-mux->mintag){ 235 mw = mux->mwait; 236 if(mw == 0) 237 mw = 1; 238 else 239 mw <<= 1; 240 w = realloc(mux->wait, mw*sizeof(w[0])); 241 if(w == nil) 242 return -1; 243 memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0])); 244 mux->wait = w; 245 mux->freetag = mux->mwait; 246 mux->mwait = mw; 247 break; 248 } 249 rsleep(&mux->tagrend); 250 } 251 252 i=mux->freetag; 253 if(mux->wait[i] == 0) 254 goto Found; 255 for(; i<mux->mwait; i++) 256 if(mux->wait[i] == 0) 257 goto Found; 258 for(i=0; i<mux->freetag; i++) 259 if(mux->wait[i] == 0) 260 goto Found; 261 /* should not fall out of while without free tag */ 262 fprint(2, "libfs: nwait botch\n"); 263 abort(); 264 } 265 266 Found: 267 mux->nwait++; 268 mux->wait[i] = r; 269 r->tag = i+mux->mintag; 270 return r->tag; 271 } 272 273 static void 274 puttag(Mux *mux, Muxrpc *r) 275 { 276 int i; 277 278 i = r->tag - mux->mintag; 279 assert(mux->wait[i] == r); 280 mux->wait[i] = nil; 281 mux->nwait--; 282 mux->freetag = i; 283 rwakeup(&mux->tagrend); 284 free(r); 285 }