plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

channel.c (6729B)


      1 #include "threadimpl.h"
      2 
/*
 * chanlock is the single lock protecting all channel state in
 * this file: chanalt holds it while it tests, executes, or
 * queues the entries of an alt.
 *
 * One can go through a lot of effort to avoid this global lock.
 * You have to put locks in all the channels and all the Alt
 * structures.  At the beginning of an alt you have to lock all
 * the channels, but then to try to actually exec an op you
 * have to lock the other guy's alt structure, so that other
 * people aren't trying to use him in some other op at the
 * same time.
 *
 * For Plan 9 apps, it's just not worth the extra effort.
 */
static QLock chanlock;
     15 
     16 Channel*
     17 chancreate(int elemsize, int bufsize)
     18 {
     19 	Channel *c;
     20 
     21 	c = malloc(sizeof *c+bufsize*elemsize);
     22 	if(c == nil)
     23 		sysfatal("chancreate malloc: %r");
     24 	memset(c, 0, sizeof *c);
     25 	c->elemsize = elemsize;
     26 	c->bufsize = bufsize;
     27 	c->nbuf = 0;
     28 	c->buf = (uchar*)(c+1);
     29 	return c;
     30 }
     31 
     32 void
     33 chansetname(Channel *c, char *fmt, ...)
     34 {
     35 	char *name;
     36 	va_list arg;
     37 
     38 	va_start(arg, fmt);
     39 	name = vsmprint(fmt, arg);
     40 	va_end(arg);
     41 	free(c->name);
     42 	c->name = name;
     43 }
     44 
     45 /* bug - work out races */
     46 void
     47 chanfree(Channel *c)
     48 {
     49 	if(c == nil)
     50 		return;
     51 	free(c->name);
     52 	free(c->arecv.a);
     53 	free(c->asend.a);
     54 	free(c);
     55 }
     56 
     57 static void
     58 addarray(_Altarray *a, Alt *alt)
     59 {
     60 	if(a->n == a->m){
     61 		a->m += 16;
     62 		a->a = realloc(a->a, a->m*sizeof a->a[0]);
     63 	}
     64 	a->a[a->n++] = alt;
     65 }
     66 
     67 static void
     68 delarray(_Altarray *a, int i)
     69 {
     70 	--a->n;
     71 	a->a[i] = a->a[a->n];
     72 }
     73 
/*
 * otherop turns CHANSND into CHANRCV and CHANRCV into CHANSND.
 * It doesn't really work for things other than CHANSND and
 * CHANRCV, but it is only used as the arg to chanarray, which
 * returns nil for any op that is not CHANSND or CHANRCV.
 */
#define otherop(op)	(CHANSND+CHANRCV-(op))
     79 
     80 static _Altarray*
     81 chanarray(Channel *c, uint op)
     82 {
     83 	switch(op){
     84 	default:
     85 		return nil;
     86 	case CHANSND:
     87 		return &c->asend;
     88 	case CHANRCV:
     89 		return &c->arecv;
     90 	}
     91 }
     92 
     93 static int
     94 altcanexec(Alt *a)
     95 {
     96 	_Altarray *ar;
     97 	Channel *c;
     98 
     99 	if(a->op == CHANNOP || (c=a->c) == nil)
    100 		return 0;
    101 	if(c->bufsize == 0){
    102 		ar = chanarray(c, otherop(a->op));
    103 		return ar && ar->n;
    104 	}else{
    105 		switch(a->op){
    106 		default:
    107 			return 0;
    108 		case CHANSND:
    109 			return c->nbuf < c->bufsize;
    110 		case CHANRCV:
    111 			return c->nbuf > 0;
    112 		}
    113 	}
    114 }
    115 
    116 static void
    117 altqueue(Alt *a)
    118 {
    119 	_Altarray *ar;
    120 
    121 	if(a->c == nil)
    122 		return;
    123 	ar = chanarray(a->c, a->op);
    124 	addarray(ar, a);
    125 }
    126 
    127 static void
    128 altdequeue(Alt *a)
    129 {
    130 	int i;
    131 	_Altarray *ar;
    132 
    133 	ar = chanarray(a->c, a->op);
    134 	if(ar == nil){
    135 		fprint(2, "bad use of altdequeue op=%d\n", a->op);
    136 		abort();
    137 	}
    138 
    139 	for(i=0; i<ar->n; i++)
    140 		if(ar->a[i] == a){
    141 			delarray(ar, i);
    142 			return;
    143 		}
    144 	fprint(2, "cannot find self in altdequeue\n");
    145 	abort();
    146 }
    147 
    148 static void
    149 altalldequeue(Alt *a)
    150 {
    151 	int i;
    152 
    153 	for(i=0; a[i].op!=CHANEND && a[i].op!=CHANNOBLK; i++)
    154 		if(a[i].op != CHANNOP)
    155 			altdequeue(&a[i]);
    156 }
    157 
    158 static void
    159 amove(void *dst, void *src, uint n)
    160 {
    161 	if(dst){
    162 		if(src == nil)
    163 			memset(dst, 0, n);
    164 		else
    165 			memmove(dst, src, n);
    166 	}
    167 }
    168 
    169 /*
    170  * Actually move the data around.  There are up to three
    171  * players: the sender, the receiver, and the channel itself.
    172  * If the channel is unbuffered or the buffer is empty,
    173  * data goes from sender to receiver.  If the channel is full,
    174  * the receiver removes some from the channel and the sender
    175  * gets to put some in.
    176  */
    177 static void
    178 altcopy(Alt *s, Alt *r)
    179 {
    180 	Alt *t;
    181 	Channel *c;
    182 	uchar *cp;
    183 
    184 	/*
    185 	 * Work out who is sender and who is receiver
    186 	 */
    187 	if(s == nil && r == nil)
    188 		return;
    189 	assert(s != nil);
    190 	c = s->c;
    191 	if(s->op == CHANRCV){
    192 		t = s;
    193 		s = r;
    194 		r = t;
    195 	}
    196 	assert(s==nil || s->op == CHANSND);
    197 	assert(r==nil || r->op == CHANRCV);
    198 
    199 	/*
    200 	 * Channel is empty (or unbuffered) - copy directly.
    201 	 */
    202 	if(s && r && c->nbuf == 0){
    203 		amove(r->v, s->v, c->elemsize);
    204 		return;
    205 	}
    206 
    207 	/*
    208 	 * Otherwise it's always okay to receive and then send.
    209 	 */
    210 	if(r){
    211 		cp = c->buf + c->off*c->elemsize;
    212 		amove(r->v, cp, c->elemsize);
    213 		--c->nbuf;
    214 		if(++c->off == c->bufsize)
    215 			c->off = 0;
    216 	}
    217 	if(s){
    218 		cp = c->buf + (c->off+c->nbuf)%c->bufsize*c->elemsize;
    219 		amove(cp, s->v, c->elemsize);
    220 		++c->nbuf;
    221 	}
    222 }
    223 
    224 static void
    225 altexec(Alt *a)
    226 {
    227 	int i;
    228 	_Altarray *ar;
    229 	Alt *other;
    230 	Channel *c;
    231 
    232 	c = a->c;
    233 	ar = chanarray(c, otherop(a->op));
    234 	if(ar && ar->n){
    235 		i = rand()%ar->n;
    236 		other = ar->a[i];
    237 		altcopy(a, other);
    238 		altalldequeue(other->thread->alt);
    239 		other->thread->alt = other;
    240 		_threadready(other->thread);
    241 	}else
    242 		altcopy(a, nil);
    243 }
    244 
/* Set dbgalt nonzero to print a trace of every alt: the entries considered and the one chosen. */
#define dbgalt 0
/*
 * Perform exactly one of the channel operations listed in a.
 * The list is terminated by an entry whose op is CHANEND or
 * CHANNOBLK.  If several entries can proceed, one is chosen at
 * random.  If none can proceed, a CHANNOBLK-terminated list
 * makes chanalt return -1 at once, while a CHANEND-terminated
 * list queues every non-CHANNOP entry and switches away from
 * the calling thread until another thread executes one of them.
 *
 * Returns the index in a of the entry that was executed, or -1
 * as described above.
 */
int
chanalt(Alt *a)
{
	int i, j, ncan, n, canblock;
	Channel *c;
	_Thread *t;

	/* NOTE(review): presumably ensures stack headroom before proceeding - confirm against needstack */
	needstack(512);
	/* count the entries and note which terminator ends the list */
	for(i=0; a[i].op != CHANEND && a[i].op != CHANNOBLK; i++)
		;
	n = i;
	canblock = a[i].op == CHANEND;

	/* tag every entry with the calling thread so altexec can find and wake it */
	t = proc()->thread;
	for(i=0; i<n; i++)
		a[i].thread = t;
	t->alt = a;
	qlock(&chanlock);
if(dbgalt) print("alt ");
	/* first pass: count how many entries could run right now */
	ncan = 0;
	for(i=0; i<n; i++){
		c = a[i].c;
if(dbgalt) print(" %c:", "esrnb"[a[i].op]);
if(dbgalt) if(c->name) print("%s", c->name); else print("%p", c);
		if(altcanexec(&a[i])){
if(dbgalt) print("*");
			ncan++;
		}
	}
	/* second pass: run the j'th runnable entry, j chosen at random */
	if(ncan){
		j = rand()%ncan;
		for(i=0; i<n; i++){
			if(altcanexec(&a[i])){
				if(j-- == 0){
if(dbgalt){
c = a[i].c;
print(" => %c:", "esrnb"[a[i].op]);
if(c->name) print("%s", c->name); else print("%p", c);
print("\n");
}
					altexec(&a[i]);
					qunlock(&chanlock);
					return i;
				}
			}
		}
	}
if(dbgalt)print("\n");

	/* nothing can run: a non-blocking alt gives up here */
	if(!canblock){
		qunlock(&chanlock);
		return -1;
	}

	/* queue ourselves on every channel involved, then wait */
	for(i=0; i<n; i++){
		if(a[i].op != CHANNOP)
			altqueue(&a[i]);
	}
	qunlock(&chanlock);

	_threadswitch();

	/*
	 * the guy who ran the op took care of dequeueing us
	 * and then set t->alt to the one that was executed.
	 */
	if(t->alt < a || t->alt >= a+n)
		sysfatal("channel bad alt");
	return t->alt - a;
}
    316 
    317 static int
    318 _chanop(Channel *c, int op, void *p, int canblock)
    319 {
    320 	Alt a[2];
    321 
    322 	a[0].c = c;
    323 	a[0].op = op;
    324 	a[0].v = p;
    325 	a[1].op = canblock ? CHANEND : CHANNOBLK;
    326 	if(chanalt(a) < 0)
    327 		return -1;
    328 	return 1;
    329 }
    330 
    331 int
    332 chansend(Channel *c, void *v)
    333 {
    334 	return _chanop(c, CHANSND, v, 1);
    335 }
    336 
    337 int
    338 channbsend(Channel *c, void *v)
    339 {
    340 	return _chanop(c, CHANSND, v, 0);
    341 }
    342 
    343 int
    344 chanrecv(Channel *c, void *v)
    345 {
    346 	return _chanop(c, CHANRCV, v, 1);
    347 }
    348 
    349 int
    350 channbrecv(Channel *c, void *v)
    351 {
    352 	return _chanop(c, CHANRCV, v, 0);
    353 }
    354 
    355 int
    356 chansendp(Channel *c, void *v)
    357 {
    358 	return _chanop(c, CHANSND, (void*)&v, 1);
    359 }
    360 
    361 void*
    362 chanrecvp(Channel *c)
    363 {
    364 	void *v;
    365 
    366 	if(_chanop(c, CHANRCV, (void*)&v, 1) > 0)
    367 		return v;
    368 	return nil;
    369 }
    370 
    371 int
    372 channbsendp(Channel *c, void *v)
    373 {
    374 	return _chanop(c, CHANSND, (void*)&v, 0);
    375 }
    376 
    377 void*
    378 channbrecvp(Channel *c)
    379 {
    380 	void *v;
    381 
    382 	if(_chanop(c, CHANRCV, (void*)&v, 0) > 0)
    383 		return v;
    384 	return nil;
    385 }
    386 
    387 int
    388 chansendul(Channel *c, ulong val)
    389 {
    390 	return _chanop(c, CHANSND, &val, 1);
    391 }
    392 
    393 ulong
    394 chanrecvul(Channel *c)
    395 {
    396 	ulong val;
    397 
    398 	if(_chanop(c, CHANRCV, &val, 1) > 0)
    399 		return val;
    400 	return 0;
    401 }
    402 
    403 int
    404 channbsendul(Channel *c, ulong val)
    405 {
    406 	return _chanop(c, CHANSND, &val, 0);
    407 }
    408 
    409 ulong
    410 channbrecvul(Channel *c)
    411 {
    412 	ulong val;
    413 
    414 	if(_chanop(c, CHANRCV, &val, 0) > 0)
    415 		return val;
    416 	return 0;
    417 }