plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

vbackup.c (14404B)


      1 /*
      2  * vbackup [-Dnv] fspartition [score]
      3  *
      4  * Copy a file system to a disk image stored on Venti.
      5  * Prints a vnfs config line for the copied image.
      6  *
      7  *	-D	print debugging
      8  *	-f	'fast' writes - skip write if block exists on server
      9  *	-i	read old scores incrementally
     10  *	-m	set mount name
     11  *	-M	set mount place
     12  *	-n	nop -- don't actually write blocks
     13  *	-s	print status updates
     14  *	-v	print debugging trace
     15  *	-w	write parallelism
     16  *
     17  * If score is given on the command line, it should be the
     18  * score from a previous vbackup on this fspartition.
     19  * In this mode, only the new blocks are stored to Venti.
     20  * The result is still a complete image, but requires many
     21  * fewer Venti writes in the common case.
     22  *
     23  * This program is structured as three processes connected
     24  * by buffered queues:
     25  *
     26  * 	fsysproc | cmpproc | ventiproc
     27  *
     28  * Fsysproc reads the disk and queues the blocks.
     29  * Cmpproc compares the blocks against the SHA1 hashes
     30  * in the old image, if any.  It discards the unchanged blocks
     31  * and queues the changed ones.  Ventiproc writes blocks to Venti.
     32  *
     33  * There is a fourth proc, statusproc, which prints status
     34  * updates about how the various procs are progressing.
     35  */
     36 
     37 #include <u.h>
     38 #include <libc.h>
     39 #include <bio.h>
     40 #include <thread.h>
     41 #include <libsec.h>
     42 #include <venti.h>
     43 #include <diskfs.h>
     44 #include "queue.h"
     45 
     46 enum
     47 {
     48 	STACK = 32768
     49 };
     50 
     51 typedef struct WriteReq WriteReq;
     52 struct WriteReq
     53 {
     54 	Packet *p;
     55 	uint type;
     56 	uchar score[VtScoreSize];
     57 };
     58 
     59 Biobuf	bscores;		/* biobuf filled with block scores */
     60 int		debug;		/* debugging flag (not used) */
     61 Disk*	disk;			/* disk being backed up */
     62 RWLock	endlk;		/* silly synchonization */
     63 int		errors;		/* are we exiting with an error status? */
     64 int		fastwrites;		/* do not write blocks already on server */
     65 int		fsscanblock;	/* last block scanned */
     66 Fsys*	fsys;			/* file system being backed up */
     67 int		incremental;	/* use vscores rather than bscores */
     68 int		nchange;		/* number of changed blocks */
     69 int		nop;			/* don't actually send blocks to venti */
     70 int		nskip;		/* number of blocks skipped (already on server) */
     71 int		nwritethread;	/* number of write-behind threads */
     72 Queue*	qcmp;		/* queue fsys->cmp */
     73 Queue*	qventi;		/* queue cmp->venti */
     74 int		statustime;	/* print status every _ seconds */
     75 int		verbose;		/* print extra stuff */
     76 VtFile*	vfile;			/* venti file being written */
     77 VtFile*	vscores;		/* venti file with block scores */
     78 Channel*	writechan;	/* chan(WriteReq) */
     79 VtConn*	z;			/* connection to venti */
     80 VtCache*	zcache;		/* cache of venti blocks */
     81 uchar*	zero;			/* blocksize zero bytes */
     82 
     83 int nsend, nrecv;
     84 
     85 void		cmpproc(void*);
     86 void		fsysproc(void*);
     87 void		statusproc(void*);
     88 void		ventiproc(void*);
     89 int		timefmt(Fmt*);
     90 char*	guessmountplace(char *dev);
     91 
     92 void
     93 usage(void)
     94 {
     95 	fprint(2, "usage: vbackup [-DVnv] [-m mtpt] [-M mtpl] [-s secs] [-w n] disk [score]\n");
     96 	threadexitsall("usage");
     97 }
     98 
     99 void
    100 threadmain(int argc, char **argv)
    101 {
    102 	char *pref, *mountname, *mountplace;
    103 	uchar score[VtScoreSize], prev[VtScoreSize];
    104 	int i, fd, csize;
    105 	vlong bsize;
    106 	Tm tm;
    107 	VtEntry e;
    108 	VtBlock *b;
    109 	VtCache *c;
    110 	VtRoot root;
    111 	char *tmp, *tmpnam;
    112 
    113 	fmtinstall('F', vtfcallfmt);
    114 	fmtinstall('H', encodefmt);
    115 	fmtinstall('T', timefmt);
    116 	fmtinstall('V', vtscorefmt);
    117 
    118 	mountname = sysname();
    119 	mountplace = nil;
    120 	ARGBEGIN{
    121 	default:
    122 		usage();
    123 		break;
    124 	case 'D':
    125 		debug++;
    126 		break;
    127 	case 'V':
    128 		chattyventi = 1;
    129 		break;
    130 	case 'f':
    131 		fastwrites = 1;
    132 		break;
    133 	case 'i':
    134 		incremental = 1;
    135 		break;
    136 	case 'm':
    137 		mountname = EARGF(usage());
    138 		break;
    139 	case 'M':
    140 		mountplace = EARGF(usage());
    141 		i = strlen(mountplace);
    142 		if(i > 0 && mountplace[i-1] == '/')
    143 			mountplace[i-1] = 0;
    144 		break;
    145 	case 'n':
    146 		nop = 1;
    147 		break;
    148 	case 's':
    149 		statustime = atoi(EARGF(usage()));
    150 		break;
    151 	case 'v':
    152 		verbose = 1;
    153 		break;
    154 	case 'w':
    155 		nwritethread = atoi(EARGF(usage()));
    156 		break;
    157 	}ARGEND
    158 
    159 	if(argc != 1 && argc != 2)
    160 		usage();
    161 
    162 	if(statustime)
    163 		print("# %T vbackup %s %s\n", argv[0], argc>=2 ? argv[1] : "");
    164 
    165 	/*
    166 	 * open fs
    167 	 */
    168 	if((disk = diskopenfile(argv[0])) == nil)
    169 		sysfatal("diskopen: %r");
    170 	if((disk = diskcache(disk, 32768, 2*MAXQ+16)) == nil)
    171 		sysfatal("diskcache: %r");
    172 	if((fsys = fsysopen(disk)) == nil)
    173 		sysfatal("fsysopen: %r");
    174 
    175 	/*
    176 	 * connect to venti
    177 	 */
    178 	if((z = vtdial(nil)) == nil)
    179 		sysfatal("vtdial: %r");
    180 	if(vtconnect(z) < 0)
    181 		sysfatal("vtconnect: %r");
    182 
    183 	/*
    184 	 * set up venti block cache
    185 	 */
    186 	zero = vtmallocz(fsys->blocksize);
    187 	bsize = fsys->blocksize;
    188 	csize = 50;	/* plenty; could probably do with 5 */
    189 
    190 	if(verbose)
    191 		fprint(2, "cache %d blocks\n", csize);
    192 	c = vtcachealloc(z, bsize*csize);
    193 	zcache = c;
    194 
    195 	/*
    196 	 * parse starting score
    197 	 */
    198 	memset(prev, 0, sizeof prev);
    199 	if(argc == 1){
    200 		vfile = vtfilecreateroot(c, (fsys->blocksize/VtScoreSize)*VtScoreSize,
    201 			fsys->blocksize, VtDataType);
    202 		if(vfile == nil)
    203 			sysfatal("vtfilecreateroot: %r");
    204 		vtfilelock(vfile, VtORDWR);
    205 		if(vtfilewrite(vfile, zero, 1, bsize*fsys->nblock-1) != 1)
    206 			sysfatal("vtfilewrite: %r");
    207 		if(vtfileflush(vfile) < 0)
    208 			sysfatal("vtfileflush: %r");
    209 	}else{
    210 		if(vtparsescore(argv[1], &pref, score) < 0)
    211 			sysfatal("bad score: %r");
    212 		if(pref!=nil && strcmp(pref, fsys->type) != 0)
    213 			sysfatal("score is %s but fsys is %s", pref, fsys->type);
    214 		b = vtcacheglobal(c, score, VtRootType, VtRootSize);
    215 		if(b){
    216 			if(vtrootunpack(&root, b->data) < 0)
    217 				sysfatal("bad root: %r");
    218 			if(strcmp(root.type, fsys->type) != 0)
    219 				sysfatal("root is %s but fsys is %s", root.type, fsys->type);
    220 			memmove(prev, score, VtScoreSize);
    221 			memmove(score, root.score, VtScoreSize);
    222 			vtblockput(b);
    223 		}
    224 		b = vtcacheglobal(c, score, VtDirType, VtEntrySize);
    225 		if(b == nil)
    226 			sysfatal("vtcacheglobal %V: %r", score);
    227 		if(vtentryunpack(&e, b->data, 0) < 0)
    228 			sysfatal("%V: vtentryunpack failed", score);
    229 		if(verbose)
    230 			fprint(2, "entry: size %llud psize %d dsize %d\n",
    231 				e.size, e.psize, e.dsize);
    232 		vtblockput(b);
    233 		if((vfile = vtfileopenroot(c, &e)) == nil)
    234 			sysfatal("vtfileopenroot: %r");
    235 		vtfilelock(vfile, VtORDWR);
    236 		if(e.dsize != bsize)
    237 			sysfatal("file system block sizes don't match %d %lld", e.dsize, bsize);
    238 		if(e.size != fsys->nblock*bsize)
    239 			sysfatal("file system block counts don't match %lld %lld", e.size, fsys->nblock*bsize);
    240 	}
    241 
    242 	tmpnam = nil;
    243 	if(incremental){
    244 		if(vtfilegetentry(vfile, &e) < 0)
    245 			sysfatal("vtfilegetentry: %r");
    246 		if((vscores = vtfileopenroot(c, &e)) == nil)
    247 			sysfatal("vtfileopenroot: %r");
    248 		vtfileunlock(vfile);
    249 	}else{
    250 		/*
    251 		 * write scores of blocks into temporary file
    252 		 */
    253 		if((tmp = getenv("TMP")) != nil){
    254 			/* okay, good */
    255 		}else if(access("/var/tmp", 0) >= 0)
    256 			tmp = "/var/tmp";
    257 		else
    258 			tmp = "/tmp";
    259 		tmpnam = smprint("%s/vbackup.XXXXXX", tmp);
    260 		if(tmpnam == nil)
    261 			sysfatal("smprint: %r");
    262 
    263 		if((fd = opentemp(tmpnam, ORDWR|ORCLOSE)) < 0)
    264 			sysfatal("opentemp %s: %r", tmpnam);
    265 		if(statustime)
    266 			print("# %T reading scores into %s\n", tmpnam);
    267 		if(verbose)
    268 			fprint(2, "read scores into %s...\n", tmpnam);
    269 
    270 		Binit(&bscores, fd, OWRITE);
    271 		for(i=0; i<fsys->nblock; i++){
    272 			if(vtfileblockscore(vfile, i, score) < 0)
    273 				sysfatal("vtfileblockhash %d: %r", i);
    274 			if(Bwrite(&bscores, score, VtScoreSize) != VtScoreSize)
    275 				sysfatal("Bwrite: %r");
    276 		}
    277 		Bterm(&bscores);
    278 		vtfileunlock(vfile);
    279 
    280 		/*
    281 		 * prep scores for rereading
    282 		 */
    283 		seek(fd, 0, 0);
    284 		Binit(&bscores, fd, OREAD);
    285 	}
    286 
    287 	/*
    288 	 * start the main processes
    289 	 */
    290 	if(statustime)
    291 		print("# %T starting procs\n");
    292 	qcmp = qalloc();
    293 	qventi = qalloc();
    294 
    295 	rlock(&endlk);
    296 	proccreate(fsysproc, nil, STACK);
    297 	rlock(&endlk);
    298 	proccreate(ventiproc, nil, STACK);
    299 	rlock(&endlk);
    300 	proccreate(cmpproc, nil, STACK);
    301 	if(statustime){
    302 		rlock(&endlk);
    303 		proccreate(statusproc, nil, STACK);
    304 	}
    305 
    306 	/*
    307 	 * wait for processes to finish
    308 	 */
    309 	wlock(&endlk);
    310 
    311 	qfree(qcmp);
    312 	qfree(qventi);
    313 
    314 	if(statustime)
    315 		print("# %T procs exited: %d of %d %d-byte blocks changed, "
    316 			"%d read, %d written, %d skipped, %d copied\n",
    317 			nchange, fsys->nblock, fsys->blocksize,
    318 			vtcachenread, vtcachenwrite, nskip, vtcachencopy);
    319 
    320 	/*
    321 	 * prepare root block
    322 	 */
    323 	if(incremental)
    324 		vtfileclose(vscores);
    325 	vtfilelock(vfile, -1);
    326 	if(vtfileflush(vfile) < 0)
    327 		sysfatal("vtfileflush: %r");
    328 	if(vtfilegetentry(vfile, &e) < 0)
    329 		sysfatal("vtfilegetentry: %r");
    330 	vtfileunlock(vfile);
    331 	vtfileclose(vfile);
    332 
    333 	b = vtcacheallocblock(c, VtDirType, VtEntrySize);
    334 	if(b == nil)
    335 		sysfatal("vtcacheallocblock: %r");
    336 	vtentrypack(&e, b->data, 0);
    337 	if(vtblockwrite(b) < 0)
    338 		sysfatal("vtblockwrite: %r");
    339 
    340 	memset(&root, 0, sizeof root);
    341 	strecpy(root.name, root.name+sizeof root.name, argv[0]);
    342 	strecpy(root.type, root.type+sizeof root.type, fsys->type);
    343 	memmove(root.score, b->score, VtScoreSize);
    344 	root.blocksize = fsys->blocksize;
    345 	memmove(root.prev, prev, VtScoreSize);
    346 	vtblockput(b);
    347 
    348 	b = vtcacheallocblock(c, VtRootType, VtRootSize);
    349 	if(b == nil)
    350 		sysfatal("vtcacheallocblock: %r");
    351 	vtrootpack(&root, b->data);
    352 	if(vtblockwrite(b) < 0)
    353 		sysfatal("vtblockwrite: %r");
    354 
    355 	tm = *localtime(time(0));
    356 	tm.year += 1900;
    357 	tm.mon++;
    358 	if(mountplace == nil)
    359 		mountplace = guessmountplace(argv[0]);
    360 	print("mount /%s/%d/%02d%02d%s %s:%V %d/%02d%02d/%02d%02d\n",
    361 		mountname, tm.year, tm.mon, tm.mday,
    362 		mountplace,
    363 		root.type, b->score,
    364 		tm.year, tm.mon, tm.mday, tm.hour, tm.min);
    365 	print("# %T %s %s:%V\n", argv[0], root.type, b->score);
    366 	if(statustime)
    367 		print("# %T venti sync\n");
    368 	vtblockput(b);
    369 	if(vtsync(z) < 0)
    370 		sysfatal("vtsync: %r");
    371 	if(statustime)
    372 		print("# %T synced\n");
    373 
    374 	fsysclose(fsys);
    375 	diskclose(disk);
    376 	vtcachefree(zcache);
    377 
    378 	// Vtgoodbye hangs on Linux - not sure why.
    379 	// Probably vtfcallrpc doesn't quite get the
    380 	// remote hangup right.  Also probably related
    381 	// to the vtrecvproc problem below.
    382 	// vtgoodbye(z);
    383 
    384 	// Leak here, because I can't seem to make
    385 	// the vtrecvproc exit.
    386 	// vtfreeconn(z);
    387 
    388 	free(tmpnam);
    389 	z = nil;
    390 	zcache = nil;
    391 	fsys = nil;
    392 	disk = nil;
    393 	threadexitsall(nil);
    394 }
    395 
    396 void
    397 fsysproc(void *dummy)
    398 {
    399 	u32int i;
    400 	Block *db;
    401 
    402 	USED(dummy);
    403 
    404 	for(i=0; i<fsys->nblock; i++){
    405 		fsscanblock = i;
    406 		if((db = fsysreadblock(fsys, i)) != nil)
    407 			qwrite(qcmp, db, i);
    408 	}
    409 	fsscanblock = i;
    410 	qclose(qcmp);
    411 
    412 	if(statustime)
    413 		print("# %T fsys proc exiting\n");
    414 	runlock(&endlk);
    415 }
    416 
    417 void
    418 cmpproc(void *dummy)
    419 {
    420 	uchar *data;
    421 	Block *db;
    422 	u32int bno, bsize;
    423 	uchar score[VtScoreSize];
    424 	uchar score1[VtScoreSize];
    425 
    426 	USED(dummy);
    427 
    428 	if(incremental)
    429 		vtfilelock(vscores, VtOREAD);
    430 	bsize = fsys->blocksize;
    431 	while((db = qread(qcmp, &bno)) != nil){
    432 		data = db->data;
    433 		sha1(data, vtzerotruncate(VtDataType, data, bsize), score, nil);
    434 		if(incremental){
    435 			if(vtfileblockscore(vscores, bno, score1) < 0)
    436 				sysfatal("cmpproc vtfileblockscore %d: %r", bno);
    437 		}else{
    438 			if(Bseek(&bscores, (vlong)bno*VtScoreSize, 0) < 0)
    439 				sysfatal("cmpproc Bseek: %r");
    440 			if(Bread(&bscores, score1, VtScoreSize) != VtScoreSize)
    441 				sysfatal("cmpproc Bread: %r");
    442 		}
    443 		if(memcmp(score, score1, VtScoreSize) != 0){
    444 			nchange++;
    445 			if(verbose)
    446 				print("# block %ud: old %V new %V\n", bno, score1, score);
    447 			qwrite(qventi, db, bno);
    448 		}else
    449 			blockput(db);
    450 	}
    451 	qclose(qventi);
    452 	if(incremental)
    453 		vtfileunlock(vscores);
    454 	if(statustime)
    455 		print("# %T cmp proc exiting\n");
    456 	runlock(&endlk);
    457 }
    458 
    459 void
    460 writethread(void *v)
    461 {
    462 	WriteReq wr;
    463 	char err[ERRMAX];
    464 
    465 	USED(v);
    466 
    467 	while(recv(writechan, &wr) == 1){
    468 		nrecv++;
    469 		if(wr.p == nil)
    470 			break;
    471 
    472 		if(fastwrites && vtread(z, wr.score, wr.type, nil, 0) < 0){
    473 			rerrstr(err, sizeof err);
    474 			if(strstr(err, "read too small")){	/* already exists */
    475 				nskip++;
    476 				packetfree(wr.p);
    477 				continue;
    478 			}
    479 		}
    480 		if(vtwritepacket(z, wr.score, wr.type, wr.p) < 0)
    481 			sysfatal("vtwritepacket: %r");
    482 		packetfree(wr.p);
    483 	}
    484 }
    485 
    486 int
    487 myvtwrite(VtConn *z, uchar score[VtScoreSize], uint type, uchar *buf, int n)
    488 {
    489 	WriteReq wr;
    490 
    491 	if(nwritethread == 0){
    492 		n = vtwrite(z, score, type, buf, n);
    493 		if(n < 0)
    494 			sysfatal("vtwrite: %r");
    495 		return n;
    496 	}
    497 
    498 	wr.p = packetalloc();
    499 	packetappend(wr.p, buf, n);
    500 	packetsha1(wr.p, score);
    501 	memmove(wr.score, score, VtScoreSize);
    502 	wr.type = type;
    503 	nsend++;
    504 	send(writechan, &wr);
    505 	return 0;
    506 }
    507 
    508 void
    509 ventiproc(void *dummy)
    510 {
    511 	int i;
    512 	Block *db;
    513 	u32int bno;
    514 	u64int bsize;
    515 
    516 	USED(dummy);
    517 
    518 	proccreate(vtsendproc, z, STACK);
    519 	proccreate(vtrecvproc, z, STACK);
    520 
    521 	writechan = chancreate(sizeof(WriteReq), 0);
    522 	for(i=0; i<nwritethread; i++)
    523 		threadcreate(writethread, nil, STACK);
    524 	vtcachesetwrite(zcache, myvtwrite);
    525 
    526 	bsize = fsys->blocksize;
    527 	vtfilelock(vfile, -1);
    528 	while((db = qread(qventi, &bno)) != nil){
    529 		if(nop){
    530 			blockput(db);
    531 			continue;
    532 		}
    533 		if(vtfilewrite(vfile, db->data, bsize, bno*bsize) != bsize)
    534 			sysfatal("ventiproc vtfilewrite: %r");
    535 		if(vtfileflushbefore(vfile, (bno+1)*bsize) < 0)
    536 			sysfatal("ventiproc vtfileflushbefore: %r");
    537 		blockput(db);
    538 	}
    539 	vtfileunlock(vfile);
    540 	vtcachesetwrite(zcache, nil);
    541 	for(i=0; i<nwritethread; i++)
    542 		send(writechan, nil);
    543 	chanfree(writechan);
    544 	if(statustime)
    545 		print("# %T venti proc exiting - nsend %d nrecv %d\n", nsend, nrecv);
    546 	runlock(&endlk);
    547 }
    548 
    549 static int
    550 percent(u32int a, u32int b)
    551 {
    552 	return (vlong)a*100/b;
    553 }
    554 
    555 void
    556 statusproc(void *dummy)
    557 {
    558 	int n;
    559 	USED(dummy);
    560 
    561 	for(n=0;;n++){
    562 		sleep(1000);
    563 		if(qcmp->closed && qcmp->nel==0 && qventi->closed && qventi->nel==0)
    564 			break;
    565 		if(n < statustime)
    566 			continue;
    567 		n = 0;
    568 		print("# %T fsscan=%d%% cmpq=%d%% ventiq=%d%%\n",
    569 			percent(fsscanblock, fsys->nblock),
    570 			percent(qcmp->nel, MAXQ),
    571 			percent(qventi->nel, MAXQ));
    572 	}
    573 	print("# %T status proc exiting\n");
    574 	runlock(&endlk);
    575 }
    576 
    577 int
    578 timefmt(Fmt *fmt)
    579 {
    580 	vlong ns;
    581 	Tm tm;
    582 	ns = nsec();
    583 	tm = *localtime(time(0));
    584 	return fmtprint(fmt, "%04d/%02d%02d %02d:%02d:%02d.%03d",
    585 		tm.year+1900, tm.mon+1, tm.mday, tm.hour, tm.min, tm.sec,
    586 		(int)(ns%1000000000)/1000000);
    587 }
    588 
    589 char*
    590 guessmountplace(char *dev)
    591 {
    592 	char *cmd, *q;
    593 	int p[2], fd[3], n;
    594 	char buf[100];
    595 
    596 	if(pipe(p) < 0)
    597 		sysfatal("pipe: %r");
    598 
    599 	fd[0] = -1;
    600 	fd[1] = p[1];
    601 	fd[2] = -1;
    602 	cmd = smprint("mount | awk 'BEGIN{v=\"%s\"; u=v; sub(/rdisk/, \"disk\", u);} ($1==v||$1==u) && $2 == \"on\" {print $3}'", dev);
    603 	if(threadspawnl(fd, "sh", "sh", "-c", cmd, nil) < 0)
    604 		sysfatal("exec mount|awk (to find mtpt of %s): %r", dev);
    605 	/* threadspawnl closed p[1] */
    606 	free(cmd);
    607 	n = readn(p[0], buf, sizeof buf-1);
    608 	close(p[0]);
    609 	if(n <= 0)
    610 		return dev;
    611 	buf[n] = 0;
    612 	if((q = strchr(buf, '\n')) == nil)
    613 		return dev;
    614 	*q = 0;
    615 	q = buf+strlen(buf);
    616 	if(q>buf && *(q-1) == '/')
    617 		*--q = 0;
    618 	return strdup(buf);
    619 }