plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

archive.c (9999B)


      1 /*
      2  * Archiver.  In charge of sending blocks to Venti.
      3  */
      4 
      5 #include "stdinc.h"
      6 #include "dat.h"
      7 #include "fns.h"
      8 #include "error.h"
      9 
     10 #include "9.h"	/* for consPrint */
     11 
/*
 * Debug verbosity.  Any nonzero value makes archWalk trace each
 * block it visits; values above 1 also make ventiSend trace each
 * block it writes.
 */
#define DEBUG 0

/* body of the archiver proc that archInit starts; defined further down */
static void archThread(void*);
     15 
/*
 * State shared between the archiver proc (archThread) and the
 * functions that control it (archInit, archKick, archFree).
 */
struct Arch
{
	int ref;		/* archInit sets 2; archThread drops one on exit; archFree waits until ref <= 1 */
	uint blockSize;		/* diskBlockSize(disk), sampled once in archInit */
	uint diskSize;		/* not referenced by the code visible in this file */
	Cache *c;		/* cache used to fetch the super block and the blocks to archive */
	Fs *fs;			/* fs->elk is write-locked while the super block is read or updated */
	VtConn *z;		/* Venti connection the blocks are written to */

	QLock lk;		/* lock paired with both Rendez below */
	Rendez starve;		/* archThread sleeps here when it finds no work; archKick and archFree wake it */
	Rendez die;		/* archFree sleeps here until archThread exits; die.l != nil is the exit request */
};
     29 
     30 Arch *
     31 archInit(Cache *c, Disk *disk, Fs *fs, VtConn *z)
     32 {
     33 	Arch *a;
     34 
     35 	a = vtmallocz(sizeof(Arch));
     36 
     37 	a->c = c;
     38 	a->z = z;
     39 	a->fs = fs;
     40 	a->blockSize = diskBlockSize(disk);
     41 	a->starve.l = &a->lk;
     42 
     43 	a->ref = 2;
     44 	proccreate(archThread, a, STACK);
     45 
     46 	return a;
     47 }
     48 
     49 void
     50 archFree(Arch *a)
     51 {
     52 	/* kill slave */
     53 	qlock(&a->lk);
     54 	a->die.l = &a->lk;
     55 	rwakeup(&a->starve);
     56 	while(a->ref > 1)
     57 		rsleep(&a->die);
     58 	qunlock(&a->lk);
     59 	vtfree(a);
     60 }
     61 
     62 static int
     63 ventiSend(Arch *a, Block *b, uchar *data)
     64 {
     65 	uint n;
     66 	uchar score[VtScoreSize];
     67 
     68 	if(DEBUG > 1)
     69 		fprint(2, "ventiSend: sending %#ux %L to venti\n", b->addr, &b->l);
     70 	n = vtzerotruncate(vtType[b->l.type], data, a->blockSize);
     71 	if(DEBUG > 1)
     72 		fprint(2, "ventiSend: truncate %d to %d\n", a->blockSize, n);
     73 	if(vtwrite(a->z, score, vtType[b->l.type], data, n) < 0){
     74 		fprint(2, "ventiSend: vtwrite block %#ux failed: %r\n", b->addr);
     75 		return 0;
     76 	}
     77 	if(vtsha1check(score, data, n) < 0){
     78 		uchar score2[VtScoreSize];
     79 		vtsha1(score2, data, n);
     80 		fprint(2, "ventiSend: vtwrite block %#ux failed vtsha1check %V %V\n",
     81 			b->addr, score, score2);
     82 		return 0;
     83 	}
     84 	if(vtsync(a->z) < 0)
     85 		return 0;
     86 	return 1;
     87 }
     88 
     89 /*
     90  * parameters for recursion; there are so many,
     91  * and some only change occasionally.  this is
     92  * easier than spelling things out at each call.
     93  */
/*
 * One Param is set up by archThread per archive run and handed down
 * through every recursive archWalk call.
 */
typedef struct Param Param;
struct Param
{
	/* these never change */
	uint snapEpoch;	/* epoch for snapshot being archived; only printed in messages here */
	uint blockSize;	/* copied from Arch.blockSize */
	Cache *c;	/* cache the blocks are loaded from */
	Arch *a;	/* owning archiver, passed to ventiSend */

	/* changes on every call */
	uint depth;	/* current recursion depth; archWalk bumps it on entry and drops it on exit */

	/* statistics */
	uint nfixed;	/* not updated by the code visible in this file */
	uint nsend;	/* blocks ventiSend wrote successfully */
	uint nvisit;	/* archWalk invocations */
	uint nfailsend;	/* ventiSend failures */
	uint maxdepth;	/* largest depth reached */
	uint nreclaim;	/* not updated by the code visible in this file */
	uint nfake;	/* blocks sent from a modified temporary copy */
	uint nreal;	/* blocks sent unmodified and then labelled BsVenti */

	/* these occasionally change (must save old values and put back) */
	uint dsize;	/* size handed to initWalk for BtDir blocks */
	uint psize;	/* size handed to initWalk for all other blocks */

	/* return value; avoids using stack space */
	Label l;	/* label of the block the last completed archWalk handled */
	uchar score[VtScoreSize];	/* score computed by the last archWalk */
};
    124 
    125 static void
    126 shaBlock(uchar score[VtScoreSize], Block *b, uchar *data, uint bsize)
    127 {
    128 	vtsha1(score, data, vtzerotruncate(vtType[b->l.type], data, bsize));
    129 }
    130 
    131 static uchar*
    132 copyBlock(Block *b, u32int blockSize)
    133 {
    134 	uchar *data;
    135 
    136 	data = vtmalloc(blockSize);
    137 	if(data == nil)
    138 		return nil;
    139 	memmove(data, b->data, blockSize);
    140 	return data;
    141 }
    142 
    143 /*
    144  * Walk over the block tree, archiving it to Venti.
    145  *
    146  * We don't archive the snapshots. Instead we zero the
    147  * entries in a temporary copy of the block and archive that.
    148  *
    149  * Return value is:
    150  *
    151  *	ArchFailure	some error occurred
    152  *	ArchSuccess	block and all children archived
    153  * 	ArchFaked	success, but block or children got copied
    154  */
/* result codes returned by archWalk */
enum
{
	ArchFailure,	/* some error occurred; archThread sleeps and tries again */
	ArchSuccess,	/* block and all children archived unmodified */
	ArchFaked,	/* archived, but the data sent came from a modified copy */
};
/*
 * Archive the local block at addr (expected to have the given type
 * and tag) and, recursively, every local block it points to.
 *
 * On ArchSuccess or ArchFaked, p->score holds the score for this
 * block.  p->l is set to the block's label whenever the block was
 * actually loaded; on the label-mismatch path p->score is the zero
 * score and p->l is left alone.
 *
 * The addr, type and tag parameters are reused inside the loop to
 * describe the child currently being visited.
 */
static int
archWalk(Param *p, u32int addr, uchar type, u32int tag)
{
	int ret, i, x, psize, dsize;
	uchar *data, score[VtScoreSize];
	Block *b;
	Label l;
	Entry *e;
	WalkPtr w;
	char err[ERRMAX];

	p->nvisit++;

	b = cacheLocalData(p->c, addr, type, tag, OReadWrite,0);
	if(b == nil){
		fprint(2, "archive(%ud, %#ux): cannot find block: %r\n", p->snapEpoch, addr);
		rerrstr(err, sizeof err);
		if(strcmp(err, ELabelMismatch) == 0){
			/* might as well plod on so we write _something_ to Venti */
			memmove(p->score, vtzeroscore, VtScoreSize);
			return ArchFaked;
		}
		return ArchFailure;
	}

	if(DEBUG) fprint(2, "%*sarchive(%ud, %#ux): block label %L\n",
		p->depth*2, "",  p->snapEpoch, b->addr, &b->l);
	p->depth++;
	if(p->depth > p->maxdepth)
		p->maxdepth = p->depth;

	/*
	 * data aliases the cached block until something has to be faked;
	 * from then on it is a private copy that is freed at Out.
	 */
	data = b->data;
	/* blocks already labelled BsVenti are not walked or sent again */
	if((b->l.state&BsVenti) == 0){
		initWalk(&w, b, b->l.type==BtDir ? p->dsize : p->psize);
		for(i=0; nextWalk(&w, score, &type, &tag, &e); i++){
			if(e){
				if(!(e->flags&VtEntryActive))
					continue;
				if((e->snap && !e->archive)
				|| (e->flags&VtEntryNoArchive)){
					/* entry must not be archived: zero it in a private copy */
					if(0) fprint(2, "snap; faking %#ux\n", b->addr);
					if(data == b->data){
						data = copyBlock(b, p->blockSize);
						if(data == nil){
							ret = ArchFailure;
							goto Out;
						}
						w.data = data;
					}
					memmove(e->score, vtzeroscore, VtScoreSize);
					e->depth = 0;
					e->size = 0;
					e->tag = 0;
					e->flags &= ~VtEntryLocal;
					entryPack(e, data, w.n-1);
					continue;
				}
			}
			addr = globalToLocal(score);
			if(addr == NilBlock)
				continue;
			/* save the walk sizes; a directory entry supplies its own for the subtree */
			dsize = p->dsize;
			psize = p->psize;
			if(e){
				p->dsize= e->dsize;
				p->psize = e->psize;
			}
			/* b's lock is released for the duration of the recursive call */
			qunlock(&b->lk);
			x = archWalk(p, addr, type, tag);
			qlock(&b->lk);
			if(e){
				p->dsize = dsize;
				p->psize = psize;
			}
			/* wait until b is BioClean or BioDirty before touching it again */
			while(b->iostate != BioClean && b->iostate != BioDirty)
				rsleep(&b->ioready);
			switch(x){
			case ArchFailure:
				fprint(2, "archWalk %#ux failed; ptr is in %#ux offset %d\n",
					addr, b->addr, i);
				ret = ArchFailure;
				goto Out;
			case ArchFaked:
				/*
				 * When we're writing the entry for an archive directory
				 * (like /archive/2003/1215) then even if we've faked
				 * any data, record the score unconditionally.
				 * This way, we will always record the Venti score here.
				 * Otherwise, temporary data or corrupted file system
				 * would cause us to keep holding onto the on-disk
				 * copy of the archive.
				 */
				if(e==nil || !e->archive)
				if(data == b->data){
if(0) fprint(2, "faked %#ux, faking %#ux (%V)\n", addr, b->addr, p->score);
					data = copyBlock(b, p->blockSize);
					if(data == nil){
						ret = ArchFailure;
						goto Out;
					}
					w.data = data;
				}
				/* fall through */
if(0) fprint(2, "falling\n");
			case ArchSuccess:
				/* replace the child pointer with the score the recursion left in p->score */
				if(e){
					memmove(e->score, p->score, VtScoreSize);
					e->flags &= ~VtEntryLocal;
					entryPack(e, data, w.n-1);
				}else
					memmove(data+(w.n-1)*VtScoreSize, p->score, VtScoreSize);
				if(data == b->data){
					blockDirty(b);
					/*
					 * If b is in the active tree, then we need to note that we've
					 * just removed addr from the active tree (replacing it with the
					 * copy we just stored to Venti).  If addr is in other snapshots,
					 * this will close addr but not free it, since it has a non-empty
					 * epoch range.
					 *
					 * If b is in the active tree but has been copied (this can happen
					 * if we get killed at just the right moment), then we will
					 * mistakenly leak its kids.
					 *
					 * The children of an archive directory (e.g., /archive/2004/0604)
					 * are not treated as in the active tree.
					 */
					if((b->l.state&BsCopied)==0 && (e==nil || e->snap==0))
						blockRemoveLink(b, addr, p->l.type, p->l.tag, 0);
				}
				break;
			}
		}

		/* all children handled; now send this block (or its faked copy) */
		if(!ventiSend(p->a, b, data)){
			p->nfailsend++;
			ret = ArchFailure;
			goto Out;
		}
		p->nsend++;
		if(data != b->data)
			p->nfake++;
		if(data == b->data){	/* not faking it, so update state */
			p->nreal++;
			l = b->l;
			l.state |= BsVenti;
			if(!blockSetLabel(b, &l, 0)){
				ret = ArchFailure;
				goto Out;
			}
		}
	}

	/* hand the score and label back to the caller through p */
	shaBlock(p->score, b, data, p->blockSize);
if(0) fprint(2, "ventisend %V %p %p %p\n", p->score, data, b->data, w.data);
	ret = data!=b->data ? ArchFaked : ArchSuccess;
	p->l = b->l;
Out:
	/* common exit: free the private copy if one was made, undo depth, release b */
	if(data != b->data)
		vtfree(data);
	p->depth--;
	blockPut(b);
	return ret;
}
    325 
    326 static void
    327 archThread(void *v)
    328 {
    329 	Arch *a = v;
    330 	Block *b;
    331 	Param p;
    332 	Super super;
    333 	int ret;
    334 	u32int addr;
    335 	uchar rbuf[VtRootSize];
    336 	VtRoot root;
    337 
    338 	threadsetname("arch");
    339 
    340 	for(;;){
    341 		/* look for work */
    342 		wlock(&a->fs->elk);
    343 		b = superGet(a->c, &super);
    344 		if(b == nil){
    345 			wunlock(&a->fs->elk);
    346 			fprint(2, "archThread: superGet: %r\n");
    347 			sleep(60*1000);
    348 			continue;
    349 		}
    350 		addr = super.next;
    351 		if(addr != NilBlock && super.current == NilBlock){
    352 			super.current = addr;
    353 			super.next = NilBlock;
    354 			superPack(&super, b->data);
    355 			blockDirty(b);
    356 		}else
    357 			addr = super.current;
    358 		blockPut(b);
    359 		wunlock(&a->fs->elk);
    360 
    361 		if(addr == NilBlock){
    362 			/* wait for work */
    363 			qlock(&a->lk);
    364 			rsleep(&a->starve);
    365 			if(a->die.l != nil)
    366 				goto Done;
    367 			qunlock(&a->lk);
    368 			continue;
    369 		}
    370 
    371 sleep(10*1000);	/* window of opportunity to provoke races */
    372 
    373 		/* do work */
    374 		memset(&p, 0, sizeof p);
    375 		p.blockSize = a->blockSize;
    376 		p.dsize = 3*VtEntrySize;	/* root has three Entries */
    377 		p.c = a->c;
    378 		p.a = a;
    379 
    380 		ret = archWalk(&p, addr, BtDir, RootTag);
    381 		switch(ret){
    382 		default:
    383 			abort();
    384 		case ArchFailure:
    385 			fprint(2, "archiveBlock %#ux: %r\n", addr);
    386 			sleep(60*1000);
    387 			continue;
    388 		case ArchSuccess:
    389 		case ArchFaked:
    390 			break;
    391 		}
    392 
    393 		if(0) fprint(2, "archiveSnapshot 0x%#ux: maxdepth %ud nfixed %ud"
    394 			" send %ud nfailsend %ud nvisit %ud"
    395 			" nreclaim %ud nfake %ud nreal %ud\n",
    396 			addr, p.maxdepth, p.nfixed,
    397 			p.nsend, p.nfailsend, p.nvisit,
    398 			p.nreclaim, p.nfake, p.nreal);
    399 		if(0) fprint(2, "archiveBlock %V (%ud)\n", p.score, p.blockSize);
    400 
    401 		/* tie up vac root */
    402 		memset(&root, 0, sizeof root);
    403 		strecpy(root.type, root.type+sizeof root.type, "vac");
    404 		strecpy(root.name, root.name+sizeof root.name, "fossil");
    405 		memmove(root.score, p.score, VtScoreSize);
    406 		memmove(root.prev, super.last, VtScoreSize);
    407 		root.blocksize = a->blockSize;
    408 		vtrootpack(&root, rbuf);
    409 		if(vtwrite(a->z, p.score, VtRootType, rbuf, VtRootSize) < 0
    410 		|| vtsha1check(p.score, rbuf, VtRootSize) < 0){
    411 			fprint(2, "vtWriteBlock %#ux: %r\n", addr);
    412 			sleep(60*1000);
    413 			continue;
    414 		}
    415 
    416 		/* record success */
    417 		wlock(&a->fs->elk);
    418 		b = superGet(a->c, &super);
    419 		if(b == nil){
    420 			wunlock(&a->fs->elk);
    421 			fprint(2, "archThread: superGet: %r\n");
    422 			sleep(60*1000);
    423 			continue;
    424 		}
    425 		super.current = NilBlock;
    426 		memmove(super.last, p.score, VtScoreSize);
    427 		superPack(&super, b->data);
    428 		blockDirty(b);
    429 		blockPut(b);
    430 		wunlock(&a->fs->elk);
    431 
    432 		consPrint("archive vac:%V\n", p.score);
    433 	}
    434 
    435 Done:
    436 	a->ref--;
    437 	rwakeup(&a->die);
    438 	qunlock(&a->lk);
    439 }
    440 
    441 void
    442 archKick(Arch *a)
    443 {
    444 	if(a == nil){
    445 		fprint(2, "warning: archKick nil\n");
    446 		return;
    447 	}
    448 	qlock(&a->lk);
    449 	rwakeup(&a->starve);
    450 	qunlock(&a->lk);
    451 }