channel.c (6729B)
1 #include "threadimpl.h" 2 3 /* 4 * One can go through a lot of effort to avoid this global lock. 5 * You have to put locks in all the channels and all the Alt 6 * structures. At the beginning of an alt you have to lock all 7 * the channels, but then to try to actually exec an op you 8 * have to lock the other guy's alt structure, so that other 9 * people aren't trying to use him in some other op at the 10 * same time. 11 * 12 * For Plan 9 apps, it's just not worth the extra effort. 13 */ 14 static QLock chanlock; 15 16 Channel* 17 chancreate(int elemsize, int bufsize) 18 { 19 Channel *c; 20 21 c = malloc(sizeof *c+bufsize*elemsize); 22 if(c == nil) 23 sysfatal("chancreate malloc: %r"); 24 memset(c, 0, sizeof *c); 25 c->elemsize = elemsize; 26 c->bufsize = bufsize; 27 c->nbuf = 0; 28 c->buf = (uchar*)(c+1); 29 return c; 30 } 31 32 void 33 chansetname(Channel *c, char *fmt, ...) 34 { 35 char *name; 36 va_list arg; 37 38 va_start(arg, fmt); 39 name = vsmprint(fmt, arg); 40 va_end(arg); 41 free(c->name); 42 c->name = name; 43 } 44 45 /* bug - work out races */ 46 void 47 chanfree(Channel *c) 48 { 49 if(c == nil) 50 return; 51 free(c->name); 52 free(c->arecv.a); 53 free(c->asend.a); 54 free(c); 55 } 56 57 static void 58 addarray(_Altarray *a, Alt *alt) 59 { 60 if(a->n == a->m){ 61 a->m += 16; 62 a->a = realloc(a->a, a->m*sizeof a->a[0]); 63 } 64 a->a[a->n++] = alt; 65 } 66 67 static void 68 delarray(_Altarray *a, int i) 69 { 70 --a->n; 71 a->a[i] = a->a[a->n]; 72 } 73 74 /* 75 * doesn't really work for things other than CHANSND and CHANRCV 76 * but is only used as arg to chanarray, which can handle it 77 */ 78 #define otherop(op) (CHANSND+CHANRCV-(op)) 79 80 static _Altarray* 81 chanarray(Channel *c, uint op) 82 { 83 switch(op){ 84 default: 85 return nil; 86 case CHANSND: 87 return &c->asend; 88 case CHANRCV: 89 return &c->arecv; 90 } 91 } 92 93 static int 94 altcanexec(Alt *a) 95 { 96 _Altarray *ar; 97 Channel *c; 98 99 if(a->op == CHANNOP || (c=a->c) == nil) 100 return 0; 101 if(c->bufsize == 0){ 102 ar = chanarray(c, otherop(a->op)); 103 return ar && ar->n; 104 }else{ 105 switch(a->op){ 106 default: 107 return 0; 108 case CHANSND: 109 return c->nbuf < c->bufsize; 110 case CHANRCV: 111 return c->nbuf > 0; 112 } 113 } 114 } 115 116 static void 117 altqueue(Alt *a) 118 { 119 _Altarray *ar; 120 121 if(a->c == nil) 122 return; 123 ar = chanarray(a->c, a->op); 124 addarray(ar, a); 125 } 126 127 static void 128 altdequeue(Alt *a) 129 { 130 int i; 131 _Altarray *ar; 132 133 ar = chanarray(a->c, a->op); 134 if(ar == nil){ 135 fprint(2, "bad use of altdequeue op=%d\n", a->op); 136 abort(); 137 } 138 139 for(i=0; i<ar->n; i++) 140 if(ar->a[i] == a){ 141 delarray(ar, i); 142 return; 143 } 144 fprint(2, "cannot find self in altdequeue\n"); 145 abort(); 146 } 147 148 static void 149 altalldequeue(Alt *a) 150 { 151 int i; 152 153 for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++) 154 if(a[i].op != CHANNOP) 155 altdequeue(&a[i]); 156 } 157 158 static void 159 amove(void *dst, void *src, uint n) 160 { 161 if(dst){ 162 if(src == nil) 163 memset(dst, 0, n); 164 else 165 memmove(dst, src, n); 166 } 167 } 168 169 /* 170 * Actually move the data around. There are up to three 171 * players: the sender, the receiver, and the channel itself. 172 * If the channel is unbuffered or the buffer is empty, 173 * data goes from sender to receiver. If the channel is full, 174 * the receiver removes some from the channel and the sender 175 * gets to put some in. 176 */ 177 static void 178 altcopy(Alt *s, Alt *r) 179 { 180 Alt *t; 181 Channel *c; 182 uchar *cp; 183 184 /* 185 * Work out who is sender and who is receiver 186 */ 187 if(s == nil && r == nil) 188 return; 189 assert(s != nil); 190 c = s->c; 191 if(s->op == CHANRCV){ 192 t = s; 193 s = r; 194 r = t; 195 } 196 assert(s==nil || s->op == CHANSND); 197 assert(r==nil || r->op == CHANRCV); 198 199 /* 200 * Channel is empty (or unbuffered) - copy directly. 201 */ 202 if(s && r && c->nbuf == 0){ 203 amove(r->v, s->v, c->elemsize); 204 return; 205 } 206 207 /* 208 * Otherwise it's always okay to receive and then send. 209 */ 210 if(r){ 211 cp = c->buf + c->off*c->elemsize; 212 amove(r->v, cp, c->elemsize); 213 --c->nbuf; 214 if(++c->off == c->bufsize) 215 c->off = 0; 216 } 217 if(s){ 218 cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize; 219 amove(cp, s->v, c->elemsize); 220 ++c->nbuf; 221 } 222 } 223 224 static void 225 altexec(Alt *a) 226 { 227 int i; 228 _Altarray *ar; 229 Alt *other; 230 Channel *c; 231 232 c = a->c; 233 ar = chanarray(c, otherop(a->op)); 234 if(ar && ar->n){ 235 i = rand()%ar->n; 236 other = ar->a[i]; 237 altcopy(a, other); 238 altalldequeue(other->thread->alt); 239 other->thread->alt = other; 240 _threadready(other->thread); 241 }else 242 altcopy(a, nil); 243 } 244 245 #define dbgalt 0 246 int 247 chanalt(Alt *a) 248 { 249 int i, j, ncan, n, canblock; 250 Channel *c; 251 _Thread *t; 252 253 needstack(512); 254 for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++) 255 ; 256 n = i; 257 canblock = a[i].op == CHANEND; 258 259 t = proc()->thread; 260 for(i=0; i<n; i++) 261 a[i].thread = t; 262 t->alt = a; 263 qlock(&chanlock); 264 if(dbgalt) print("alt "); 265 ncan = 0; 266 for(i=0; i<n; i++){ 267 c = a[i].c; 268 if(dbgalt) print(" %c:", "esrnb"[a[i].op]); 269 if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c); 270 if(altcanexec(&a[i])){ 271 if(dbgalt) print("*"); 272 ncan++; 273 } 274 } 275 if(ncan){ 276 j = rand()%ncan; 277 for(i=0; i<n; i++){ 278 if(altcanexec(&a[i])){ 279 if(j-- == 0){ 280 if(dbgalt){ 281 c = a[i].c; 282 print(" => %c:", "esrnb"[a[i].op]); 283 if(c->name) print("%s", c->name); else print("%p", c); 284 print("\n"); 285 } 286 altexec(&a[i]); 287 qunlock(&chanlock); 288 return i; 289 } 290 } 291 } 292 } 293 if(dbgalt)print("\n"); 294 295 if(!canblock){ 296 qunlock(&chanlock); 297 return -1; 298 } 299 300 for(i=0; i<n; i++){ 301 if(a[i].op != CHANNOP) 302 altqueue(&a[i]); 303 } 304 qunlock(&chanlock); 305 306 _threadswitch(); 307 308 /* 309 * the guy who ran the op took care of dequeueing us 310 * and then set t->alt to the one that was executed. 311 */ 312 if(t->alt < a || t->alt >= a+n) 313 sysfatal("channel bad alt"); 314 return t->alt - a; 315 } 316 317 static int 318 _chanop(Channel *c, int op, void *p, int canblock) 319 { 320 Alt a[2]; 321 322 a[0].c = c; 323 a[0].op = op; 324 a[0].v = p; 325 a[1].op = canblock ? CHANEND : CHANNOBLK; 326 if(chanalt(a) < 0) 327 return -1; 328 return 1; 329 } 330 331 int 332 chansend(Channel *c, void *v) 333 { 334 return _chanop(c, CHANSND, v, 1); 335 } 336 337 int 338 channbsend(Channel *c, void *v) 339 { 340 return _chanop(c, CHANSND, v, 0); 341 } 342 343 int 344 chanrecv(Channel *c, void *v) 345 { 346 return _chanop(c, CHANRCV, v, 1); 347 } 348 349 int 350 channbrecv(Channel *c, void *v) 351 { 352 return _chanop(c, CHANRCV, v, 0); 353 } 354 355 int 356 chansendp(Channel *c, void *v) 357 { 358 return _chanop(c, CHANSND, (void*)&v, 1); 359 } 360 361 void* 362 chanrecvp(Channel *c) 363 { 364 void *v; 365 366 if(_chanop(c, CHANRCV, (void*)&v, 1) > 0) 367 return v; 368 return nil; 369 } 370 371 int 372 channbsendp(Channel *c, void *v) 373 { 374 return _chanop(c, CHANSND, (void*)&v, 0); 375 } 376 377 void* 378 channbrecvp(Channel *c) 379 { 380 void *v; 381 382 if(_chanop(c, CHANRCV, (void*)&v, 0) > 0) 383 return v; 384 return nil; 385 } 386 387 int 388 chansendul(Channel *c, ulong val) 389 { 390 return _chanop(c, CHANSND, &val, 1); 391 } 392 393 ulong 394 chanrecvul(Channel *c) 395 { 396 ulong val; 397 398 if(_chanop(c, CHANRCV, &val, 1) > 0) 399 return val; 400 return 0; 401 } 402 403 int 404 channbsendul(Channel *c, ulong val) 405 { 406 return _chanop(c, CHANSND, &val, 0); 407 } 408 409 ulong 410 channbrecvul(Channel *c) 411 { 412 ulong val; 413 414 if(_chanop(c, CHANRCV, &val, 0) > 0) 415 return val; 416 return 0; 417 }