plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

mux.c (5119B)


      1 /* Copyright (C) 2003-2006 Russ Cox, Massachusetts Institute of Technology */
      2 /* See COPYRIGHT */
      3 
      4 /*
      5  * Generic RPC packet multiplexor.  Inspired by but not derived from
      6  * Plan 9 kernel.  Originally developed as part of Tra, later used in
      7  * libnventi, and then finally split out into a generic library.
      8  */
      9 
     10 #include <u.h>
     11 #include <libc.h>
     12 #include <mux.h>
     13 
     14 static int gettag(Mux*, Muxrpc*);
     15 static void puttag(Mux*, Muxrpc*);
     16 static void enqueue(Mux*, Muxrpc*);
     17 static void dequeue(Mux*, Muxrpc*);
     18 
     19 void
     20 muxinit(Mux *mux)
     21 {
     22 	memset(&mux->lk, 0, sizeof(Mux)-offsetof(Mux, lk));
     23 	mux->tagrend.l = &mux->lk;
     24 	mux->rpcfork.l = &mux->lk;
     25 	mux->sleep.next = &mux->sleep;
     26 	mux->sleep.prev = &mux->sleep;
     27 }
     28 
     29 static Muxrpc*
     30 allocmuxrpc(Mux *mux)
     31 {
     32 	Muxrpc *r;
     33 
     34 	/* must malloc because stack could be private */
     35 	r = mallocz(sizeof(Muxrpc), 1);
     36 	if(r == nil){
     37 		werrstr("mallocz: %r");
     38 		return nil;
     39 	}
     40 	r->mux = mux;
     41 	r->r.l = &mux->lk;
     42 	r->waiting = 1;
     43 
     44 	return r;
     45 }
     46 
     47 static int
     48 tagmuxrpc(Muxrpc *r, void *tx)
     49 {
     50 	int tag;
     51 	Mux *mux;
     52 
     53 	mux = r->mux;
     54 	/* assign the tag, add selves to response queue */
     55 	qlock(&mux->lk);
     56 	tag = gettag(mux, r);
     57 /*print("gettag %p %d\n", r, tag); */
     58 	enqueue(mux, r);
     59 	qunlock(&mux->lk);
     60 
     61 	/* actually send the packet */
     62 	if(tag < 0 || mux->settag(mux, tx, tag) < 0 || _muxsend(mux, tx) < 0){
     63 		werrstr("settag/send tag %d: %r", tag);
     64 		fprint(2, "%r\n");
     65 		qlock(&mux->lk);
     66 		dequeue(mux, r);
     67 		puttag(mux, r);
     68 		qunlock(&mux->lk);
     69 		return -1;
     70 	}
     71 	return 0;
     72 }
     73 
     74 void
     75 muxmsgandqlock(Mux *mux, void *p)
     76 {
     77 	int tag;
     78 	Muxrpc *r2;
     79 
     80 	tag = mux->gettag(mux, p) - mux->mintag;
     81 /*print("mux tag %d\n", tag); */
     82 	qlock(&mux->lk);
     83 	/* hand packet to correct sleeper */
     84 	if(tag < 0 || tag >= mux->mwait){
     85 		fprint(2, "%s: bad rpc tag %ux\n", argv0, tag);
     86 		/* must leak packet! don't know how to free it! */
     87 		return;
     88 	}
     89 	r2 = mux->wait[tag];
     90 	if(r2 == nil || r2->prev == nil){
     91 		fprint(2, "%s: bad rpc tag %ux (no one waiting on that tag)\n", argv0, tag);
     92 		/* must leak packet! don't know how to free it! */
     93 		return;
     94 	}
     95 	r2->p = p;
     96 	dequeue(mux, r2);
     97 	rwakeup(&r2->r);
     98 }
     99 
    100 void
    101 electmuxer(Mux *mux)
    102 {
    103 	Muxrpc *rpc;
    104 
    105 	/* if there is anyone else sleeping, wake them to mux */
    106 	for(rpc=mux->sleep.next; rpc != &mux->sleep; rpc = rpc->next){
    107 		if(!rpc->async){
    108 			mux->muxer = rpc;
    109 			rwakeup(&rpc->r);
    110 			return;
    111 		}
    112 	}
    113 	mux->muxer = nil;
    114 }
    115 
    116 void*
    117 muxrpc(Mux *mux, void *tx)
    118 {
    119 	int tag;
    120 	Muxrpc *r;
    121 	void *p;
    122 
    123 	if((r = allocmuxrpc(mux)) == nil)
    124 		return nil;
    125 
    126 	if((tag = tagmuxrpc(r, tx)) < 0)
    127 		return nil;
    128 
    129 	qlock(&mux->lk);
    130 	/* wait for our packet */
    131 	while(mux->muxer && mux->muxer != r && !r->p)
    132 		rsleep(&r->r);
    133 
    134 	/* if not done, there's no muxer: start muxing */
    135 	if(!r->p){
    136 		if(mux->muxer != nil && mux->muxer != r)
    137 			abort();
    138 		mux->muxer = r;
    139 		while(!r->p){
    140 			qunlock(&mux->lk);
    141 			_muxrecv(mux, 1, &p);
    142 			if(p == nil){
    143 				/* eof -- just give up and pass the buck */
    144 				qlock(&mux->lk);
    145 				dequeue(mux, r);
    146 				break;
    147 			}
    148 			muxmsgandqlock(mux, p);
    149 		}
    150 		electmuxer(mux);
    151 	}
    152 	p = r->p;
    153 	puttag(mux, r);
    154 	qunlock(&mux->lk);
    155 	if(p == nil)
    156 		werrstr("unexpected eof");
    157 	return p;
    158 }
    159 
    160 Muxrpc*
    161 muxrpcstart(Mux *mux, void *tx)
    162 {
    163 	int tag;
    164 	Muxrpc *r;
    165 
    166 	if((r = allocmuxrpc(mux)) == nil)
    167 		return nil;
    168 	r->async = 1;
    169 	if((tag = tagmuxrpc(r, tx)) < 0)
    170 		return nil;
    171 	return r;
    172 }
    173 
    174 int
    175 muxrpccanfinish(Muxrpc *r, void **vp)
    176 {
    177 	void *p;
    178 	Mux *mux;
    179 	int ret;
    180 
    181 	mux = r->mux;
    182 	qlock(&mux->lk);
    183 	ret = 1;
    184 	if(!r->p && !mux->muxer){
    185 		mux->muxer = r;
    186 		while(!r->p){
    187 			qunlock(&mux->lk);
    188 			p = nil;
    189 			if(!_muxrecv(mux, 0, &p))
    190 				ret = 0;
    191 			if(p == nil){
    192 				qlock(&mux->lk);
    193 				break;
    194 			}
    195 			muxmsgandqlock(mux, p);
    196 		}
    197 		electmuxer(mux);
    198 	}
    199 	p = r->p;
    200 	if(p)
    201 		puttag(mux, r);
    202 	qunlock(&mux->lk);
    203 	*vp = p;
    204 	return ret;
    205 }
    206 
    207 static void
    208 enqueue(Mux *mux, Muxrpc *r)
    209 {
    210 	r->next = mux->sleep.next;
    211 	r->prev = &mux->sleep;
    212 	r->next->prev = r;
    213 	r->prev->next = r;
    214 }
    215 
    216 static void
    217 dequeue(Mux *mux, Muxrpc *r)
    218 {
    219 	r->next->prev = r->prev;
    220 	r->prev->next = r->next;
    221 	r->prev = nil;
    222 	r->next = nil;
    223 }
    224 
    225 static int
    226 gettag(Mux *mux, Muxrpc *r)
    227 {
    228 	int i, mw;
    229 	Muxrpc **w;
    230 
    231 	for(;;){
    232 		/* wait for a free tag */
    233 		while(mux->nwait == mux->mwait){
    234 			if(mux->mwait < mux->maxtag-mux->mintag){
    235 				mw = mux->mwait;
    236 				if(mw == 0)
    237 					mw = 1;
    238 				else
    239 					mw <<= 1;
    240 				w = realloc(mux->wait, mw*sizeof(w[0]));
    241 				if(w == nil)
    242 					return -1;
    243 				memset(w+mux->mwait, 0, (mw-mux->mwait)*sizeof(w[0]));
    244 				mux->wait = w;
    245 				mux->freetag = mux->mwait;
    246 				mux->mwait = mw;
    247 				break;
    248 			}
    249 			rsleep(&mux->tagrend);
    250 		}
    251 
    252 		i=mux->freetag;
    253 		if(mux->wait[i] == 0)
    254 			goto Found;
    255 		for(; i<mux->mwait; i++)
    256 			if(mux->wait[i] == 0)
    257 				goto Found;
    258 		for(i=0; i<mux->freetag; i++)
    259 			if(mux->wait[i] == 0)
    260 				goto Found;
    261 		/* should not fall out of while without free tag */
    262 		fprint(2, "libfs: nwait botch\n");
    263 		abort();
    264 	}
    265 
    266 Found:
    267 	mux->nwait++;
    268 	mux->wait[i] = r;
    269 	r->tag = i+mux->mintag;
    270 	return r->tag;
    271 }
    272 
    273 static void
    274 puttag(Mux *mux, Muxrpc *r)
    275 {
    276 	int i;
    277 
    278 	i = r->tag - mux->mintag;
    279 	assert(mux->wait[i] == r);
    280 	mux->wait[i] = nil;
    281 	mux->nwait--;
    282 	mux->freetag = i;
    283 	rwakeup(&mux->tagrend);
    284 	free(r);
    285 }