plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

client.c (9434B)


      1 /*
      2  * Sun RPC client.
      3  */
      4 #include <u.h>
      5 #include <libc.h>
      6 #include <thread.h>
      7 #include <sunrpc.h>
      8 
      9 typedef struct Out Out;
     10 struct Out
     11 {
     12 	char err[ERRMAX];	/* error string */
     13 	Channel *creply;	/* send to finish rpc */
     14 	uchar *p;			/* pending request packet */
     15 	int n;				/* size of request */
     16 	ulong tag;			/* flush tag of pending request */
     17 	ulong xid;			/* xid of pending request */
     18 	ulong st;			/* first send time */
     19 	ulong t;			/* resend time */
     20 	int nresend;		/* number of resends */
     21 	SunRpc rpc;		/* response rpc */
     22 };
     23 
     24 static void
     25 udpThread(void *v)
     26 {
     27 	uchar *p, *buf;
     28 	Ioproc *io;
     29 	int n;
     30 	SunClient *cli;
     31 	enum { BufSize = 65536 };
     32 
     33 	cli = v;
     34 	buf = emalloc(BufSize);
     35 	io = ioproc();
     36 	p = nil;
     37 	for(;;){
     38 		n = ioread(io, cli->fd, buf, BufSize);
     39 		if(n <= 0)
     40 			break;
     41 		p = emalloc(4+n);
     42 		memmove(p+4, buf, n);
     43 		p[0] = n>>24;
     44 		p[1] = n>>16;
     45 		p[2] = n>>8;
     46 		p[3] = n;
     47 		if(sendp(cli->readchan, p) == 0)
     48 			break;
     49 		p = nil;
     50 	}
     51 	free(p);
     52 	closeioproc(io);
     53 	while(send(cli->dying, nil) == -1)
     54 		;
     55 }
     56 
     57 static void
     58 netThread(void *v)
     59 {
     60 	uchar *p, buf[4];
     61 	Ioproc *io;
     62 	uint n, tot;
     63 	int done;
     64 	SunClient *cli;
     65 
     66 	cli = v;
     67 	io = ioproc();
     68 	tot = 0;
     69 	p = nil;
     70 	for(;;){
     71 		n = ioreadn(io, cli->fd, buf, 4);
     72 		if(n != 4)
     73 			break;
     74 		n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3];
     75 		if(cli->chatty)
     76 			fprint(2, "%.8ux...", n);
     77 		done = n&0x80000000;
     78 		n &= ~0x80000000;
     79 		if(tot == 0){
     80 			p = emalloc(4+n);
     81 			tot = 4;
     82 		}else
     83 			p = erealloc(p, tot+n);
     84 		if(ioreadn(io, cli->fd, p+tot, n) != n)
     85 			break;
     86 		tot += n;
     87 		if(done){
     88 			p[0] = tot>>24;
     89 			p[1] = tot>>16;
     90 			p[2] = tot>>8;
     91 			p[3] = tot;
     92 			if(sendp(cli->readchan, p) == 0)
     93 				break;
     94 			p = nil;
     95 			tot = 0;
     96 		}
     97 	}
     98 	free(p);
     99 	closeioproc(io);
    100 	while(send(cli->dying, 0) == -1)
    101 		;
    102 }
    103 
    104 static void
    105 timerThread(void *v)
    106 {
    107 	Ioproc *io;
    108 	SunClient *cli;
    109 
    110 	cli = v;
    111 	io = ioproc();
    112 	for(;;){
    113 		if(iosleep(io, 200) < 0)
    114 			break;
    115 		if(sendul(cli->timerchan, 0) == 0)
    116 			break;
    117 	}
    118 	closeioproc(io);
    119 	while(send(cli->dying, 0) == -1)
    120 		;
    121 }
    122 
    123 static ulong
    124 msec(void)
    125 {
    126 	return nsec()/1000000;
    127 }
    128 
    129 static ulong
    130 twait(ulong rtt, int nresend)
    131 {
    132 	ulong t;
    133 
    134 	t = rtt;
    135 	if(nresend <= 1)
    136 		{}
    137 	else if(nresend <= 3)
    138 		t *= 2;
    139 	else if(nresend <= 18)
    140 		t <<= nresend-2;
    141 	else
    142 		t = 60*1000;
    143 	if(t > 60*1000)
    144 		t = 60*1000;
    145 
    146 	return t;
    147 }
    148 
    149 static void
    150 rpcMuxThread(void *v)
    151 {
    152 	uchar *buf, *p, *ep;
    153 	int i, n, nout, mout;
    154 	ulong t, xidgen, tag;
    155 	Alt a[5];
    156 	Out *o, **out;
    157 	SunRpc rpc;
    158 	SunClient *cli;
    159 
    160 	cli = v;
    161 	mout = 16;
    162 	nout = 0;
    163 	out = emalloc(mout*sizeof(out[0]));
    164 	xidgen = truerand();
    165 
    166 	a[0].op = CHANRCV;
    167 	a[0].c = cli->rpcchan;
    168 	a[0].v = &o;
    169 	a[1].op = CHANNOP;
    170 	a[1].c = cli->timerchan;
    171 	a[1].v = nil;
    172 	a[2].op = CHANRCV;
    173 	a[2].c = cli->flushchan;
    174 	a[2].v = &tag;
    175 	a[3].op = CHANRCV;
    176 	a[3].c = cli->readchan;
    177 	a[3].v = &buf;
    178 	a[4].op = CHANEND;
    179 
    180 	for(;;){
    181 		switch(alt(a)){
    182 		case 0:	/* o = <-rpcchan */
    183 			if(o == nil)
    184 				goto Done;
    185 			cli->nsend++;
    186 			/* set xid */
    187 			o->xid = ++xidgen;
    188 			if(cli->needcount)
    189 				p = o->p+4;
    190 			else
    191 				p = o->p;
    192 			p[0] = xidgen>>24;
    193 			p[1] = xidgen>>16;
    194 			p[2] = xidgen>>8;
    195 			p[3] = xidgen;
    196 			if(write(cli->fd, o->p, o->n) != o->n){
    197 				free(o->p);
    198 				o->p = nil;
    199 				snprint(o->err, sizeof o->err, "write: %r");
    200 				sendp(o->creply, 0);
    201 				break;
    202 			}
    203 			if(nout >= mout){
    204 				mout *= 2;
    205 				out = erealloc(out, mout*sizeof(out[0]));
    206 			}
    207 			o->st = msec();
    208 			o->nresend = 0;
    209 			o->t = o->st + twait(cli->rtt.avg, 0);
    210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t);
    211 			out[nout++] = o;
    212 			a[1].op = CHANRCV;
    213 			break;
    214 
    215 		case 1:	/* <-timerchan */
    216 			t = msec();
    217 			for(i=0; i<nout; i++){
    218 				o = out[i];
    219 				if((int)(t - o->t) > 0){
    220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t);
    221 					if(cli->maxwait && t - o->st >= cli->maxwait){
    222 						free(o->p);
    223 						o->p = nil;
    224 						strcpy(o->err, "timeout");
    225 						sendp(o->creply, 0);
    226 						out[i--] = out[--nout];
    227 						continue;
    228 					}
    229 					cli->nresend++;
    230 					o->nresend++;
    231 					o->t = t + twait(cli->rtt.avg, o->nresend);
    232 					if(write(cli->fd, o->p, o->n) != o->n){
    233 						free(o->p);
    234 						o->p = nil;
    235 						snprint(o->err, sizeof o->err, "rewrite: %r");
    236 						sendp(o->creply, 0);
    237 						out[i--] = out[--nout];
    238 						continue;
    239 					}
    240 				}
    241 			}
    242 			/* stop ticking if no work; rpcchan will turn it back on */
    243 			if(nout == 0)
    244 				a[1].op = CHANNOP;
    245 			break;
    246 
    247 		case 2:	/* tag = <-flushchan */
    248 			for(i=0; i<nout; i++){
    249 				o = out[i];
    250 				if(o->tag == tag){
    251 					out[i--] = out[--nout];
    252 					strcpy(o->err, "flushed");
    253 					free(o->p);
    254 					o->p = nil;
    255 					sendp(o->creply, 0);
    256 				}
    257 			}
    258 			break;
    259 
    260 		case 3:	/* buf = <-readchan */
    261 			p = buf;
    262 			n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3];
    263 			p += 4;
    264 			ep = p+n;
    265 			if(sunrpcunpack(p, ep, &p, &rpc) != SunSuccess){
    266 				fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4);
    267 				free(buf);
    268 				break;
    269 			}
    270 			if(cli->chatty)
    271 				fprint(2, "in: %B\n", &rpc);
    272 			if(rpc.iscall){
    273 				fprint(2, "did not get reply\n");
    274 				free(buf);
    275 				break;
    276 			}
    277 			o = nil;
    278 			for(i=0; i<nout; i++){
    279 				o = out[i];
    280 				if(o->xid == rpc.xid)
    281 					break;
    282 			}
    283 			if(i==nout){
    284 				if(cli->chatty) fprint(2, "did not find waiting request\n");
    285 				free(buf);
    286 				break;
    287 			}
    288 			out[i] = out[--nout];
    289 			free(o->p);
    290 			o->p = nil;
    291 			o->rpc = rpc;
    292 			if(rpc.status == SunSuccess){
    293 				o->p = buf;
    294 			}else{
    295 				o->p = nil;
    296 				free(buf);
    297 				sunerrstr(rpc.status);
    298 				rerrstr(o->err, sizeof o->err);
    299 			}
    300 			sendp(o->creply, 0);
    301 			break;
    302 		}
    303 	}
    304 Done:
    305 	free(out);
    306 	sendp(cli->dying, 0);
    307 }
    308 
    309 SunClient*
    310 sundial(char *address)
    311 {
    312 	int fd;
    313 	SunClient *cli;
    314 
    315 	if((fd = dial(address, 0, 0, 0)) < 0)
    316 		return nil;
    317 
    318 	cli = emalloc(sizeof(SunClient));
    319 	cli->fd = fd;
    320 	cli->maxwait = 15000;
    321 	cli->rtt.avg = 1000;
    322 	cli->dying = chancreate(sizeof(void*), 0);
    323 	cli->rpcchan = chancreate(sizeof(Out*), 0);
    324 	cli->timerchan = chancreate(sizeof(ulong), 0);
    325 	cli->flushchan = chancreate(sizeof(ulong), 0);
    326 	cli->readchan = chancreate(sizeof(uchar*), 0);
    327 	if(strstr(address, "udp!")){
    328 		cli->needcount = 0;
    329 		cli->nettid = threadcreate(udpThread, cli, SunStackSize);
    330 		cli->timertid = threadcreate(timerThread, cli, SunStackSize);
    331 	}else{
    332 		cli->needcount = 1;
    333 		cli->nettid = threadcreate(netThread, cli, SunStackSize);
    334 		/* assume reliable: don't need timer */
    335 		/* BUG: netThread should know how to redial */
    336 	}
    337 	threadcreate(rpcMuxThread, cli, SunStackSize);
    338 
    339 	return cli;
    340 }
    341 
    342 void
    343 sunclientclose(SunClient *cli)
    344 {
    345 	int n;
    346 
    347 	/*
    348 	 * Threadints get you out of any stuck system calls
    349 	 * or thread rendezvouses, but do nothing if the thread
    350 	 * is in the ready state.  Keep interrupting until it takes.
    351 	 */
    352 	n = 0;
    353 	if(!cli->timertid)
    354 		n++;
    355 	while(n < 2){
    356 /*
    357 		threadint(cli->nettid);
    358 		if(cli->timertid)
    359 			threadint(cli->timertid);
    360 */
    361 
    362 		yield();
    363 		while(nbrecv(cli->dying, nil) == 1)
    364 			n++;
    365 	}
    366 
    367 	sendp(cli->rpcchan, 0);
    368 	recvp(cli->dying);
    369 
    370 	/* everyone's gone: clean up */
    371 	close(cli->fd);
    372 	chanfree(cli->flushchan);
    373 	chanfree(cli->readchan);
    374 	chanfree(cli->timerchan);
    375 	free(cli);
    376 }
    377 
    378 void
    379 sunclientflushrpc(SunClient *cli, ulong tag)
    380 {
    381 	sendul(cli->flushchan, tag);
    382 }
    383 
    384 void
    385 sunclientprog(SunClient *cli, SunProg *p)
    386 {
    387 	if(cli->nprog%16 == 0)
    388 		cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0]));
    389 	cli->prog[cli->nprog++] = p;
    390 }
    391 
    392 int
    393 sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree)
    394 {
    395 	uchar *bp, *p, *ep;
    396 	int i, n1, n2, n, nn;
    397 	Out o;
    398 	SunProg *prog;
    399 	SunStatus ok;
    400 
    401 	for(i=0; i<cli->nprog; i++)
    402 		if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers)
    403 			break;
    404 	if(i==cli->nprog){
    405 		werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers);
    406 		return -1;
    407 	}
    408 	prog = cli->prog[i];
    409 
    410 	if(cli->chatty){
    411 		fprint(2, "out: %B\n", &tx->rpc);
    412 		fprint(2, "\t%C\n", tx);
    413 	}
    414 
    415 	n1 = sunrpcsize(&tx->rpc);
    416 	n2 = suncallsize(prog, tx);
    417 
    418 	n = n1+n2;
    419 	if(cli->needcount)
    420 		n += 4;
    421 
    422 	/*
    423 	 * The dance with 100 is to leave some padding in case
    424 	 * suncallsize is slightly underestimating.  If this happens,
    425 	 * the pack will succeed and then we can give a good size
    426 	 * mismatch error below.  Otherwise the pack fails with
    427 	 * garbage args, which is less helpful.
    428 	 */
    429 	bp = emalloc(n+100);
    430 	ep = bp+n+100;
    431 	p = bp;
    432 	if(cli->needcount){
    433 		nn = n-4;
    434 		p[0] = (nn>>24)|0x80;
    435 		p[1] = nn>>16;
    436 		p[2] = nn>>8;
    437 		p[3] = nn;
    438 		p += 4;
    439 	}
    440 	if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess
    441 	|| (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){
    442 		sunerrstr(ok);
    443 		free(bp);
    444 		return -1;
    445 	}
    446 	ep -= 100;
    447 	if(p != ep){
    448 		werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp);
    449 		free(bp);
    450 		return -1;
    451 	}
    452 
    453 	memset(&o, 0, sizeof o);
    454 	o.creply = chancreate(sizeof(void*), 0);
    455 	o.tag = tag;
    456 	o.p = bp;
    457 	o.n = n;
    458 
    459 	sendp(cli->rpcchan, &o);
    460 	recvp(o.creply);
    461 	chanfree(o.creply);
    462 
    463 	if(o.p == nil){
    464 		werrstr("%s", o.err);
    465 		return -1;
    466 	}
    467 
    468 	p = o.rpc.data;
    469 	ep = p+o.rpc.ndata;
    470 	rx->rpc = o.rpc;
    471 	rx->rpc.proc = tx->rpc.proc;
    472 	rx->rpc.prog = tx->rpc.prog;
    473 	rx->rpc.vers = tx->rpc.vers;
    474 	rx->type = (rx->rpc.proc<<1)|1;
    475 	if(rx->rpc.status != SunSuccess){
    476 		sunerrstr(rx->rpc.status);
    477 		werrstr("unpack: %r");
    478 		free(o.p);
    479 		return -1;
    480 	}
    481 
    482 	if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){
    483 		sunerrstr(ok);
    484 		werrstr("unpack: %r");
    485 		free(o.p);
    486 		return -1;
    487 	}
    488 
    489 	if(cli->chatty){
    490 		fprint(2, "in: %B\n", &rx->rpc);
    491 		fprint(2, "in:\t%C\n", rx);
    492 	}
    493 
    494 	if(tofree)
    495 		*tofree = o.p;
    496 	else
    497 		free(o.p);
    498 
    499 	return 0;
    500 }