vbackup.c (14404B)
1 /* 2 * vbackup [-Dnv] fspartition [score] 3 * 4 * Copy a file system to a disk image stored on Venti. 5 * Prints a vnfs config line for the copied image. 6 * 7 * -D print debugging 8 * -f 'fast' writes - skip write if block exists on server 9 * -i read old scores incrementally 10 * -m set mount name 11 * -M set mount place 12 * -n nop -- don't actually write blocks 13 * -s print status updates 14 * -v print debugging trace 15 * -w write parallelism 16 * 17 * If score is given on the command line, it should be the 18 * score from a previous vbackup on this fspartition. 19 * In this mode, only the new blocks are stored to Venti. 20 * The result is still a complete image, but requires many 21 * fewer Venti writes in the common case. 22 * 23 * This program is structured as three processes connected 24 * by buffered queues: 25 * 26 * fsysproc | cmpproc | ventiproc 27 * 28 * Fsysproc reads the disk and queues the blocks. 29 * Cmpproc compares the blocks against the SHA1 hashes 30 * in the old image, if any. It discards the unchanged blocks 31 * and queues the changed ones. Ventiproc writes blocks to Venti. 32 * 33 * There is a fourth proc, statusproc, which prints status 34 * updates about how the various procs are progressing. 35 */ 36 37 #include <u.h> 38 #include <libc.h> 39 #include <bio.h> 40 #include <thread.h> 41 #include <libsec.h> 42 #include <venti.h> 43 #include <diskfs.h> 44 #include "queue.h" 45 46 enum 47 { 48 STACK = 32768 49 }; 50 51 typedef struct WriteReq WriteReq; 52 struct WriteReq 53 { 54 Packet *p; 55 uint type; 56 uchar score[VtScoreSize]; 57 }; 58 59 Biobuf bscores; /* biobuf filled with block scores */ 60 int debug; /* debugging flag (not used) */ 61 Disk* disk; /* disk being backed up */ 62 RWLock endlk; /* silly synchonization */ 63 int errors; /* are we exiting with an error status? */ 64 int fastwrites; /* do not write blocks already on server */ 65 int fsscanblock; /* last block scanned */ 66 Fsys* fsys; /* file system being backed up */ 67 int incremental; /* use vscores rather than bscores */ 68 int nchange; /* number of changed blocks */ 69 int nop; /* don't actually send blocks to venti */ 70 int nskip; /* number of blocks skipped (already on server) */ 71 int nwritethread; /* number of write-behind threads */ 72 Queue* qcmp; /* queue fsys->cmp */ 73 Queue* qventi; /* queue cmp->venti */ 74 int statustime; /* print status every _ seconds */ 75 int verbose; /* print extra stuff */ 76 VtFile* vfile; /* venti file being written */ 77 VtFile* vscores; /* venti file with block scores */ 78 Channel* writechan; /* chan(WriteReq) */ 79 VtConn* z; /* connection to venti */ 80 VtCache* zcache; /* cache of venti blocks */ 81 uchar* zero; /* blocksize zero bytes */ 82 83 int nsend, nrecv; 84 85 void cmpproc(void*); 86 void fsysproc(void*); 87 void statusproc(void*); 88 void ventiproc(void*); 89 int timefmt(Fmt*); 90 char* guessmountplace(char *dev); 91 92 void 93 usage(void) 94 { 95 fprint(2, "usage: vbackup [-DVnv] [-m mtpt] [-M mtpl] [-s secs] [-w n] disk [score]\n"); 96 threadexitsall("usage"); 97 } 98 99 void 100 threadmain(int argc, char **argv) 101 { 102 char *pref, *mountname, *mountplace; 103 uchar score[VtScoreSize], prev[VtScoreSize]; 104 int i, fd, csize; 105 vlong bsize; 106 Tm tm; 107 VtEntry e; 108 VtBlock *b; 109 VtCache *c; 110 VtRoot root; 111 char *tmp, *tmpnam; 112 113 fmtinstall('F', vtfcallfmt); 114 fmtinstall('H', encodefmt); 115 fmtinstall('T', timefmt); 116 fmtinstall('V', vtscorefmt); 117 118 mountname = sysname(); 119 mountplace = nil; 120 ARGBEGIN{ 121 default: 122 usage(); 123 break; 124 case 'D': 125 debug++; 126 break; 127 case 'V': 128 chattyventi = 1; 129 break; 130 case 'f': 131 fastwrites = 1; 132 break; 133 case 'i': 134 incremental = 1; 135 break; 136 case 'm': 137 mountname = EARGF(usage()); 138 break; 139 case 'M': 140 mountplace = EARGF(usage()); 141 i = strlen(mountplace); 142 if(i > 0 && mountplace[i-1] == '/') 143 mountplace[i-1] = 0; 144 break; 145 case 'n': 146 nop = 1; 147 break; 148 case 's': 149 statustime = atoi(EARGF(usage())); 150 break; 151 case 'v': 152 verbose = 1; 153 break; 154 case 'w': 155 nwritethread = atoi(EARGF(usage())); 156 break; 157 }ARGEND 158 159 if(argc != 1 && argc != 2) 160 usage(); 161 162 if(statustime) 163 print("# %T vbackup %s %s\n", argv[0], argc>=2 ? argv[1] : ""); 164 165 /* 166 * open fs 167 */ 168 if((disk = diskopenfile(argv[0])) == nil) 169 sysfatal("diskopen: %r"); 170 if((disk = diskcache(disk, 32768, 2*MAXQ+16)) == nil) 171 sysfatal("diskcache: %r"); 172 if((fsys = fsysopen(disk)) == nil) 173 sysfatal("fsysopen: %r"); 174 175 /* 176 * connect to venti 177 */ 178 if((z = vtdial(nil)) == nil) 179 sysfatal("vtdial: %r"); 180 if(vtconnect(z) < 0) 181 sysfatal("vtconnect: %r"); 182 183 /* 184 * set up venti block cache 185 */ 186 zero = vtmallocz(fsys->blocksize); 187 bsize = fsys->blocksize; 188 csize = 50; /* plenty; could probably do with 5 */ 189 190 if(verbose) 191 fprint(2, "cache %d blocks\n", csize); 192 c = vtcachealloc(z, bsize*csize); 193 zcache = c; 194 195 /* 196 * parse starting score 197 */ 198 memset(prev, 0, sizeof prev); 199 if(argc == 1){ 200 vfile = vtfilecreateroot(c, (fsys->blocksize/VtScoreSize)*VtScoreSize, 201 fsys->blocksize, VtDataType); 202 if(vfile == nil) 203 sysfatal("vtfilecreateroot: %r"); 204 vtfilelock(vfile, VtORDWR); 205 if(vtfilewrite(vfile, zero, 1, bsize*fsys->nblock-1) != 1) 206 sysfatal("vtfilewrite: %r"); 207 if(vtfileflush(vfile) < 0) 208 sysfatal("vtfileflush: %r"); 209 }else{ 210 if(vtparsescore(argv[1], &pref, score) < 0) 211 sysfatal("bad score: %r"); 212 if(pref!=nil && strcmp(pref, fsys->type) != 0) 213 sysfatal("score is %s but fsys is %s", pref, fsys->type); 214 b = vtcacheglobal(c, score, VtRootType, VtRootSize); 215 if(b){ 216 if(vtrootunpack(&root, b->data) < 0) 217 sysfatal("bad root: %r"); 218 if(strcmp(root.type, fsys->type) != 0) 219 sysfatal("root is %s but fsys is %s", root.type, fsys->type); 220 memmove(prev, score, VtScoreSize); 221 memmove(score, root.score, VtScoreSize); 222 vtblockput(b); 223 } 224 b = vtcacheglobal(c, score, VtDirType, VtEntrySize); 225 if(b == nil) 226 sysfatal("vtcacheglobal %V: %r", score); 227 if(vtentryunpack(&e, b->data, 0) < 0) 228 sysfatal("%V: vtentryunpack failed", score); 229 if(verbose) 230 fprint(2, "entry: size %llud psize %d dsize %d\n", 231 e.size, e.psize, e.dsize); 232 vtblockput(b); 233 if((vfile = vtfileopenroot(c, &e)) == nil) 234 sysfatal("vtfileopenroot: %r"); 235 vtfilelock(vfile, VtORDWR); 236 if(e.dsize != bsize) 237 sysfatal("file system block sizes don't match %d %lld", e.dsize, bsize); 238 if(e.size != fsys->nblock*bsize) 239 sysfatal("file system block counts don't match %lld %lld", e.size, fsys->nblock*bsize); 240 } 241 242 tmpnam = nil; 243 if(incremental){ 244 if(vtfilegetentry(vfile, &e) < 0) 245 sysfatal("vtfilegetentry: %r"); 246 if((vscores = vtfileopenroot(c, &e)) == nil) 247 sysfatal("vtfileopenroot: %r"); 248 vtfileunlock(vfile); 249 }else{ 250 /* 251 * write scores of blocks into temporary file 252 */ 253 if((tmp = getenv("TMP")) != nil){ 254 /* okay, good */ 255 }else if(access("/var/tmp", 0) >= 0) 256 tmp = "/var/tmp"; 257 else 258 tmp = "/tmp"; 259 tmpnam = smprint("%s/vbackup.XXXXXX", tmp); 260 if(tmpnam == nil) 261 sysfatal("smprint: %r"); 262 263 if((fd = opentemp(tmpnam, ORDWR|ORCLOSE)) < 0) 264 sysfatal("opentemp %s: %r", tmpnam); 265 if(statustime) 266 print("# %T reading scores into %s\n", tmpnam); 267 if(verbose) 268 fprint(2, "read scores into %s...\n", tmpnam); 269 270 Binit(&bscores, fd, OWRITE); 271 for(i=0; i<fsys->nblock; i++){ 272 if(vtfileblockscore(vfile, i, score) < 0) 273 sysfatal("vtfileblockhash %d: %r", i); 274 if(Bwrite(&bscores, score, VtScoreSize) != VtScoreSize) 275 sysfatal("Bwrite: %r"); 276 } 277 Bterm(&bscores); 278 vtfileunlock(vfile); 279 280 /* 281 * prep scores for rereading 282 */ 283 seek(fd, 0, 0); 284 Binit(&bscores, fd, OREAD); 285 } 286 287 /* 288 * start the main processes 289 */ 290 if(statustime) 291 print("# %T starting procs\n"); 292 qcmp = qalloc(); 293 qventi = qalloc(); 294 295 rlock(&endlk); 296 proccreate(fsysproc, nil, STACK); 297 rlock(&endlk); 298 proccreate(ventiproc, nil, STACK); 299 rlock(&endlk); 300 proccreate(cmpproc, nil, STACK); 301 if(statustime){ 302 rlock(&endlk); 303 proccreate(statusproc, nil, STACK); 304 } 305 306 /* 307 * wait for processes to finish 308 */ 309 wlock(&endlk); 310 311 qfree(qcmp); 312 qfree(qventi); 313 314 if(statustime) 315 print("# %T procs exited: %d of %d %d-byte blocks changed, " 316 "%d read, %d written, %d skipped, %d copied\n", 317 nchange, fsys->nblock, fsys->blocksize, 318 vtcachenread, vtcachenwrite, nskip, vtcachencopy); 319 320 /* 321 * prepare root block 322 */ 323 if(incremental) 324 vtfileclose(vscores); 325 vtfilelock(vfile, -1); 326 if(vtfileflush(vfile) < 0) 327 sysfatal("vtfileflush: %r"); 328 if(vtfilegetentry(vfile, &e) < 0) 329 sysfatal("vtfilegetentry: %r"); 330 vtfileunlock(vfile); 331 vtfileclose(vfile); 332 333 b = vtcacheallocblock(c, VtDirType, VtEntrySize); 334 if(b == nil) 335 sysfatal("vtcacheallocblock: %r"); 336 vtentrypack(&e, b->data, 0); 337 if(vtblockwrite(b) < 0) 338 sysfatal("vtblockwrite: %r"); 339 340 memset(&root, 0, sizeof root); 341 strecpy(root.name, root.name+sizeof root.name, argv[0]); 342 strecpy(root.type, root.type+sizeof root.type, fsys->type); 343 memmove(root.score, b->score, VtScoreSize); 344 root.blocksize = fsys->blocksize; 345 memmove(root.prev, prev, VtScoreSize); 346 vtblockput(b); 347 348 b = vtcacheallocblock(c, VtRootType, VtRootSize); 349 if(b == nil) 350 sysfatal("vtcacheallocblock: %r"); 351 vtrootpack(&root, b->data); 352 if(vtblockwrite(b) < 0) 353 sysfatal("vtblockwrite: %r"); 354 355 tm = *localtime(time(0)); 356 tm.year += 1900; 357 tm.mon++; 358 if(mountplace == nil) 359 mountplace = guessmountplace(argv[0]); 360 print("mount /%s/%d/%02d%02d%s %s:%V %d/%02d%02d/%02d%02d\n", 361 mountname, tm.year, tm.mon, tm.mday, 362 mountplace, 363 root.type, b->score, 364 tm.year, tm.mon, tm.mday, tm.hour, tm.min); 365 print("# %T %s %s:%V\n", argv[0], root.type, b->score); 366 if(statustime) 367 print("# %T venti sync\n"); 368 vtblockput(b); 369 if(vtsync(z) < 0) 370 sysfatal("vtsync: %r"); 371 if(statustime) 372 print("# %T synced\n"); 373 374 fsysclose(fsys); 375 diskclose(disk); 376 vtcachefree(zcache); 377 378 // Vtgoodbye hangs on Linux - not sure why. 379 // Probably vtfcallrpc doesn't quite get the 380 // remote hangup right. Also probably related 381 // to the vtrecvproc problem below. 382 // vtgoodbye(z); 383 384 // Leak here, because I can't seem to make 385 // the vtrecvproc exit. 386 // vtfreeconn(z); 387 388 free(tmpnam); 389 z = nil; 390 zcache = nil; 391 fsys = nil; 392 disk = nil; 393 threadexitsall(nil); 394 } 395 396 void 397 fsysproc(void *dummy) 398 { 399 u32int i; 400 Block *db; 401 402 USED(dummy); 403 404 for(i=0; i<fsys->nblock; i++){ 405 fsscanblock = i; 406 if((db = fsysreadblock(fsys, i)) != nil) 407 qwrite(qcmp, db, i); 408 } 409 fsscanblock = i; 410 qclose(qcmp); 411 412 if(statustime) 413 print("# %T fsys proc exiting\n"); 414 runlock(&endlk); 415 } 416 417 void 418 cmpproc(void *dummy) 419 { 420 uchar *data; 421 Block *db; 422 u32int bno, bsize; 423 uchar score[VtScoreSize]; 424 uchar score1[VtScoreSize]; 425 426 USED(dummy); 427 428 if(incremental) 429 vtfilelock(vscores, VtOREAD); 430 bsize = fsys->blocksize; 431 while((db = qread(qcmp, &bno)) != nil){ 432 data = db->data; 433 sha1(data, vtzerotruncate(VtDataType, data, bsize), score, nil); 434 if(incremental){ 435 if(vtfileblockscore(vscores, bno, score1) < 0) 436 sysfatal("cmpproc vtfileblockscore %d: %r", bno); 437 }else{ 438 if(Bseek(&bscores, (vlong)bno*VtScoreSize, 0) < 0) 439 sysfatal("cmpproc Bseek: %r"); 440 if(Bread(&bscores, score1, VtScoreSize) != VtScoreSize) 441 sysfatal("cmpproc Bread: %r"); 442 } 443 if(memcmp(score, score1, VtScoreSize) != 0){ 444 nchange++; 445 if(verbose) 446 print("# block %ud: old %V new %V\n", bno, score1, score); 447 qwrite(qventi, db, bno); 448 }else 449 blockput(db); 450 } 451 qclose(qventi); 452 if(incremental) 453 vtfileunlock(vscores); 454 if(statustime) 455 print("# %T cmp proc exiting\n"); 456 runlock(&endlk); 457 } 458 459 void 460 writethread(void *v) 461 { 462 WriteReq wr; 463 char err[ERRMAX]; 464 465 USED(v); 466 467 while(recv(writechan, &wr) == 1){ 468 nrecv++; 469 if(wr.p == nil) 470 break; 471 472 if(fastwrites && vtread(z, wr.score, wr.type, nil, 0) < 0){ 473 rerrstr(err, sizeof err); 474 if(strstr(err, "read too small")){ /* already exists */ 475 nskip++; 476 packetfree(wr.p); 477 continue; 478 } 479 } 480 if(vtwritepacket(z, wr.score, wr.type, wr.p) < 0) 481 sysfatal("vtwritepacket: %r"); 482 packetfree(wr.p); 483 } 484 } 485 486 int 487 myvtwrite(VtConn *z, uchar score[VtScoreSize], uint type, uchar *buf, int n) 488 { 489 WriteReq wr; 490 491 if(nwritethread == 0){ 492 n = vtwrite(z, score, type, buf, n); 493 if(n < 0) 494 sysfatal("vtwrite: %r"); 495 return n; 496 } 497 498 wr.p = packetalloc(); 499 packetappend(wr.p, buf, n); 500 packetsha1(wr.p, score); 501 memmove(wr.score, score, VtScoreSize); 502 wr.type = type; 503 nsend++; 504 send(writechan, &wr); 505 return 0; 506 } 507 508 void 509 ventiproc(void *dummy) 510 { 511 int i; 512 Block *db; 513 u32int bno; 514 u64int bsize; 515 516 USED(dummy); 517 518 proccreate(vtsendproc, z, STACK); 519 proccreate(vtrecvproc, z, STACK); 520 521 writechan = chancreate(sizeof(WriteReq), 0); 522 for(i=0; i<nwritethread; i++) 523 threadcreate(writethread, nil, STACK); 524 vtcachesetwrite(zcache, myvtwrite); 525 526 bsize = fsys->blocksize; 527 vtfilelock(vfile, -1); 528 while((db = qread(qventi, &bno)) != nil){ 529 if(nop){ 530 blockput(db); 531 continue; 532 } 533 if(vtfilewrite(vfile, db->data, bsize, bno*bsize) != bsize) 534 sysfatal("ventiproc vtfilewrite: %r"); 535 if(vtfileflushbefore(vfile, (bno+1)*bsize) < 0) 536 sysfatal("ventiproc vtfileflushbefore: %r"); 537 blockput(db); 538 } 539 vtfileunlock(vfile); 540 vtcachesetwrite(zcache, nil); 541 for(i=0; i<nwritethread; i++) 542 send(writechan, nil); 543 chanfree(writechan); 544 if(statustime) 545 print("# %T venti proc exiting - nsend %d nrecv %d\n", nsend, nrecv); 546 runlock(&endlk); 547 } 548 549 static int 550 percent(u32int a, u32int b) 551 { 552 return (vlong)a*100/b; 553 } 554 555 void 556 statusproc(void *dummy) 557 { 558 int n; 559 USED(dummy); 560 561 for(n=0;;n++){ 562 sleep(1000); 563 if(qcmp->closed && qcmp->nel==0 && qventi->closed && qventi->nel==0) 564 break; 565 if(n < statustime) 566 continue; 567 n = 0; 568 print("# %T fsscan=%d%% cmpq=%d%% ventiq=%d%%\n", 569 percent(fsscanblock, fsys->nblock), 570 percent(qcmp->nel, MAXQ), 571 percent(qventi->nel, MAXQ)); 572 } 573 print("# %T status proc exiting\n"); 574 runlock(&endlk); 575 } 576 577 int 578 timefmt(Fmt *fmt) 579 { 580 vlong ns; 581 Tm tm; 582 ns = nsec(); 583 tm = *localtime(time(0)); 584 return fmtprint(fmt, "%04d/%02d%02d %02d:%02d:%02d.%03d", 585 tm.year+1900, tm.mon+1, tm.mday, tm.hour, tm.min, tm.sec, 586 (int)(ns%1000000000)/1000000); 587 } 588 589 char* 590 guessmountplace(char *dev) 591 { 592 char *cmd, *q; 593 int p[2], fd[3], n; 594 char buf[100]; 595 596 if(pipe(p) < 0) 597 sysfatal("pipe: %r"); 598 599 fd[0] = -1; 600 fd[1] = p[1]; 601 fd[2] = -1; 602 cmd = smprint("mount | awk 'BEGIN{v=\"%s\"; u=v; sub(/rdisk/, \"disk\", u);} ($1==v||$1==u) && $2 == \"on\" {print $3}'", dev); 603 if(threadspawnl(fd, "sh", "sh", "-c", cmd, nil) < 0) 604 sysfatal("exec mount|awk (to find mtpt of %s): %r", dev); 605 /* threadspawnl closed p[1] */ 606 free(cmd); 607 n = readn(p[0], buf, sizeof buf-1); 608 close(p[0]); 609 if(n <= 0) 610 return dev; 611 buf[n] = 0; 612 if((q = strchr(buf, '\n')) == nil) 613 return dev; 614 *q = 0; 615 q = buf+strlen(buf); 616 if(q>buf && *(q-1) == '/') 617 *--q = 0; 618 return strdup(buf); 619 }