/* runq.c (13234B) */
1 #include "common.h" 2 #include <ctype.h> 3 4 void doalldirs(void); 5 void dodir(char*); 6 void dofile(Dir*); 7 void rundir(char*); 8 char* file(char*, char); 9 void warning(char*, void*); 10 void error(char*, void*); 11 int returnmail(char**, char*, char*); 12 void logit(char*, char*, char**); 13 void doload(int); 14 15 #define HUNK 32 16 char *cmd; 17 char *root; 18 int debug; 19 int giveup = 2*24*60*60; 20 int load; 21 int limit; 22 23 /* the current directory */ 24 Dir *dirbuf; 25 long ndirbuf = 0; 26 int nfiles; 27 char *curdir; 28 29 char *runqlog = "runq"; 30 31 int *pidlist; 32 char **badsys; /* array of recalcitrant systems */ 33 int nbad; 34 int npid = 50; 35 int sflag; /* single thread per directory */ 36 int aflag; /* all directories */ 37 int Eflag; /* ignore E.xxxxxx dates */ 38 int Rflag; /* no giving up, ever */ 39 40 void 41 usage(void) 42 { 43 fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); 44 exits(""); 45 } 46 47 void 48 main(int argc, char **argv) 49 { 50 char *qdir, *x; 51 52 qdir = 0; 53 54 ARGBEGIN{ 55 case 'l': 56 x = ARGF(); 57 if(x == 0) 58 usage(); 59 load = atoi(x); 60 if(load < 0) 61 load = 0; 62 break; 63 case 'E': 64 Eflag++; 65 break; 66 case 'R': /* no giving up -- just leave stuff in the queue */ 67 Rflag++; 68 break; 69 case 'a': 70 aflag++; 71 break; 72 case 'd': 73 debug++; 74 break; 75 case 'r': 76 limit = atoi(ARGF()); 77 break; 78 case 's': 79 sflag++; 80 break; 81 case 't': 82 giveup = 60*60*atoi(ARGF()); 83 break; 84 case 'q': 85 qdir = ARGF(); 86 if(qdir == 0) 87 usage(); 88 break; 89 case 'n': 90 npid = atoi(ARGF()); 91 if(npid == 0) 92 usage(); 93 break; 94 }ARGEND; 95 96 if(argc != 2) 97 usage(); 98 99 pidlist = malloc(npid*sizeof(*pidlist)); 100 if(pidlist == 0) 101 error("can't malloc", 0); 102 103 if(aflag == 0 && qdir == 0) { 104 qdir = getuser(); 105 if(qdir == 0) 106 error("unknown user", 0); 107 } 108 root = argv[0]; 109 cmd = argv[1]; 110 111 if(chdir(root) < 
0) 112 error("can't cd to %s", root); 113 114 doload(1); 115 if(aflag) 116 doalldirs(); 117 else 118 dodir(qdir); 119 doload(0); 120 exits(0); 121 } 122 123 int 124 emptydir(char *name) 125 { 126 int fd; 127 long n; 128 char buf[2048]; 129 130 fd = open(name, OREAD); 131 if(fd < 0) 132 return 1; 133 n = read(fd, buf, sizeof(buf)); 134 close(fd); 135 if(n <= 0) { 136 if(debug) 137 fprint(2, "removing directory %s\n", name); 138 syslog(0, runqlog, "rmdir %s", name); 139 sysremove(name); 140 return 1; 141 } 142 return 0; 143 } 144 145 int 146 forkltd(void) 147 { 148 int i; 149 int pid; 150 151 for(i = 0; i < npid; i++){ 152 if(pidlist[i] <= 0) 153 break; 154 } 155 156 while(i >= npid){ 157 pid = waitpid(); 158 if(pid < 0){ 159 syslog(0, runqlog, "forkltd confused"); 160 exits(0); 161 } 162 163 for(i = 0; i < npid; i++) 164 if(pidlist[i] == pid) 165 break; 166 } 167 pidlist[i] = fork(); 168 return pidlist[i]; 169 } 170 171 /* 172 * run all user directories, must be bootes (or root on unix) to do this 173 */ 174 void 175 doalldirs(void) 176 { 177 Dir *db; 178 int fd; 179 long i, n; 180 181 182 fd = open(".", OREAD); 183 if(fd == -1){ 184 warning("reading %s", root); 185 return; 186 } 187 n = sysdirreadall(fd, &db); 188 if(n > 0){ 189 for(i=0; i<n; i++){ 190 if(db[i].qid.type & QTDIR){ 191 if(emptydir(db[i].name)) 192 continue; 193 switch(forkltd()){ 194 case -1: 195 syslog(0, runqlog, "out of procs"); 196 doload(0); 197 exits(0); 198 case 0: 199 if(sysdetach() < 0) 200 error("%r", 0); 201 dodir(db[i].name); 202 exits(0); 203 default: 204 break; 205 } 206 } 207 } 208 free(db); 209 } 210 close(fd); 211 } 212 213 /* 214 * cd to a user directory and run it 215 */ 216 void 217 dodir(char *name) 218 { 219 curdir = name; 220 221 if(chdir(name) < 0){ 222 warning("cd to %s", name); 223 return; 224 } 225 if(debug) 226 fprint(2, "running %s\n", name); 227 rundir(name); 228 chdir(".."); 229 } 230 231 /* 232 * run the current directory 233 */ 234 void 235 rundir(char *name) 236 { 
237 int fd; 238 long i; 239 240 if(aflag && sflag) 241 fd = sysopenlocked(".", OREAD); 242 else 243 fd = open(".", OREAD); 244 if(fd == -1){ 245 warning("reading %s", name); 246 return; 247 } 248 nfiles = sysdirreadall(fd, &dirbuf); 249 if(nfiles > 0){ 250 for(i=0; i<nfiles; i++){ 251 if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') 252 continue; 253 dofile(&dirbuf[i]); 254 } 255 free(dirbuf); 256 } 257 if(aflag && sflag) 258 sysunlockfile(fd); 259 else 260 close(fd); 261 } 262 263 /* 264 * free files matching name in the current directory 265 */ 266 void 267 remmatch(char *name) 268 { 269 long i; 270 271 syslog(0, runqlog, "removing %s/%s", curdir, name); 272 273 for(i=0; i<nfiles; i++){ 274 if(strcmp(&dirbuf[i].name[1], &name[1]) == 0) 275 sysremove(dirbuf[i].name); 276 } 277 278 /* error file (may have) appeared after we read the directory */ 279 /* stomp on data file in case of phase error */ 280 sysremove(file(name, 'D')); 281 sysremove(file(name, 'E')); 282 } 283 284 /* 285 * like trylock, but we've already got the lock on fd, 286 * and don't want an L. lock file. 
287 */ 288 static Mlock * 289 keeplockalive(char *path, int fd) 290 { 291 char buf[1]; 292 Mlock *l; 293 294 l = malloc(sizeof(Mlock)); 295 if(l == 0) 296 return 0; 297 l->fd = fd; 298 l->name = s_new(); 299 s_append(l->name, path); 300 301 /* fork process to keep lock alive until sysunlock(l) */ 302 switch(l->pid = rfork(RFPROC)){ 303 default: 304 break; 305 case 0: 306 fd = l->fd; 307 for(;;){ 308 sleep(1000*60); 309 if(pread(fd, buf, 1, 0) < 0) 310 break; 311 } 312 _exits(0); 313 } 314 return l; 315 } 316 317 /* 318 * try a message 319 */ 320 void 321 dofile(Dir *dp) 322 { 323 Dir *d; 324 int dfd, ac, dtime, efd, pid, i, etime; 325 char *buf, *cp, **av; 326 Waitmsg *wm; 327 Biobuf *b; 328 Mlock *l = nil; 329 330 if(debug) 331 fprint(2, "dofile %s\n", dp->name); 332 /* 333 * if no data file or empty control or data file, just clean up 334 * the empty control file must be 15 minutes old, to minimize the 335 * chance of a race. 336 */ 337 d = dirstat(file(dp->name, 'D')); 338 if(d == nil){ 339 syslog(0, runqlog, "no data file for %s", dp->name); 340 remmatch(dp->name); 341 return; 342 } 343 if(dp->length == 0){ 344 if(time(0)-dp->mtime > 15*60){ 345 syslog(0, runqlog, "empty ctl file for %s", dp->name); 346 remmatch(dp->name); 347 } 348 return; 349 } 350 dtime = d->mtime; 351 free(d); 352 353 /* 354 * retry times depend on the age of the errors file 355 */ 356 if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){ 357 etime = d->mtime; 358 free(d); 359 if(etime - dtime < 60*60){ 360 /* up to the first hour, try every 15 minutes */ 361 if(time(0) - etime < 15*60) 362 return; 363 } else { 364 /* after the first hour, try once an hour */ 365 if(time(0) - etime < 60*60) 366 return; 367 } 368 369 } 370 371 /* 372 * open control and data 373 */ 374 b = sysopen(file(dp->name, 'C'), "rl", 0660); 375 if(b == 0) { 376 if(debug) 377 fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); 378 return; 379 } 380 dfd = open(file(dp->name, 'D'), OREAD); 381 if(dfd < 0){ 382 
if(debug) 383 fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); 384 Bterm(b); 385 sysunlockfile(Bfildes(b)); 386 return; 387 } 388 389 /* 390 * make arg list 391 * - read args into (malloc'd) buffer 392 * - malloc a vector and copy pointers to args into it 393 */ 394 buf = malloc(dp->length+1); 395 if(buf == 0){ 396 warning("buffer allocation", 0); 397 Bterm(b); 398 sysunlockfile(Bfildes(b)); 399 close(dfd); 400 return; 401 } 402 if(Bread(b, buf, dp->length) != dp->length){ 403 warning("reading control file %s\n", dp->name); 404 Bterm(b); 405 sysunlockfile(Bfildes(b)); 406 close(dfd); 407 free(buf); 408 return; 409 } 410 buf[dp->length] = 0; 411 av = malloc(2*sizeof(char*)); 412 if(av == 0){ 413 warning("argv allocation", 0); 414 close(dfd); 415 free(buf); 416 Bterm(b); 417 sysunlockfile(Bfildes(b)); 418 return; 419 } 420 for(ac = 1, cp = buf; *cp; ac++){ 421 while(isspace(*cp)) 422 *cp++ = 0; 423 if(*cp == 0) 424 break; 425 426 av = realloc(av, (ac+2)*sizeof(char*)); 427 if(av == 0){ 428 warning("argv allocation", 0); 429 close(dfd); 430 free(buf); 431 Bterm(b); 432 sysunlockfile(Bfildes(b)); 433 return; 434 } 435 av[ac] = cp; 436 while(*cp && !isspace(*cp)){ 437 if(*cp++ == '"'){ 438 while(*cp && *cp != '"') 439 cp++; 440 if(*cp) 441 cp++; 442 } 443 } 444 } 445 av[0] = cmd; 446 av[ac] = 0; 447 448 if(!Eflag &&time(0) - dtime > giveup){ 449 if(returnmail(av, dp->name, "Giveup") != 0) 450 logit("returnmail failed", dp->name, av); 451 remmatch(dp->name); 452 goto done; 453 } 454 455 for(i = 0; i < nbad; i++){ 456 if(strcmp(av[3], badsys[i]) == 0) 457 goto done; 458 } 459 460 /* 461 * Ken's fs, for example, gives us 5 minutes of inactivity before 462 * the lock goes stale, so we have to keep reading it. 
463 */ 464 l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); 465 466 /* 467 * transfer 468 */ 469 pid = fork(); 470 switch(pid){ 471 case -1: 472 sysunlock(l); 473 sysunlockfile(Bfildes(b)); 474 syslog(0, runqlog, "out of procs"); 475 exits(0); 476 case 0: 477 if(debug) { 478 fprint(2, "Starting %s", cmd); 479 for(ac = 0; av[ac]; ac++) 480 fprint(2, " %s", av[ac]); 481 fprint(2, "\n"); 482 } 483 logit("execing", dp->name, av); 484 close(0); 485 dup(dfd, 0); 486 close(dfd); 487 close(2); 488 efd = open(file(dp->name, 'E'), OWRITE); 489 if(efd < 0){ 490 if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); 491 efd = create(file(dp->name, 'E'), OWRITE, 0666); 492 if(efd < 0){ 493 if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); 494 exits("could not open error file - Retry"); 495 } 496 } 497 seek(efd, 0, 2); 498 exec(cmd, av); 499 error("can't exec %s", cmd); 500 break; 501 default: 502 for(;;){ 503 wm = wait(); 504 if(wm == nil) 505 error("wait failed: %r", ""); 506 if(wm->pid == pid) 507 break; 508 free(wm); 509 } 510 if(debug) 511 fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); 512 513 if(wm->msg[0]){ 514 if(debug) 515 fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); 516 if(!Rflag && atoi(wm->msg) != RetryCode){ 517 /* return the message and remove it */ 518 if(returnmail(av, dp->name, wm->msg) != 0) 519 logit("returnmail failed", dp->name, av); 520 remmatch(dp->name); 521 } else { 522 /* add sys to bad list and try again later */ 523 nbad++; 524 badsys = realloc(badsys, nbad*sizeof(char*)); 525 badsys[nbad-1] = strdup(av[3]); 526 } 527 } else { 528 /* it worked remove the message */ 529 remmatch(dp->name); 530 } 531 free(wm); 532 533 } 534 done: 535 if (l) 536 sysunlock(l); 537 Bterm(b); 538 sysunlockfile(Bfildes(b)); 539 free(buf); 540 free(av); 541 close(dfd); 542 } 543 544 545 /* 546 * return a name starting with the given character 547 */ 548 char* 549 file(char *name, char type) 
550 { 551 static char nname[Elemlen+1]; 552 553 strncpy(nname, name, Elemlen); 554 nname[Elemlen] = 0; 555 nname[0] = type; 556 return nname; 557 } 558 559 /* 560 * send back the mail with an error message 561 * 562 * return 0 if successful 563 */ 564 int 565 returnmail(char **av, char *name, char *msg) 566 { 567 int pfd[2]; 568 Waitmsg *wm; 569 int fd; 570 char buf[256]; 571 char attachment[256]; 572 int i; 573 long n; 574 String *s; 575 char *sender; 576 577 if(av[1] == 0 || av[2] == 0){ 578 logit("runq - dumping bad file", name, av); 579 return 0; 580 } 581 582 s = unescapespecial(s_copy(av[2])); 583 sender = s_to_c(s); 584 585 if(!returnable(sender) || strcmp(sender, "postmaster") == 0) { 586 logit("runq - dumping p to p mail", name, av); 587 return 0; 588 } 589 590 if(pipe(pfd) < 0){ 591 logit("runq - pipe failed", name, av); 592 return -1; 593 } 594 595 switch(rfork(RFFDG|RFPROC|RFENVG)){ 596 case -1: 597 logit("runq - fork failed", name, av); 598 return -1; 599 case 0: 600 logit("returning", name, av); 601 close(pfd[1]); 602 close(0); 603 dup(pfd[0], 0); 604 close(pfd[0]); 605 putenv("upasname", "/dev/null"); 606 snprint(buf, sizeof(buf), "%s/marshal", UPASBIN); 607 snprint(attachment, sizeof(attachment), "%s", file(name, 'D')); 608 execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil); 609 error("can't exec", 0); 610 break; 611 default: 612 break; 613 } 614 615 close(pfd[0]); 616 fprint(pfd[1], "\n"); /* get out of headers */ 617 if(av[1]){ 618 fprint(pfd[1], "Your request ``%.20s ", av[1]); 619 for(n = 3; av[n]; n++) 620 fprint(pfd[1], "%s ", av[n]); 621 } 622 fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg); 623 fd = open(file(name, 'E'), OREAD); 624 if(fd >= 0){ 625 for(;;){ 626 n = read(fd, buf, sizeof(buf)); 627 if(n <= 0) 628 break; 629 if(write(pfd[1], buf, n) != n){ 630 close(fd); 631 goto out; 632 } 633 } 634 close(fd); 635 } 636 close(pfd[1]); 637 out: 638 wm = wait(); 639 if(wm == nil){ 640 syslog(0, 
"runq", "wait: %r"); 641 logit("wait failed", name, av); 642 return -1; 643 } 644 i = 0; 645 if(wm->msg[0]){ 646 i = -1; 647 syslog(0, "runq", "returnmail child: %s", wm->msg); 648 logit("returnmail child failed", name, av); 649 } 650 free(wm); 651 return i; 652 } 653 654 /* 655 * print a warning and continue 656 */ 657 void 658 warning(char *f, void *a) 659 { 660 char err[65]; 661 char buf[256]; 662 663 rerrstr(err, sizeof(err)); 664 snprint(buf, sizeof(buf), f, a); 665 fprint(2, "runq: %s: %s\n", buf, err); 666 } 667 668 /* 669 * print an error and die 670 */ 671 void 672 error(char *f, void *a) 673 { 674 char err[Errlen]; 675 char buf[256]; 676 677 rerrstr(err, sizeof(err)); 678 snprint(buf, sizeof(buf), f, a); 679 fprint(2, "runq: %s: %s\n", buf, err); 680 exits(buf); 681 } 682 683 void 684 logit(char *msg, char *file, char **av) 685 { 686 int n, m; 687 char buf[256]; 688 689 n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg); 690 for(; *av; av++){ 691 m = strlen(*av); 692 if(n + m + 4 > sizeof(buf)) 693 break; 694 sprint(buf + n, " '%s'", *av); 695 n += m + 3; 696 } 697 syslog(0, runqlog, "%s", buf); 698 } 699 700 char *loadfile = ".runqload"; 701 702 /* 703 * load balancing 704 */ 705 void 706 doload(int start) 707 { 708 int fd; 709 char buf[32]; 710 int i, n; 711 Mlock *l; 712 Dir *d; 713 714 if(load <= 0) 715 return; 716 717 if(chdir(root) < 0){ 718 load = 0; 719 return; 720 } 721 722 l = syslock(loadfile); 723 fd = open(loadfile, ORDWR); 724 if(fd < 0){ 725 fd = create(loadfile, 0666, ORDWR); 726 if(fd < 0){ 727 load = 0; 728 sysunlock(l); 729 return; 730 } 731 } 732 733 /* get current load */ 734 i = 0; 735 n = read(fd, buf, sizeof(buf)-1); 736 if(n >= 0){ 737 buf[n] = 0; 738 i = atoi(buf); 739 } 740 if(i < 0) 741 i = 0; 742 743 /* ignore load if file hasn't been changed in 30 minutes */ 744 d = dirfstat(fd); 745 if(d != nil){ 746 if(d->mtime + 30*60 < time(0)) 747 i = 0; 748 free(d); 749 } 750 751 /* if load already too high, give up */ 752 
if(start && i >= load){ 753 sysunlock(l); 754 exits(0); 755 } 756 757 /* increment/decrement load */ 758 if(start) 759 i++; 760 else 761 i--; 762 seek(fd, 0, 0); 763 fprint(fd, "%d\n", i); 764 sysunlock(l); 765 close(fd); 766 }