9proc.c (14612B)
1 #include "stdinc.h" 2 3 #include "9.h" 4 #include "dat.h" 5 #include "fns.h" 6 7 enum { 8 NConInit = 128, 9 NMsgInit = 384, 10 NMsgProcInit = 64, 11 NMsizeInit = 8192+IOHDRSZ, 12 }; 13 14 static struct { 15 QLock alock; /* alloc */ 16 Msg* ahead; 17 Rendez arendez; 18 19 int maxmsg; 20 int nmsg; 21 int nmsgstarve; 22 23 QLock rlock; /* read */ 24 Msg* rhead; 25 Msg* rtail; 26 Rendez rrendez; 27 28 int maxproc; 29 int nproc; 30 int nprocstarve; 31 32 u32int msize; /* immutable */ 33 } mbox; 34 35 static struct { 36 QLock alock; /* alloc */ 37 Con* ahead; 38 Rendez arendez; 39 40 RWLock clock; 41 Con* chead; 42 Con* ctail; 43 44 int maxcon; 45 int ncon; 46 int nconstarve; 47 48 u32int msize; 49 } cbox; 50 51 static void 52 conFree(Con* con) 53 { 54 assert(con->version == nil); 55 assert(con->mhead == nil); 56 assert(con->whead == nil); 57 assert(con->nfid == 0); 58 assert(con->state == ConMoribund); 59 60 if(con->fd >= 0){ 61 close(con->fd); 62 con->fd = -1; 63 } 64 con->state = ConDead; 65 con->aok = 0; 66 con->flags = 0; 67 con->isconsole = 0; 68 69 qlock(&cbox.alock); 70 if(con->cprev != nil) 71 con->cprev->cnext = con->cnext; 72 else 73 cbox.chead = con->cnext; 74 if(con->cnext != nil) 75 con->cnext->cprev = con->cprev; 76 else 77 cbox.ctail = con->cprev; 78 con->cprev = con->cnext = nil; 79 80 if(cbox.ncon > cbox.maxcon){ 81 if(con->name != nil) 82 vtfree(con->name); 83 vtfree(con->data); 84 vtfree(con); 85 cbox.ncon--; 86 qunlock(&cbox.alock); 87 return; 88 } 89 con->anext = cbox.ahead; 90 cbox.ahead = con; 91 if(con->anext == nil) 92 rwakeup(&cbox.arendez); 93 qunlock(&cbox.alock); 94 } 95 96 static void 97 msgFree(Msg* m) 98 { 99 assert(m->rwnext == nil); 100 assert(m->flush == nil); 101 102 qlock(&mbox.alock); 103 if(mbox.nmsg > mbox.maxmsg){ 104 vtfree(m->data); 105 vtfree(m); 106 mbox.nmsg--; 107 qunlock(&mbox.alock); 108 return; 109 } 110 m->anext = mbox.ahead; 111 mbox.ahead = m; 112 if(m->anext == nil) 113 rwakeup(&mbox.arendez); 114 qunlock(&mbox.alock); 115 } 116 117 static Msg* 118 msgAlloc(Con* con) 119 { 120 Msg *m; 121 122 qlock(&mbox.alock); 123 while(mbox.ahead == nil){ 124 if(mbox.nmsg >= mbox.maxmsg){ 125 mbox.nmsgstarve++; 126 rsleep(&mbox.arendez); 127 continue; 128 } 129 m = vtmallocz(sizeof(Msg)); 130 m->data = vtmalloc(mbox.msize); 131 m->msize = mbox.msize; 132 mbox.nmsg++; 133 mbox.ahead = m; 134 break; 135 } 136 m = mbox.ahead; 137 mbox.ahead = m->anext; 138 m->anext = nil; 139 qunlock(&mbox.alock); 140 141 m->con = con; 142 m->state = MsgR; 143 m->nowq = 0; 144 145 return m; 146 } 147 148 static void 149 msgMunlink(Msg* m) 150 { 151 Con *con; 152 153 con = m->con; 154 155 if(m->mprev != nil) 156 m->mprev->mnext = m->mnext; 157 else 158 con->mhead = m->mnext; 159 if(m->mnext != nil) 160 m->mnext->mprev = m->mprev; 161 else 162 con->mtail = m->mprev; 163 m->mprev = m->mnext = nil; 164 } 165 166 void 167 msgFlush(Msg* m) 168 { 169 Con *con; 170 Msg *flush, *old; 171 172 con = m->con; 173 174 if(Dflag) 175 fprint(2, "msgFlush %F\n", &m->t); 176 177 /* 178 * If this Tflush has been flushed, nothing to do. 179 * Look for the message to be flushed in the 180 * queue of all messages still on this connection. 181 * If it's not found must assume Elvis has already 182 * left the building and reply normally. 183 */ 184 qlock(&con->mlock); 185 if(m->state == MsgF){ 186 qunlock(&con->mlock); 187 return; 188 } 189 for(old = con->mhead; old != nil; old = old->mnext) 190 if(old->t.tag == m->t.oldtag) 191 break; 192 if(old == nil){ 193 if(Dflag) 194 fprint(2, "msgFlush: cannot find %d\n", m->t.oldtag); 195 qunlock(&con->mlock); 196 return; 197 } 198 199 if(Dflag) 200 fprint(2, "\tmsgFlush found %F\n", &old->t); 201 202 /* 203 * Found it. 204 * There are two cases where the old message can be 205 * truly flushed and no reply to the original message given. 206 * The first is when the old message is in MsgR state; no 207 * processing has been done yet and it is still on the read 208 * queue. The second is if old is a Tflush, which doesn't 209 * affect the server state. In both cases, put the old 210 * message into MsgF state and let MsgWrite toss it after 211 * pulling it off the queue. 212 */ 213 if(old->state == MsgR || old->t.type == Tflush){ 214 old->state = MsgF; 215 if(Dflag) 216 fprint(2, "msgFlush: change %d from MsgR to MsgF\n", 217 m->t.oldtag); 218 } 219 220 /* 221 * Link this flush message and the old message 222 * so multiple flushes can be coalesced (if there are 223 * multiple Tflush messages for a particular pending 224 * request, it is only necessary to respond to the last 225 * one, so any previous can be removed) and to be 226 * sure flushes wait for their corresponding old 227 * message to go out first. 228 * Waiting flush messages do not go on the write queue, 229 * they are processed after the old message is dealt 230 * with. There's no real need to protect the setting of 231 * Msg.nowq, the only code to check it runs in this 232 * process after this routine returns. 233 */ 234 if((flush = old->flush) != nil){ 235 if(Dflag) 236 fprint(2, "msgFlush: remove %d from %d list\n", 237 old->flush->t.tag, old->t.tag); 238 m->flush = flush->flush; 239 flush->flush = nil; 240 msgMunlink(flush); 241 msgFree(flush); 242 } 243 old->flush = m; 244 m->nowq = 1; 245 246 if(Dflag) 247 fprint(2, "msgFlush: add %d to %d queue\n", 248 m->t.tag, old->t.tag); 249 qunlock(&con->mlock); 250 } 251 252 static void 253 msgProc(void* v) 254 { 255 Msg *m; 256 char e[ERRMAX]; 257 Con *con; 258 259 USED(v); 260 threadsetname("msgProc"); 261 262 for(;;){ 263 /* 264 * If surplus to requirements, exit. 265 * If not, wait for and pull a message off 266 * the read queue. 267 */ 268 qlock(&mbox.rlock); 269 if(mbox.nproc > mbox.maxproc){ 270 mbox.nproc--; 271 qunlock(&mbox.rlock); 272 break; 273 } 274 while(mbox.rhead == nil) 275 rsleep(&mbox.rrendez); 276 m = mbox.rhead; 277 mbox.rhead = m->rwnext; 278 m->rwnext = nil; 279 qunlock(&mbox.rlock); 280 281 con = m->con; 282 *e = 0; 283 284 /* 285 * If the message has been flushed before 286 * any 9P processing has started, mark it so 287 * none will be attempted. 288 */ 289 qlock(&con->mlock); 290 if(m->state == MsgF) 291 strcpy(e, "flushed"); 292 else 293 m->state = Msg9; 294 qunlock(&con->mlock); 295 296 if(*e == 0){ 297 /* 298 * explain this 299 */ 300 qlock(&con->lock); 301 if(m->t.type == Tversion){ 302 con->version = m; 303 con->state = ConDown; 304 while(con->mhead != m) 305 rsleep(&con->rendez); 306 assert(con->state == ConDown); 307 if(con->version == m){ 308 con->version = nil; 309 con->state = ConInit; 310 } 311 else 312 strcpy(e, "Tversion aborted"); 313 } 314 else if(con->state != ConUp) 315 strcpy(e, "connection not ready"); 316 qunlock(&con->lock); 317 } 318 319 /* 320 * Dispatch if not error already. 321 */ 322 m->r.tag = m->t.tag; 323 if(*e == 0 && !(*rFcall[m->t.type])(m)) 324 rerrstr(e, sizeof e); 325 if(*e != 0){ 326 m->r.type = Rerror; 327 m->r.ename = e; 328 } 329 else 330 m->r.type = m->t.type+1; 331 332 /* 333 * Put the message (with reply) on the 334 * write queue and wakeup the write process. 335 */ 336 if(!m->nowq){ 337 qlock(&con->wlock); 338 if(con->whead == nil) 339 con->whead = m; 340 else 341 con->wtail->rwnext = m; 342 con->wtail = m; 343 rwakeup(&con->wrendez); 344 qunlock(&con->wlock); 345 } 346 } 347 } 348 349 static void 350 msgRead(void* v) 351 { 352 Msg *m; 353 Con *con; 354 int eof, fd, n; 355 356 threadsetname("msgRead"); 357 358 con = v; 359 fd = con->fd; 360 eof = 0; 361 362 while(!eof){ 363 m = msgAlloc(con); 364 365 n = read9pmsg(fd, m->data, con->msize); 366 if(n <= 0){ 367 m->t.type = Tversion; 368 m->t.fid = NOFID; 369 m->t.tag = NOTAG; 370 m->t.msize = con->msize; 371 m->t.version = "9PEoF"; 372 eof = 1; 373 } 374 else if(convM2S(m->data, n, &m->t) != n){ 375 if(Dflag) 376 fprint(2, "msgRead: convM2S error: %s\n", 377 con->name); 378 msgFree(m); 379 continue; 380 } 381 if(Dflag) 382 fprint(2, "msgRead %p: t %F\n", con, &m->t); 383 384 qlock(&con->mlock); 385 if(con->mtail != nil){ 386 m->mprev = con->mtail; 387 con->mtail->mnext = m; 388 } 389 else{ 390 con->mhead = m; 391 m->mprev = nil; 392 } 393 con->mtail = m; 394 qunlock(&con->mlock); 395 396 qlock(&mbox.rlock); 397 if(mbox.rhead == nil){ 398 mbox.rhead = m; 399 if(!rwakeup(&mbox.rrendez)){ 400 if(mbox.nproc < mbox.maxproc){ 401 if(proccreate(msgProc, nil, STACK) > 0) 402 mbox.nproc++; 403 } 404 else 405 mbox.nprocstarve++; 406 } 407 /* 408 * don't need this surely? 409 rwakeup(&mbox.rrendez); 410 */ 411 } 412 else 413 mbox.rtail->rwnext = m; 414 mbox.rtail = m; 415 qunlock(&mbox.rlock); 416 } 417 } 418 419 static void 420 msgWrite(void* v) 421 { 422 Con *con; 423 int eof, n; 424 Msg *flush, *m; 425 426 threadsetname("msgWrite"); 427 428 con = v; 429 if(proccreate(msgRead, con, STACK) < 0){ 430 conFree(con); 431 return; 432 } 433 434 for(;;){ 435 /* 436 * Wait for and pull a message off the write queue. 437 */ 438 qlock(&con->wlock); 439 while(con->whead == nil) 440 rsleep(&con->wrendez); 441 m = con->whead; 442 con->whead = m->rwnext; 443 m->rwnext = nil; 444 assert(!m->nowq); 445 qunlock(&con->wlock); 446 447 eof = 0; 448 449 /* 450 * Write each message (if it hasn't been flushed) 451 * followed by any messages waiting for it to complete. 452 */ 453 qlock(&con->mlock); 454 while(m != nil){ 455 msgMunlink(m); 456 457 if(Dflag) 458 fprint(2, "msgWrite %d: r %F\n", 459 m->state, &m->r); 460 461 if(m->state != MsgF){ 462 m->state = MsgW; 463 qunlock(&con->mlock); 464 465 n = convS2M(&m->r, con->data, con->msize); 466 if(write(con->fd, con->data, n) != n) 467 eof = 1; 468 469 qlock(&con->mlock); 470 } 471 472 if((flush = m->flush) != nil){ 473 assert(flush->nowq); 474 m->flush = nil; 475 } 476 msgFree(m); 477 m = flush; 478 } 479 qunlock(&con->mlock); 480 481 qlock(&con->lock); 482 if(eof && con->fd >= 0){ 483 close(con->fd); 484 con->fd = -1; 485 } 486 if(con->state == ConDown) 487 rwakeup(&con->rendez); 488 if(con->state == ConMoribund && con->mhead == nil){ 489 qunlock(&con->lock); 490 conFree(con); 491 break; 492 } 493 qunlock(&con->lock); 494 } 495 } 496 497 Con* 498 conAlloc(int fd, char* name, int flags) 499 { 500 Con *con; 501 char buf[128], *p; 502 int rfd, n; 503 504 qlock(&cbox.alock); 505 while(cbox.ahead == nil){ 506 if(cbox.ncon >= cbox.maxcon){ 507 cbox.nconstarve++; 508 rsleep(&cbox.arendez); 509 continue; 510 } 511 con = vtmallocz(sizeof(Con)); 512 con->rendez.l = &con->lock; 513 con->data = vtmalloc(cbox.msize); 514 con->msize = cbox.msize; 515 con->mrendez.l = &con->mlock; 516 con->wrendez.l = &con->wlock; 517 518 cbox.ncon++; 519 cbox.ahead = con; 520 break; 521 } 522 con = cbox.ahead; 523 cbox.ahead = con->anext; 524 con->anext = nil; 525 526 if(cbox.ctail != nil){ 527 con->cprev = cbox.ctail; 528 cbox.ctail->cnext = con; 529 } 530 else{ 531 cbox.chead = con; 532 con->cprev = nil; 533 } 534 cbox.ctail = con; 535 536 assert(con->mhead == nil); 537 assert(con->whead == nil); 538 assert(con->fhead == nil); 539 assert(con->nfid == 0); 540 541 con->state = ConNew; 542 con->fd = fd; 543 if(con->name != nil){ 544 vtfree(con->name); 545 con->name = nil; 546 } 547 if(name != nil) 548 con->name = vtstrdup(name); 549 else 550 con->name = vtstrdup("unknown"); 551 con->remote[0] = 0; 552 snprint(buf, sizeof buf, "%s/remote", con->name); 553 if((rfd = open(buf, OREAD)) >= 0){ 554 n = read(rfd, buf, sizeof buf-1); 555 close(rfd); 556 if(n > 0){ 557 buf[n] = 0; 558 if((p = strchr(buf, '\n')) != nil) 559 *p = 0; 560 strecpy(con->remote, con->remote+sizeof con->remote, buf); 561 } 562 } 563 con->flags = flags; 564 con->isconsole = 0; 565 qunlock(&cbox.alock); 566 567 if(proccreate(msgWrite, con, STACK) < 0){ 568 conFree(con); 569 return nil; 570 } 571 572 return con; 573 } 574 575 static int 576 cmdMsg(int argc, char* argv[]) 577 { 578 char *p; 579 char *usage = "usage: msg [-m nmsg] [-p nproc]"; 580 int maxmsg, nmsg, nmsgstarve, maxproc, nproc, nprocstarve; 581 582 maxmsg = maxproc = 0; 583 584 ARGBEGIN{ 585 default: 586 return cliError(usage); 587 case 'm': 588 p = ARGF(); 589 if(p == nil) 590 return cliError(usage); 591 maxmsg = strtol(argv[0], &p, 0); 592 if(maxmsg <= 0 || p == argv[0] || *p != '\0') 593 return cliError(usage); 594 break; 595 case 'p': 596 p = ARGF(); 597 if(p == nil) 598 return cliError(usage); 599 maxproc = strtol(argv[0], &p, 0); 600 if(maxproc <= 0 || p == argv[0] || *p != '\0') 601 return cliError(usage); 602 break; 603 }ARGEND 604 if(argc) 605 return cliError(usage); 606 607 qlock(&mbox.alock); 608 if(maxmsg) 609 mbox.maxmsg = maxmsg; 610 maxmsg = mbox.maxmsg; 611 nmsg = mbox.nmsg; 612 nmsgstarve = mbox.nmsgstarve; 613 qunlock(&mbox.alock); 614 615 qlock(&mbox.rlock); 616 if(maxproc) 617 mbox.maxproc = maxproc; 618 maxproc = mbox.maxproc; 619 nproc = mbox.nproc; 620 nprocstarve = mbox.nprocstarve; 621 qunlock(&mbox.rlock); 622 623 consPrint("\tmsg -m %d -p %d\n", maxmsg, maxproc); 624 consPrint("\tnmsg %d nmsgstarve %d nproc %d nprocstarve %d\n", 625 nmsg, nmsgstarve, nproc, nprocstarve); 626 627 return 1; 628 } 629 630 static int 631 scmp(Fid *a, Fid *b) 632 { 633 if(a == 0) 634 return 1; 635 if(b == 0) 636 return -1; 637 return strcmp(a->uname, b->uname); 638 } 639 640 static Fid* 641 fidMerge(Fid *a, Fid *b) 642 { 643 Fid *s, **l; 644 645 l = &s; 646 while(a || b){ 647 if(scmp(a, b) < 0){ 648 *l = a; 649 l = &a->sort; 650 a = a->sort; 651 }else{ 652 *l = b; 653 l = &b->sort; 654 b = b->sort; 655 } 656 } 657 *l = 0; 658 return s; 659 } 660 661 static Fid* 662 fidMergeSort(Fid *f) 663 { 664 int delay; 665 Fid *a, *b; 666 667 if(f == nil) 668 return nil; 669 if(f->sort == nil) 670 return f; 671 672 a = b = f; 673 delay = 1; 674 while(a && b){ 675 if(delay) /* easy way to handle 2-element list */ 676 delay = 0; 677 else 678 a = a->sort; 679 if(b = b->sort) 680 b = b->sort; 681 } 682 683 b = a->sort; 684 a->sort = nil; 685 686 a = fidMergeSort(f); 687 b = fidMergeSort(b); 688 689 return fidMerge(a, b); 690 } 691 692 static int 693 cmdWho(int argc, char* argv[]) 694 { 695 char *usage = "usage: who"; 696 int i, l1, l2, l; 697 Con *con; 698 Fid *fid, *last; 699 700 ARGBEGIN{ 701 default: 702 return cliError(usage); 703 }ARGEND 704 705 if(argc > 0) 706 return cliError(usage); 707 708 rlock(&cbox.clock); 709 l1 = 0; 710 l2 = 0; 711 for(con=cbox.chead; con; con=con->cnext){ 712 if((l = strlen(con->name)) > l1) 713 l1 = l; 714 if((l = strlen(con->remote)) > l2) 715 l2 = l; 716 } 717 for(con=cbox.chead; con; con=con->cnext){ 718 consPrint("\t%-*s %-*s", l1, con->name, l2, con->remote); 719 qlock(&con->fidlock); 720 last = nil; 721 for(i=0; i<NFidHash; i++) 722 for(fid=con->fidhash[i]; fid; fid=fid->hash) 723 if(fid->fidno != NOFID && fid->uname){ 724 fid->sort = last; 725 last = fid; 726 } 727 fid = fidMergeSort(last); 728 last = nil; 729 for(; fid; last=fid, fid=fid->sort) 730 if(last==nil || strcmp(fid->uname, last->uname) != 0) 731 consPrint(" %q", fid->uname); 732 qunlock(&con->fidlock); 733 consPrint("\n"); 734 } 735 runlock(&cbox.clock); 736 return 1; 737 } 738 739 void 740 msgInit(void) 741 { 742 mbox.arendez.l = &mbox.alock; 743 744 mbox.rrendez.l = &mbox.rlock; 745 746 mbox.maxmsg = NMsgInit; 747 mbox.maxproc = NMsgProcInit; 748 mbox.msize = NMsizeInit; 749 750 cliAddCmd("msg", cmdMsg); 751 } 752 753 static int 754 cmdCon(int argc, char* argv[]) 755 { 756 char *p; 757 Con *con; 758 char *usage = "usage: con [-m ncon]"; 759 int maxcon, ncon, nconstarve; 760 761 maxcon = 0; 762 763 ARGBEGIN{ 764 default: 765 return cliError(usage); 766 case 'm': 767 p = ARGF(); 768 if(p == nil) 769 return cliError(usage); 770 maxcon = strtol(argv[0], &p, 0); 771 if(maxcon <= 0 || p == argv[0] || *p != '\0') 772 return cliError(usage); 773 break; 774 }ARGEND 775 if(argc) 776 return cliError(usage); 777 778 wlock(&cbox.clock); 779 if(maxcon) 780 cbox.maxcon = maxcon; 781 maxcon = cbox.maxcon; 782 ncon = cbox.ncon; 783 nconstarve = cbox.nconstarve; 784 wunlock(&cbox.clock); 785 786 consPrint("\tcon -m %d\n", maxcon); 787 consPrint("\tncon %d nconstarve %d\n", ncon, nconstarve); 788 789 rlock(&cbox.clock); 790 for(con = cbox.chead; con != nil; con = con->cnext){ 791 consPrint("\t%s\n", con->name); 792 } 793 runlock(&cbox.clock); 794 795 return 1; 796 } 797 798 void 799 conInit(void) 800 { 801 cbox.arendez.l = &cbox.alock; 802 803 cbox.maxcon = NConInit; 804 cbox.msize = NMsizeInit; 805 806 cliAddCmd("con", cmdCon); 807 cliAddCmd("who", cmdWho); 808 }