io.c (2279B)
1 /* Copyright (C) 2003 Russ Cox, Massachusetts Institute of Technology */ 2 /* See COPYRIGHT */ 3 4 #include <u.h> 5 #include <libc.h> 6 #include <mux.h> 7 8 /* 9 * If you fork off two procs running muxrecvproc and muxsendproc, 10 * then muxrecv/muxsend (and thus muxrpc) will never block except on 11 * rendevouses, which is nice when it's running in one thread of many. 12 */ 13 void 14 _muxrecvproc(void *v) 15 { 16 void *p; 17 Mux *mux; 18 Muxqueue *q; 19 20 mux = v; 21 q = _muxqalloc(); 22 23 qlock(&mux->lk); 24 mux->readq = q; 25 qlock(&mux->inlk); 26 rwakeup(&mux->rpcfork); 27 qunlock(&mux->lk); 28 29 while((p = mux->recv(mux)) != nil) 30 if(_muxqsend(q, p) < 0){ 31 free(p); 32 break; 33 } 34 qunlock(&mux->inlk); 35 qlock(&mux->lk); 36 _muxqhangup(q); 37 p = nil; 38 while(_muxnbqrecv(q, &p) && p != nil){ 39 free(p); 40 p = nil; 41 } 42 free(q); 43 mux->readq = nil; 44 rwakeup(&mux->rpcfork); 45 qunlock(&mux->lk); 46 } 47 48 void 49 _muxsendproc(void *v) 50 { 51 Muxqueue *q; 52 void *p; 53 Mux *mux; 54 55 mux = v; 56 q = _muxqalloc(); 57 58 qlock(&mux->lk); 59 mux->writeq = q; 60 qlock(&mux->outlk); 61 rwakeup(&mux->rpcfork); 62 qunlock(&mux->lk); 63 64 while((p = _muxqrecv(q)) != nil) 65 if(mux->send(mux, p) < 0) 66 break; 67 qunlock(&mux->outlk); 68 qlock(&mux->lk); 69 _muxqhangup(q); 70 while(_muxnbqrecv(q, &p)) 71 free(p); 72 free(q); 73 mux->writeq = nil; 74 rwakeup(&mux->rpcfork); 75 qunlock(&mux->lk); 76 return; 77 } 78 79 int 80 _muxrecv(Mux *mux, int canblock, void **vp) 81 { 82 void *p; 83 int ret; 84 85 qlock(&mux->lk); 86 if(mux->readq){ 87 qunlock(&mux->lk); 88 if(canblock){ 89 *vp = _muxqrecv(mux->readq); 90 return 1; 91 } 92 return _muxnbqrecv(mux->readq, vp); 93 } 94 95 qlock(&mux->inlk); 96 qunlock(&mux->lk); 97 if(canblock){ 98 p = mux->recv(mux); 99 ret = 1; 100 }else{ 101 if(mux->nbrecv) 102 ret = mux->nbrecv(mux, &p); 103 else{ 104 /* send eof, not "no packet ready" */ 105 p = nil; 106 ret = 1; 107 } 108 } 109 qunlock(&mux->inlk); 110 *vp = p; 111 return ret; 112 } 113 114 int 115 _muxsend(Mux *mux, void *p) 116 { 117 qlock(&mux->lk); 118 /* 119 if(mux->state != VtStateConnected){ 120 packetfree(p); 121 werrstr("not connected"); 122 qunlock(&mux->lk); 123 return -1; 124 } 125 */ 126 if(mux->writeq){ 127 qunlock(&mux->lk); 128 if(_muxqsend(mux->writeq, p) < 0){ 129 free(p); 130 return -1; 131 } 132 return 0; 133 } 134 135 qlock(&mux->outlk); 136 qunlock(&mux->lk); 137 if(mux->send(mux, p) < 0){ 138 qunlock(&mux->outlk); 139 /* vthangup(mux); */ 140 return -1; 141 } 142 qunlock(&mux->outlk); 143 return 0; 144 }