client.c (9434B)
1 /* 2 * Sun RPC client. 3 */ 4 #include <u.h> 5 #include <libc.h> 6 #include <thread.h> 7 #include <sunrpc.h> 8 9 typedef struct Out Out; 10 struct Out 11 { 12 char err[ERRMAX]; /* error string */ 13 Channel *creply; /* send to finish rpc */ 14 uchar *p; /* pending request packet */ 15 int n; /* size of request */ 16 ulong tag; /* flush tag of pending request */ 17 ulong xid; /* xid of pending request */ 18 ulong st; /* first send time */ 19 ulong t; /* resend time */ 20 int nresend; /* number of resends */ 21 SunRpc rpc; /* response rpc */ 22 }; 23 24 static void 25 udpThread(void *v) 26 { 27 uchar *p, *buf; 28 Ioproc *io; 29 int n; 30 SunClient *cli; 31 enum { BufSize = 65536 }; 32 33 cli = v; 34 buf = emalloc(BufSize); 35 io = ioproc(); 36 p = nil; 37 for(;;){ 38 n = ioread(io, cli->fd, buf, BufSize); 39 if(n <= 0) 40 break; 41 p = emalloc(4+n); 42 memmove(p+4, buf, n); 43 p[0] = n>>24; 44 p[1] = n>>16; 45 p[2] = n>>8; 46 p[3] = n; 47 if(sendp(cli->readchan, p) == 0) 48 break; 49 p = nil; 50 } 51 free(p); 52 closeioproc(io); 53 while(send(cli->dying, nil) == -1) 54 ; 55 } 56 57 static void 58 netThread(void *v) 59 { 60 uchar *p, buf[4]; 61 Ioproc *io; 62 uint n, tot; 63 int done; 64 SunClient *cli; 65 66 cli = v; 67 io = ioproc(); 68 tot = 0; 69 p = nil; 70 for(;;){ 71 n = ioreadn(io, cli->fd, buf, 4); 72 if(n != 4) 73 break; 74 n = (buf[0]<<24)|(buf[1]<<16)|(buf[2]<<8)|buf[3]; 75 if(cli->chatty) 76 fprint(2, "%.8ux...", n); 77 done = n&0x80000000; 78 n &= ~0x80000000; 79 if(tot == 0){ 80 p = emalloc(4+n); 81 tot = 4; 82 }else 83 p = erealloc(p, tot+n); 84 if(ioreadn(io, cli->fd, p+tot, n) != n) 85 break; 86 tot += n; 87 if(done){ 88 p[0] = tot>>24; 89 p[1] = tot>>16; 90 p[2] = tot>>8; 91 p[3] = tot; 92 if(sendp(cli->readchan, p) == 0) 93 break; 94 p = nil; 95 tot = 0; 96 } 97 } 98 free(p); 99 closeioproc(io); 100 while(send(cli->dying, 0) == -1) 101 ; 102 } 103 104 static void 105 timerThread(void *v) 106 { 107 Ioproc *io; 108 SunClient *cli; 109 110 cli = v; 111 io = ioproc(); 112 for(;;){ 113 if(iosleep(io, 200) < 0) 114 break; 115 if(sendul(cli->timerchan, 0) == 0) 116 break; 117 } 118 closeioproc(io); 119 while(send(cli->dying, 0) == -1) 120 ; 121 } 122 123 static ulong 124 msec(void) 125 { 126 return nsec()/1000000; 127 } 128 129 static ulong 130 twait(ulong rtt, int nresend) 131 { 132 ulong t; 133 134 t = rtt; 135 if(nresend <= 1) 136 {} 137 else if(nresend <= 3) 138 t *= 2; 139 else if(nresend <= 18) 140 t <<= nresend-2; 141 else 142 t = 60*1000; 143 if(t > 60*1000) 144 t = 60*1000; 145 146 return t; 147 } 148 149 static void 150 rpcMuxThread(void *v) 151 { 152 uchar *buf, *p, *ep; 153 int i, n, nout, mout; 154 ulong t, xidgen, tag; 155 Alt a[5]; 156 Out *o, **out; 157 SunRpc rpc; 158 SunClient *cli; 159 160 cli = v; 161 mout = 16; 162 nout = 0; 163 out = emalloc(mout*sizeof(out[0])); 164 xidgen = truerand(); 165 166 a[0].op = CHANRCV; 167 a[0].c = cli->rpcchan; 168 a[0].v = &o; 169 a[1].op = CHANNOP; 170 a[1].c = cli->timerchan; 171 a[1].v = nil; 172 a[2].op = CHANRCV; 173 a[2].c = cli->flushchan; 174 a[2].v = &tag; 175 a[3].op = CHANRCV; 176 a[3].c = cli->readchan; 177 a[3].v = &buf; 178 a[4].op = CHANEND; 179 180 for(;;){ 181 switch(alt(a)){ 182 case 0: /* o = <-rpcchan */ 183 if(o == nil) 184 goto Done; 185 cli->nsend++; 186 /* set xid */ 187 o->xid = ++xidgen; 188 if(cli->needcount) 189 p = o->p+4; 190 else 191 p = o->p; 192 p[0] = xidgen>>24; 193 p[1] = xidgen>>16; 194 p[2] = xidgen>>8; 195 p[3] = xidgen; 196 if(write(cli->fd, o->p, o->n) != o->n){ 197 free(o->p); 198 o->p = nil; 199 snprint(o->err, sizeof o->err, "write: %r"); 200 sendp(o->creply, 0); 201 break; 202 } 203 if(nout >= mout){ 204 mout *= 2; 205 out = erealloc(out, mout*sizeof(out[0])); 206 } 207 o->st = msec(); 208 o->nresend = 0; 209 o->t = o->st + twait(cli->rtt.avg, 0); 210 if(cli->chatty) fprint(2, "send %lux %lud %lud\n", o->xid, o->st, o->t); 211 out[nout++] = o; 212 a[1].op = CHANRCV; 213 break; 214 215 case 1: /* <-timerchan */ 216 t = msec(); 217 for(i=0; i<nout; i++){ 218 o = out[i]; 219 if((int)(t - o->t) > 0){ 220 if(cli->chatty) fprint(2, "resend %lux %lud %lud\n", o->xid, t, o->t); 221 if(cli->maxwait && t - o->st >= cli->maxwait){ 222 free(o->p); 223 o->p = nil; 224 strcpy(o->err, "timeout"); 225 sendp(o->creply, 0); 226 out[i--] = out[--nout]; 227 continue; 228 } 229 cli->nresend++; 230 o->nresend++; 231 o->t = t + twait(cli->rtt.avg, o->nresend); 232 if(write(cli->fd, o->p, o->n) != o->n){ 233 free(o->p); 234 o->p = nil; 235 snprint(o->err, sizeof o->err, "rewrite: %r"); 236 sendp(o->creply, 0); 237 out[i--] = out[--nout]; 238 continue; 239 } 240 } 241 } 242 /* stop ticking if no work; rpcchan will turn it back on */ 243 if(nout == 0) 244 a[1].op = CHANNOP; 245 break; 246 247 case 2: /* tag = <-flushchan */ 248 for(i=0; i<nout; i++){ 249 o = out[i]; 250 if(o->tag == tag){ 251 out[i--] = out[--nout]; 252 strcpy(o->err, "flushed"); 253 free(o->p); 254 o->p = nil; 255 sendp(o->creply, 0); 256 } 257 } 258 break; 259 260 case 3: /* buf = <-readchan */ 261 p = buf; 262 n = (p[0]<<24)|(p[1]<<16)|(p[2]<<8)|p[3]; 263 p += 4; 264 ep = p+n; 265 if(sunrpcunpack(p, ep, &p, &rpc) != SunSuccess){ 266 fprint(2, "%s: in: %.*H unpack failed\n", argv0, n, buf+4); 267 free(buf); 268 break; 269 } 270 if(cli->chatty) 271 fprint(2, "in: %B\n", &rpc); 272 if(rpc.iscall){ 273 fprint(2, "did not get reply\n"); 274 free(buf); 275 break; 276 } 277 o = nil; 278 for(i=0; i<nout; i++){ 279 o = out[i]; 280 if(o->xid == rpc.xid) 281 break; 282 } 283 if(i==nout){ 284 if(cli->chatty) fprint(2, "did not find waiting request\n"); 285 free(buf); 286 break; 287 } 288 out[i] = out[--nout]; 289 free(o->p); 290 o->p = nil; 291 o->rpc = rpc; 292 if(rpc.status == SunSuccess){ 293 o->p = buf; 294 }else{ 295 o->p = nil; 296 free(buf); 297 sunerrstr(rpc.status); 298 rerrstr(o->err, sizeof o->err); 299 } 300 sendp(o->creply, 0); 301 break; 302 } 303 } 304 Done: 305 free(out); 306 sendp(cli->dying, 0); 307 } 308 309 SunClient* 310 sundial(char *address) 311 { 312 int fd; 313 SunClient *cli; 314 315 if((fd = dial(address, 0, 0, 0)) < 0) 316 return nil; 317 318 cli = emalloc(sizeof(SunClient)); 319 cli->fd = fd; 320 cli->maxwait = 15000; 321 cli->rtt.avg = 1000; 322 cli->dying = chancreate(sizeof(void*), 0); 323 cli->rpcchan = chancreate(sizeof(Out*), 0); 324 cli->timerchan = chancreate(sizeof(ulong), 0); 325 cli->flushchan = chancreate(sizeof(ulong), 0); 326 cli->readchan = chancreate(sizeof(uchar*), 0); 327 if(strstr(address, "udp!")){ 328 cli->needcount = 0; 329 cli->nettid = threadcreate(udpThread, cli, SunStackSize); 330 cli->timertid = threadcreate(timerThread, cli, SunStackSize); 331 }else{ 332 cli->needcount = 1; 333 cli->nettid = threadcreate(netThread, cli, SunStackSize); 334 /* assume reliable: don't need timer */ 335 /* BUG: netThread should know how to redial */ 336 } 337 threadcreate(rpcMuxThread, cli, SunStackSize); 338 339 return cli; 340 } 341 342 void 343 sunclientclose(SunClient *cli) 344 { 345 int n; 346 347 /* 348 * Threadints get you out of any stuck system calls 349 * or thread rendezvouses, but do nothing if the thread 350 * is in the ready state. Keep interrupting until it takes. 351 */ 352 n = 0; 353 if(!cli->timertid) 354 n++; 355 while(n < 2){ 356 /* 357 threadint(cli->nettid); 358 if(cli->timertid) 359 threadint(cli->timertid); 360 */ 361 362 yield(); 363 while(nbrecv(cli->dying, nil) == 1) 364 n++; 365 } 366 367 sendp(cli->rpcchan, 0); 368 recvp(cli->dying); 369 370 /* everyone's gone: clean up */ 371 close(cli->fd); 372 chanfree(cli->flushchan); 373 chanfree(cli->readchan); 374 chanfree(cli->timerchan); 375 free(cli); 376 } 377 378 void 379 sunclientflushrpc(SunClient *cli, ulong tag) 380 { 381 sendul(cli->flushchan, tag); 382 } 383 384 void 385 sunclientprog(SunClient *cli, SunProg *p) 386 { 387 if(cli->nprog%16 == 0) 388 cli->prog = erealloc(cli->prog, (cli->nprog+16)*sizeof(cli->prog[0])); 389 cli->prog[cli->nprog++] = p; 390 } 391 392 int 393 sunclientrpc(SunClient *cli, ulong tag, SunCall *tx, SunCall *rx, uchar **tofree) 394 { 395 uchar *bp, *p, *ep; 396 int i, n1, n2, n, nn; 397 Out o; 398 SunProg *prog; 399 SunStatus ok; 400 401 for(i=0; i<cli->nprog; i++) 402 if(cli->prog[i]->prog == tx->rpc.prog && cli->prog[i]->vers == tx->rpc.vers) 403 break; 404 if(i==cli->nprog){ 405 werrstr("unknown sun rpc program %d version %d", tx->rpc.prog, tx->rpc.vers); 406 return -1; 407 } 408 prog = cli->prog[i]; 409 410 if(cli->chatty){ 411 fprint(2, "out: %B\n", &tx->rpc); 412 fprint(2, "\t%C\n", tx); 413 } 414 415 n1 = sunrpcsize(&tx->rpc); 416 n2 = suncallsize(prog, tx); 417 418 n = n1+n2; 419 if(cli->needcount) 420 n += 4; 421 422 /* 423 * The dance with 100 is to leave some padding in case 424 * suncallsize is slightly underestimating. If this happens, 425 * the pack will succeed and then we can give a good size 426 * mismatch error below. Otherwise the pack fails with 427 * garbage args, which is less helpful. 428 */ 429 bp = emalloc(n+100); 430 ep = bp+n+100; 431 p = bp; 432 if(cli->needcount){ 433 nn = n-4; 434 p[0] = (nn>>24)|0x80; 435 p[1] = nn>>16; 436 p[2] = nn>>8; 437 p[3] = nn; 438 p += 4; 439 } 440 if((ok = sunrpcpack(p, ep, &p, &tx->rpc)) != SunSuccess 441 || (ok = suncallpack(prog, p, ep, &p, tx)) != SunSuccess){ 442 sunerrstr(ok); 443 free(bp); 444 return -1; 445 } 446 ep -= 100; 447 if(p != ep){ 448 werrstr("rpc: packet size mismatch %d %ld %ld", n, ep-bp, p-bp); 449 free(bp); 450 return -1; 451 } 452 453 memset(&o, 0, sizeof o); 454 o.creply = chancreate(sizeof(void*), 0); 455 o.tag = tag; 456 o.p = bp; 457 o.n = n; 458 459 sendp(cli->rpcchan, &o); 460 recvp(o.creply); 461 chanfree(o.creply); 462 463 if(o.p == nil){ 464 werrstr("%s", o.err); 465 return -1; 466 } 467 468 p = o.rpc.data; 469 ep = p+o.rpc.ndata; 470 rx->rpc = o.rpc; 471 rx->rpc.proc = tx->rpc.proc; 472 rx->rpc.prog = tx->rpc.prog; 473 rx->rpc.vers = tx->rpc.vers; 474 rx->type = (rx->rpc.proc<<1)|1; 475 if(rx->rpc.status != SunSuccess){ 476 sunerrstr(rx->rpc.status); 477 werrstr("unpack: %r"); 478 free(o.p); 479 return -1; 480 } 481 482 if((ok = suncallunpack(prog, p, ep, &p, rx)) != SunSuccess){ 483 sunerrstr(ok); 484 werrstr("unpack: %r"); 485 free(o.p); 486 return -1; 487 } 488 489 if(cli->chatty){ 490 fprint(2, "in: %B\n", &rx->rpc); 491 fprint(2, "in:\t%C\n", rx); 492 } 493 494 if(tofree) 495 *tofree = o.p; 496 else 497 free(o.p); 498 499 return 0; 500 }