plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

runq.c (13234B)


      1 #include "common.h"
      2 #include <ctype.h>
      3 
      4 void	doalldirs(void);
      5 void	dodir(char*);
      6 void	dofile(Dir*);
      7 void	rundir(char*);
      8 char*	file(char*, char);
      9 void	warning(char*, void*);
     10 void	error(char*, void*);
     11 int	returnmail(char**, char*, char*);
     12 void	logit(char*, char*, char**);
     13 void	doload(int);
     14 
     15 #define HUNK 32
     16 char	*cmd;
     17 char	*root;
     18 int	debug;
     19 int	giveup = 2*24*60*60;
     20 int	load;
     21 int	limit;
     22 
     23 /* the current directory */
     24 Dir	*dirbuf;
     25 long	ndirbuf = 0;
     26 int	nfiles;
     27 char	*curdir;
     28 
     29 char *runqlog = "runq";
     30 
     31 int	*pidlist;
     32 char	**badsys;		/* array of recalcitrant systems */
     33 int	nbad;
     34 int	npid = 50;
     35 int	sflag;			/* single thread per directory */
     36 int	aflag;			/* all directories */
     37 int	Eflag;			/* ignore E.xxxxxx dates */
     38 int	Rflag;			/* no giving up, ever */
     39 
     40 void
     41 usage(void)
     42 {
     43 	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
     44 	exits("");
     45 }
     46 
     47 void
     48 main(int argc, char **argv)
     49 {
     50 	char *qdir, *x;
     51 
     52 	qdir = 0;
     53 
     54 	ARGBEGIN{
     55 	case 'l':
     56 		x = ARGF();
     57 		if(x == 0)
     58 			usage();
     59 		load = atoi(x);
     60 		if(load < 0)
     61 			load = 0;
     62 		break;
     63 	case 'E':
     64 		Eflag++;
     65 		break;
     66 	case 'R':	/* no giving up -- just leave stuff in the queue */
     67 		Rflag++;
     68 		break;
     69 	case 'a':
     70 		aflag++;
     71 		break;
     72 	case 'd':
     73 		debug++;
     74 		break;
     75 	case 'r':
     76 		limit = atoi(ARGF());
     77 		break;
     78 	case 's':
     79 		sflag++;
     80 		break;
     81 	case 't':
     82 		giveup = 60*60*atoi(ARGF());
     83 		break;
     84 	case 'q':
     85 		qdir = ARGF();
     86 		if(qdir == 0)
     87 			usage();
     88 		break;
     89 	case 'n':
     90 		npid = atoi(ARGF());
     91 		if(npid == 0)
     92 			usage();
     93 		break;
     94 	}ARGEND;
     95 
     96 	if(argc != 2)
     97 		usage();
     98 
     99 	pidlist = malloc(npid*sizeof(*pidlist));
    100 	if(pidlist == 0)
    101 		error("can't malloc", 0);
    102 
    103 	if(aflag == 0 && qdir == 0) {
    104 		qdir = getuser();
    105 		if(qdir == 0)
    106 			error("unknown user", 0);
    107 	}
    108 	root = argv[0];
    109 	cmd = argv[1];
    110 
    111 	if(chdir(root) < 0)
    112 		error("can't cd to %s", root);
    113 
    114 	doload(1);
    115 	if(aflag)
    116 		doalldirs();
    117 	else
    118 		dodir(qdir);
    119 	doload(0);
    120 	exits(0);
    121 }
    122 
    123 int
    124 emptydir(char *name)
    125 {
    126 	int fd;
    127 	long n;
    128 	char buf[2048];
    129 
    130 	fd = open(name, OREAD);
    131 	if(fd < 0)
    132 		return 1;
    133 	n = read(fd, buf, sizeof(buf));
    134 	close(fd);
    135 	if(n <= 0) {
    136 		if(debug)
    137 			fprint(2, "removing directory %s\n", name);
    138 		syslog(0, runqlog, "rmdir %s", name);
    139 		sysremove(name);
    140 		return 1;
    141 	}
    142 	return 0;
    143 }
    144 
    145 int
    146 forkltd(void)
    147 {
    148 	int i;
    149 	int pid;
    150 
    151 	for(i = 0; i < npid; i++){
    152 		if(pidlist[i] <= 0)
    153 			break;
    154 	}
    155 
    156 	while(i >= npid){
    157 		pid = waitpid();
    158 		if(pid < 0){
    159 			syslog(0, runqlog, "forkltd confused");
    160 			exits(0);
    161 		}
    162 
    163 		for(i = 0; i < npid; i++)
    164 			if(pidlist[i] == pid)
    165 				break;
    166 	}
    167 	pidlist[i] = fork();
    168 	return pidlist[i];
    169 }
    170 
    171 /*
    172  *  run all user directories, must be bootes (or root on unix) to do this
    173  */
    174 void
    175 doalldirs(void)
    176 {
    177 	Dir *db;
    178 	int fd;
    179 	long i, n;
    180 
    181 
    182 	fd = open(".", OREAD);
    183 	if(fd == -1){
    184 		warning("reading %s", root);
    185 		return;
    186 	}
    187 	n = sysdirreadall(fd, &db);
    188 	if(n > 0){
    189 		for(i=0; i<n; i++){
    190 			if(db[i].qid.type & QTDIR){
    191 				if(emptydir(db[i].name))
    192 					continue;
    193 				switch(forkltd()){
    194 				case -1:
    195 					syslog(0, runqlog, "out of procs");
    196 					doload(0);
    197 					exits(0);
    198 				case 0:
    199 					if(sysdetach() < 0)
    200 						error("%r", 0);
    201 					dodir(db[i].name);
    202 					exits(0);
    203 				default:
    204 					break;
    205 				}
    206 			}
    207 		}
    208 		free(db);
    209 	}
    210 	close(fd);
    211 }
    212 
    213 /*
    214  *  cd to a user directory and run it
    215  */
    216 void
    217 dodir(char *name)
    218 {
    219 	curdir = name;
    220 
    221 	if(chdir(name) < 0){
    222 		warning("cd to %s", name);
    223 		return;
    224 	}
    225 	if(debug)
    226 		fprint(2, "running %s\n", name);
    227 	rundir(name);
    228 	chdir("..");
    229 }
    230 
    231 /*
    232  *  run the current directory
    233  */
    234 void
    235 rundir(char *name)
    236 {
    237 	int fd;
    238 	long i;
    239 
    240 	if(aflag && sflag)
    241 		fd = sysopenlocked(".", OREAD);
    242 	else
    243 		fd = open(".", OREAD);
    244 	if(fd == -1){
    245 		warning("reading %s", name);
    246 		return;
    247 	}
    248 	nfiles = sysdirreadall(fd, &dirbuf);
    249 	if(nfiles > 0){
    250 		for(i=0; i<nfiles; i++){
    251 			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
    252 				continue;
    253 			dofile(&dirbuf[i]);
    254 		}
    255 		free(dirbuf);
    256 	}
    257 	if(aflag && sflag)
    258 		sysunlockfile(fd);
    259 	else
    260 		close(fd);
    261 }
    262 
    263 /*
    264  *  free files matching name in the current directory
    265  */
    266 void
    267 remmatch(char *name)
    268 {
    269 	long i;
    270 
    271 	syslog(0, runqlog, "removing %s/%s", curdir, name);
    272 
    273 	for(i=0; i<nfiles; i++){
    274 		if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
    275 			sysremove(dirbuf[i].name);
    276 	}
    277 
    278 	/* error file (may have) appeared after we read the directory */
    279 	/* stomp on data file in case of phase error */
    280 	sysremove(file(name, 'D'));
    281 	sysremove(file(name, 'E'));
    282 }
    283 
    284 /*
    285  *  like trylock, but we've already got the lock on fd,
    286  *  and don't want an L. lock file.
    287  */
    288 static Mlock *
    289 keeplockalive(char *path, int fd)
    290 {
    291 	char buf[1];
    292 	Mlock *l;
    293 
    294 	l = malloc(sizeof(Mlock));
    295 	if(l == 0)
    296 		return 0;
    297 	l->fd = fd;
    298 	l->name = s_new();
    299 	s_append(l->name, path);
    300 
    301 	/* fork process to keep lock alive until sysunlock(l) */
    302 	switch(l->pid = rfork(RFPROC)){
    303 	default:
    304 		break;
    305 	case 0:
    306 		fd = l->fd;
    307 		for(;;){
    308 			sleep(1000*60);
    309 			if(pread(fd, buf, 1, 0) < 0)
    310 				break;
    311 		}
    312 		_exits(0);
    313 	}
    314 	return l;
    315 }
    316 
    317 /*
    318  *  try a message
    319  */
    320 void
    321 dofile(Dir *dp)
    322 {
    323 	Dir *d;
    324 	int dfd, ac, dtime, efd, pid, i, etime;
    325 	char *buf, *cp, **av;
    326 	Waitmsg *wm;
    327 	Biobuf *b;
    328 	Mlock *l = nil;
    329 
    330 	if(debug)
    331 		fprint(2, "dofile %s\n", dp->name);
    332 	/*
    333 	 *  if no data file or empty control or data file, just clean up
    334 	 *  the empty control file must be 15 minutes old, to minimize the
    335 	 *  chance of a race.
    336 	 */
    337 	d = dirstat(file(dp->name, 'D'));
    338 	if(d == nil){
    339 		syslog(0, runqlog, "no data file for %s", dp->name);
    340 		remmatch(dp->name);
    341 		return;
    342 	}
    343 	if(dp->length == 0){
    344 		if(time(0)-dp->mtime > 15*60){
    345 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
    346 			remmatch(dp->name);
    347 		}
    348 		return;
    349 	}
    350 	dtime = d->mtime;
    351 	free(d);
    352 
    353 	/*
    354 	 *  retry times depend on the age of the errors file
    355 	 */
    356 	if(!Eflag && (d = dirstat(file(dp->name, 'E'))) != nil){
    357 		etime = d->mtime;
    358 		free(d);
    359 		if(etime - dtime < 60*60){
    360 			/* up to the first hour, try every 15 minutes */
    361 			if(time(0) - etime < 15*60)
    362 				return;
    363 		} else {
    364 			/* after the first hour, try once an hour */
    365 			if(time(0) - etime < 60*60)
    366 				return;
    367 		}
    368 
    369 	}
    370 
    371 	/*
    372 	 *  open control and data
    373 	 */
    374 	b = sysopen(file(dp->name, 'C'), "rl", 0660);
    375 	if(b == 0) {
    376 		if(debug)
    377 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
    378 		return;
    379 	}
    380 	dfd = open(file(dp->name, 'D'), OREAD);
    381 	if(dfd < 0){
    382 		if(debug)
    383 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
    384 		Bterm(b);
    385 		sysunlockfile(Bfildes(b));
    386 		return;
    387 	}
    388 
    389 	/*
    390 	 *  make arg list
    391 	 *	- read args into (malloc'd) buffer
    392 	 *	- malloc a vector and copy pointers to args into it
    393 	 */
    394 	buf = malloc(dp->length+1);
    395 	if(buf == 0){
    396 		warning("buffer allocation", 0);
    397 		Bterm(b);
    398 		sysunlockfile(Bfildes(b));
    399 		close(dfd);
    400 		return;
    401 	}
    402 	if(Bread(b, buf, dp->length) != dp->length){
    403 		warning("reading control file %s\n", dp->name);
    404 		Bterm(b);
    405 		sysunlockfile(Bfildes(b));
    406 		close(dfd);
    407 		free(buf);
    408 		return;
    409 	}
    410 	buf[dp->length] = 0;
    411 	av = malloc(2*sizeof(char*));
    412 	if(av == 0){
    413 		warning("argv allocation", 0);
    414 		close(dfd);
    415 		free(buf);
    416 		Bterm(b);
    417 		sysunlockfile(Bfildes(b));
    418 		return;
    419 	}
    420 	for(ac = 1, cp = buf; *cp; ac++){
    421 		while(isspace(*cp))
    422 			*cp++ = 0;
    423 		if(*cp == 0)
    424 			break;
    425 
    426 		av = realloc(av, (ac+2)*sizeof(char*));
    427 		if(av == 0){
    428 			warning("argv allocation", 0);
    429 			close(dfd);
    430 			free(buf);
    431 			Bterm(b);
    432 			sysunlockfile(Bfildes(b));
    433 			return;
    434 		}
    435 		av[ac] = cp;
    436 		while(*cp && !isspace(*cp)){
    437 			if(*cp++ == '"'){
    438 				while(*cp && *cp != '"')
    439 					cp++;
    440 				if(*cp)
    441 					cp++;
    442 			}
    443 		}
    444 	}
    445 	av[0] = cmd;
    446 	av[ac] = 0;
    447 
    448 	if(!Eflag &&time(0) - dtime > giveup){
    449 		if(returnmail(av, dp->name, "Giveup") != 0)
    450 			logit("returnmail failed", dp->name, av);
    451 		remmatch(dp->name);
    452 		goto done;
    453 	}
    454 
    455 	for(i = 0; i < nbad; i++){
    456 		if(strcmp(av[3], badsys[i]) == 0)
    457 			goto done;
    458 	}
    459 
    460 	/*
    461 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
    462 	 * the lock goes stale, so we have to keep reading it.
    463  	 */
    464 	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
    465 
    466 	/*
    467 	 *  transfer
    468 	 */
    469 	pid = fork();
    470 	switch(pid){
    471 	case -1:
    472 		sysunlock(l);
    473 		sysunlockfile(Bfildes(b));
    474 		syslog(0, runqlog, "out of procs");
    475 		exits(0);
    476 	case 0:
    477 		if(debug) {
    478 			fprint(2, "Starting %s", cmd);
    479 			for(ac = 0; av[ac]; ac++)
    480 				fprint(2, " %s", av[ac]);
    481 			fprint(2, "\n");
    482 		}
    483 		logit("execing", dp->name, av);
    484 		close(0);
    485 		dup(dfd, 0);
    486 		close(dfd);
    487 		close(2);
    488 		efd = open(file(dp->name, 'E'), OWRITE);
    489 		if(efd < 0){
    490 			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
    491 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
    492 			if(efd < 0){
    493 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
    494 				exits("could not open error file - Retry");
    495 			}
    496 		}
    497 		seek(efd, 0, 2);
    498 		exec(cmd, av);
    499 		error("can't exec %s", cmd);
    500 		break;
    501 	default:
    502 		for(;;){
    503 			wm = wait();
    504 			if(wm == nil)
    505 				error("wait failed: %r", "");
    506 			if(wm->pid == pid)
    507 				break;
    508 			free(wm);
    509 		}
    510 		if(debug)
    511 			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
    512 
    513 		if(wm->msg[0]){
    514 			if(debug)
    515 				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
    516 			if(!Rflag && atoi(wm->msg) != RetryCode){
    517 				/* return the message and remove it */
    518 				if(returnmail(av, dp->name, wm->msg) != 0)
    519 					logit("returnmail failed", dp->name, av);
    520 				remmatch(dp->name);
    521 			} else {
    522 				/* add sys to bad list and try again later */
    523 				nbad++;
    524 				badsys = realloc(badsys, nbad*sizeof(char*));
    525 				badsys[nbad-1] = strdup(av[3]);
    526 			}
    527 		} else {
    528 			/* it worked remove the message */
    529 			remmatch(dp->name);
    530 		}
    531 		free(wm);
    532 
    533 	}
    534 done:
    535 	if (l)
    536 		sysunlock(l);
    537 	Bterm(b);
    538 	sysunlockfile(Bfildes(b));
    539 	free(buf);
    540 	free(av);
    541 	close(dfd);
    542 }
    543 
    544 
    545 /*
    546  *  return a name starting with the given character
    547  */
    548 char*
    549 file(char *name, char type)
    550 {
    551 	static char nname[Elemlen+1];
    552 
    553 	strncpy(nname, name, Elemlen);
    554 	nname[Elemlen] = 0;
    555 	nname[0] = type;
    556 	return nname;
    557 }
    558 
    559 /*
    560  *  send back the mail with an error message
    561  *
    562  *  return 0 if successful
    563  */
    564 int
    565 returnmail(char **av, char *name, char *msg)
    566 {
    567 	int pfd[2];
    568 	Waitmsg *wm;
    569 	int fd;
    570 	char buf[256];
    571 	char attachment[256];
    572 	int i;
    573 	long n;
    574 	String *s;
    575 	char *sender;
    576 
    577 	if(av[1] == 0 || av[2] == 0){
    578 		logit("runq - dumping bad file", name, av);
    579 		return 0;
    580 	}
    581 
    582 	s = unescapespecial(s_copy(av[2]));
    583 	sender = s_to_c(s);
    584 
    585 	if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
    586 		logit("runq - dumping p to p mail", name, av);
    587 		return 0;
    588 	}
    589 
    590 	if(pipe(pfd) < 0){
    591 		logit("runq - pipe failed", name, av);
    592 		return -1;
    593 	}
    594 
    595 	switch(rfork(RFFDG|RFPROC|RFENVG)){
    596 	case -1:
    597 		logit("runq - fork failed", name, av);
    598 		return -1;
    599 	case 0:
    600 		logit("returning", name, av);
    601 		close(pfd[1]);
    602 		close(0);
    603 		dup(pfd[0], 0);
    604 		close(pfd[0]);
    605 		putenv("upasname", "/dev/null");
    606 		snprint(buf, sizeof(buf), "%s/marshal", UPASBIN);
    607 		snprint(attachment, sizeof(attachment), "%s", file(name, 'D'));
    608 		execl(buf, "send", "-A", attachment, "-s", "permanent failure", sender, nil);
    609 		error("can't exec", 0);
    610 		break;
    611 	default:
    612 		break;
    613 	}
    614 
    615 	close(pfd[0]);
    616 	fprint(pfd[1], "\n");	/* get out of headers */
    617 	if(av[1]){
    618 		fprint(pfd[1], "Your request ``%.20s ", av[1]);
    619 		for(n = 3; av[n]; n++)
    620 			fprint(pfd[1], "%s ", av[n]);
    621 	}
    622 	fprint(pfd[1], "'' failed (code %s).\nThe symptom was:\n\n", msg);
    623 	fd = open(file(name, 'E'), OREAD);
    624 	if(fd >= 0){
    625 		for(;;){
    626 			n = read(fd, buf, sizeof(buf));
    627 			if(n <= 0)
    628 				break;
    629 			if(write(pfd[1], buf, n) != n){
    630 				close(fd);
    631 				goto out;
    632 			}
    633 		}
    634 		close(fd);
    635 	}
    636 	close(pfd[1]);
    637 out:
    638 	wm = wait();
    639 	if(wm == nil){
    640 		syslog(0, "runq", "wait: %r");
    641 		logit("wait failed", name, av);
    642 		return -1;
    643 	}
    644 	i = 0;
    645 	if(wm->msg[0]){
    646 		i = -1;
    647 		syslog(0, "runq", "returnmail child: %s", wm->msg);
    648 		logit("returnmail child failed", name, av);
    649 	}
    650 	free(wm);
    651 	return i;
    652 }
    653 
    654 /*
    655  *  print a warning and continue
    656  */
    657 void
    658 warning(char *f, void *a)
    659 {
    660 	char err[65];
    661 	char buf[256];
    662 
    663 	rerrstr(err, sizeof(err));
    664 	snprint(buf, sizeof(buf), f, a);
    665 	fprint(2, "runq: %s: %s\n", buf, err);
    666 }
    667 
    668 /*
    669  *  print an error and die
    670  */
    671 void
    672 error(char *f, void *a)
    673 {
    674 	char err[Errlen];
    675 	char buf[256];
    676 
    677 	rerrstr(err, sizeof(err));
    678 	snprint(buf, sizeof(buf), f, a);
    679 	fprint(2, "runq: %s: %s\n", buf, err);
    680 	exits(buf);
    681 }
    682 
    683 void
    684 logit(char *msg, char *file, char **av)
    685 {
    686 	int n, m;
    687 	char buf[256];
    688 
    689 	n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
    690 	for(; *av; av++){
    691 		m = strlen(*av);
    692 		if(n + m + 4 > sizeof(buf))
    693 			break;
    694 		sprint(buf + n, " '%s'", *av);
    695 		n += m + 3;
    696 	}
    697 	syslog(0, runqlog, "%s", buf);
    698 }
    699 
    700 char *loadfile = ".runqload";
    701 
    702 /*
    703  *  load balancing
    704  */
    705 void
    706 doload(int start)
    707 {
    708 	int fd;
    709 	char buf[32];
    710 	int i, n;
    711 	Mlock *l;
    712 	Dir *d;
    713 
    714 	if(load <= 0)
    715 		return;
    716 
    717 	if(chdir(root) < 0){
    718 		load = 0;
    719 		return;
    720 	}
    721 
    722 	l = syslock(loadfile);
    723 	fd = open(loadfile, ORDWR);
    724 	if(fd < 0){
    725 		fd = create(loadfile, 0666, ORDWR);
    726 		if(fd < 0){
    727 			load = 0;
    728 			sysunlock(l);
    729 			return;
    730 		}
    731 	}
    732 
    733 	/* get current load */
    734 	i = 0;
    735 	n = read(fd, buf, sizeof(buf)-1);
    736 	if(n >= 0){
    737 		buf[n] = 0;
    738 		i = atoi(buf);
    739 	}
    740 	if(i < 0)
    741 		i = 0;
    742 
    743 	/* ignore load if file hasn't been changed in 30 minutes */
    744 	d = dirfstat(fd);
    745 	if(d != nil){
    746 		if(d->mtime + 30*60 < time(0))
    747 			i = 0;
    748 		free(d);
    749 	}
    750 
    751 	/* if load already too high, give up */
    752 	if(start && i >= load){
    753 		sysunlock(l);
    754 		exits(0);
    755 	}
    756 
    757 	/* increment/decrement load */
    758 	if(start)
    759 		i++;
    760 	else
    761 		i--;
    762 	seek(fd, 0, 0);
    763 	fprint(fd, "%d\n", i);
    764 	sysunlock(l);
    765 	close(fd);
    766 }