plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

io.c (2279B)


      1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */
      2 /* See COPYRIGHT */
      3 
      4 #include <u.h>
      5 #include <libc.h>
      6 #include <mux.h>
      7 
      8 /*
      9  * If you fork off two procs running muxrecvproc and muxsendproc,
     10  * then muxrecv/muxsend (and thus muxrpc) will never block except on
     11  * rendevouses, which is nice when it's running in one thread of many.
     12  */
     13 void
     14 _muxrecvproc(void *v)
     15 {
     16 	void *p;
     17 	Mux *mux;
     18 	Muxqueue *q;
     19 
     20 	mux = v;
     21 	q = _muxqalloc();
     22 
     23 	qlock(&mux->lk);
     24 	mux->readq = q;
     25 	qlock(&mux->inlk);
     26 	rwakeup(&mux->rpcfork);
     27 	qunlock(&mux->lk);
     28 
     29 	while((p = mux->recv(mux)) != nil)
     30 		if(_muxqsend(q, p) < 0){
     31 			free(p);
     32 			break;
     33 		}
     34 	qunlock(&mux->inlk);
     35 	qlock(&mux->lk);
     36 	_muxqhangup(q);
     37 	p = nil;
     38 	while(_muxnbqrecv(q, &p) && p != nil){
     39 		free(p);
     40 		p = nil;
     41 	}
     42 	free(q);
     43 	mux->readq = nil;
     44 	rwakeup(&mux->rpcfork);
     45 	qunlock(&mux->lk);
     46 }
     47 
     48 void
     49 _muxsendproc(void *v)
     50 {
     51 	Muxqueue *q;
     52 	void *p;
     53 	Mux *mux;
     54 
     55 	mux = v;
     56 	q = _muxqalloc();
     57 
     58 	qlock(&mux->lk);
     59 	mux->writeq = q;
     60 	qlock(&mux->outlk);
     61 	rwakeup(&mux->rpcfork);
     62 	qunlock(&mux->lk);
     63 
     64 	while((p = _muxqrecv(q)) != nil)
     65 		if(mux->send(mux, p) < 0)
     66 			break;
     67 	qunlock(&mux->outlk);
     68 	qlock(&mux->lk);
     69 	_muxqhangup(q);
     70 	while(_muxnbqrecv(q, &p))
     71 		free(p);
     72 	free(q);
     73 	mux->writeq = nil;
     74 	rwakeup(&mux->rpcfork);
     75 	qunlock(&mux->lk);
     76 	return;
     77 }
     78 
     79 int
     80 _muxrecv(Mux *mux, int canblock, void **vp)
     81 {
     82 	void *p;
     83 	int ret;
     84 
     85 	qlock(&mux->lk);
     86 	if(mux->readq){
     87 		qunlock(&mux->lk);
     88 		if(canblock){
     89 			*vp = _muxqrecv(mux->readq);
     90 			return 1;
     91 		}
     92 		return _muxnbqrecv(mux->readq, vp);
     93 	}
     94 
     95 	qlock(&mux->inlk);
     96 	qunlock(&mux->lk);
     97 	if(canblock){
     98 		p = mux->recv(mux);
     99 		ret = 1;
    100 	}else{
    101 		if(mux->nbrecv)
    102 			ret = mux->nbrecv(mux, &p);
    103 		else{
    104 			/* send eof, not "no packet ready" */
    105 			p = nil;
    106 			ret = 1;
    107 		}
    108 	}
    109 	qunlock(&mux->inlk);
    110 	*vp = p;
    111 	return ret;
    112 }
    113 
    114 int
    115 _muxsend(Mux *mux, void *p)
    116 {
    117 	qlock(&mux->lk);
    118 /*
    119 	if(mux->state != VtStateConnected){
    120 		packetfree(p);
    121 		werrstr("not connected");
    122 		qunlock(&mux->lk);
    123 		return -1;
    124 	}
    125 */
    126 	if(mux->writeq){
    127 		qunlock(&mux->lk);
    128 		if(_muxqsend(mux->writeq, p) < 0){
    129 			free(p);
    130 			return -1;
    131 		}
    132 		return 0;
    133 	}
    134 
    135 	qlock(&mux->outlk);
    136 	qunlock(&mux->lk);
    137 	if(mux->send(mux, p) < 0){
    138 		qunlock(&mux->outlk);
    139 		/* vthangup(mux); */
    140 		return -1;
    141 	}
    142 	qunlock(&mux->outlk);
    143 	return 0;
    144 }