plan9port

fork of plan9port with libvec, libstr and libsdb

arena.c (20303B)


      1 #include "stdinc.h"
      2 #include "dat.h"
      3 #include "fns.h"
      4 
      5 typedef struct ASum ASum;
      6 
      7 struct ASum
      8 {
      9 	Arena	*arena;
     10 	ASum	*next;
     11 };
     12 
     13 static void	sealarena(Arena *arena);
     14 static int	okarena(Arena *arena);
     15 static int	loadarena(Arena *arena);
     16 static CIBlock	*getcib(Arena *arena, int clump, int writing, CIBlock *rock);
     17 static void	putcib(Arena *arena, CIBlock *cib);
     18 static void	sumproc(void *);
     19 static void	loadcig(Arena *arena);
     20 
     21 static QLock	sumlock;
     22 static Rendez	sumwait;
     23 static ASum	*sumq;
     24 static ASum	*sumqtail;
     25 static uchar zero[8192];
     26 
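        /*
         * milliseconds to sleep between block reads while checksumming an
         * arena; SleepForever makes sumarena pause until the value changes.
         */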
     27 int	arenasumsleeptime;
     28 
     29 int
     30 initarenasum(void)
     31 {
     32 	needzeroscore();  /* OS X */
     33 
     34 	qlock(&sumlock);
     35 	sumwait.l = &sumlock;
     36 	qunlock(&sumlock);
     37 
     38 	if(vtproc(sumproc, nil) < 0){
     39 		seterr(EOk, "can't start arena checksum slave: %r");
     40 		return -1;
     41 	}
     42 	return 0;
     43 }
     44 
     45 /*
     46  * make an Arena, and initialize it based upon the disk header and trailer.
     47  */
     48 Arena*
     49 initarena(Part *part, u64int base, u64int size, u32int blocksize)
     50 {
     51 	Arena *arena;
     52 
     53 	arena = MKZ(Arena);
     54 	arena->part = part;
     55 	arena->blocksize = blocksize;
     56 	arena->clumpmax = arena->blocksize / ClumpInfoSize;
     57 	arena->base = base + blocksize;
     58 	arena->size = size - 2 * blocksize;
     59 
     60 	if(loadarena(arena) < 0){
     61 		seterr(ECorrupt, "arena header or trailer corrupted");
     62 		freearena(arena);
     63 		return nil;
     64 	}
     65 	if(okarena(arena) < 0){
     66 		freearena(arena);
     67 		return nil;
     68 	}
     69 
     70 	if(arena->diskstats.sealed && scorecmp(zeroscore, arena->score)==0)
     71 		sealarena(arena);
     72 
     73 	return arena;
     74 }
     75 
     76 void
     77 freearena(Arena *arena)
     78 {
     79 	if(arena == nil)
     80 		return;
     81 	free(arena);
     82 }
     83 
     84 Arena*
     85 newarena(Part *part, u32int vers, char *name, u64int base, u64int size, u32int blocksize)
     86 {
     87 	int bsize;
     88 	Arena *arena;
     89 
     90 	if(nameok(name) < 0){
     91 		seterr(EOk, "illegal arena name %s", name);
     92 		return nil;
     93 	}
     94 	arena = MKZ(Arena);
     95 	arena->part = part;
     96 	arena->version = vers;
     97 	if(vers == ArenaVersion4)
     98 		arena->clumpmagic = _ClumpMagic;
     99 	else{
    100 		do
    101 			arena->clumpmagic = fastrand();
    102 		while(arena->clumpmagic==_ClumpMagic || arena->clumpmagic==0);
    103 	}
    104 	arena->blocksize = blocksize;
    105 	arena->clumpmax = arena->blocksize / ClumpInfoSize;
    106 	arena->base = base + blocksize;
    107 	arena->size = size - 2 * blocksize;
    108 
    109 	namecp(arena->name, name);
    110 
    111 	bsize = sizeof zero;
    112 	if(bsize > arena->blocksize)
    113 		bsize = arena->blocksize;
    114 
    115 	if(wbarena(arena)<0 || wbarenahead(arena)<0
    116 	|| writepart(arena->part, arena->base, zero, bsize)<0){
    117 		freearena(arena);
    118 		return nil;
    119 	}
    120 
    121 	return arena;
    122 }
    123 
    124 int
    125 readclumpinfo(Arena *arena, int clump, ClumpInfo *ci)
    126 {
    127 	CIBlock *cib, r;
    128 
    129 	cib = getcib(arena, clump, 0, &r);
    130 	if(cib == nil)
    131 		return -1;
    132 	unpackclumpinfo(ci, &cib->data->data[cib->offset]);
    133 	putcib(arena, cib);
    134 	return 0;
    135 }
    136 
    137 int
    138 readclumpinfos(Arena *arena, int clump, ClumpInfo *cis, int n)
    139 {
    140 	CIBlock *cib, r;
    141 	int i;
    142 
    143 	/*
    144 	 * because the clump blocks are laid out
    145 	 * in reverse order at the end of the arena,
    146 	 * it can be a few percent faster to read
    147 	 * the clumps backwards, which reads the
    148 	 * disk blocks forwards.
    149 	 */
    150 	for(i = n-1; i >= 0; i--){
    151 		cib = getcib(arena, clump + i, 0, &r);
    152 		if(cib == nil){
    153 			n = i;
    154 			continue;
    155 		}
    156 		unpackclumpinfo(&cis[i], &cib->data->data[cib->offset]);
    157 		putcib(arena, cib);
    158 	}
    159 	return n;
    160 }
    161 
    162 /*
    163  * write directory information for one clump
    164  * must be called with the arena locked
    165  */
    166 int
    167 writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci)
    168 {
    169 	CIBlock *cib, r;
    170 
    171 	cib = getcib(arena, clump, 1, &r);
    172 	if(cib == nil)
    173 		return -1;
    174 	dirtydblock(cib->data, DirtyArenaCib);
    175 	packclumpinfo(ci, &cib->data->data[cib->offset]);
    176 	putcib(arena, cib);
    177 	return 0;
    178 }
    179 
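        /*
         * bytes of clump directory needed for a given number of clumps:
         * one block for every clumpmax entries, plus one more so there is
         * always room for the next clump.  for example, with 8192-byte
         * blocks and 25-byte clump info entries (clumpmax = 327), 1000
         * clumps need (1000/327 + 1) * 8192 = 32768 bytes.
         */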
    180 u64int
    181 arenadirsize(Arena *arena, u32int clumps)
    182 {
    183 	return ((clumps / arena->clumpmax) + 1) * arena->blocksize;
    184 }
    185 
    186 /*
    187  * read a clump of data
    188  * n is a hint of the size of the data, not including the header
    189  * make sure it won't run off the end, then return the number of bytes actually read
    190  */
    191 u32int
    192 readarena(Arena *arena, u64int aa, u8int *buf, long n)
    193 {
    194 	DBlock *b;
    195 	u64int a;
    196 	u32int blocksize, off, m;
    197 	long nn;
    198 
    199 	if(n == 0)
    200 		return -1;
    201 
    202 	qlock(&arena->lock);
    203 	a = arena->size - arenadirsize(arena, arena->memstats.clumps);
    204 	qunlock(&arena->lock);
    205 	if(aa >= a){
    206 		seterr(EOk, "reading beyond arena clump storage: clumps=%d aa=%lld a=%lld -1 clumps=%lld\n", arena->memstats.clumps, aa, a, arena->size - arenadirsize(arena, arena->memstats.clumps - 1));
    207 		return -1;
    208 	}
    209 	if(aa + n > a)
    210 		n = a - aa;
    211 
    212 	blocksize = arena->blocksize;
    213 	a = arena->base + aa;
    214 	off = a & (blocksize - 1);
    215 	a -= off;
    216 	nn = 0;
    217 	for(;;){
    218 		b = getdblock(arena->part, a, OREAD);
    219 		if(b == nil)
    220 			return -1;
    221 		m = blocksize - off;
    222 		if(m > n - nn)
    223 			m = n - nn;
    224 		memmove(&buf[nn], &b->data[off], m);
    225 		putdblock(b);
    226 		nn += m;
    227 		if(nn == n)
    228 			break;
    229 		off = 0;
    230 		a += blocksize;
    231 	}
    232 	return n;
    233 }
    234 
    235 /*
    236  * write some data to the clump section at a given offset
    237  * used to fix up corrupted arenas.
    238  */
    239 u32int
    240 writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n)
    241 {
    242 	DBlock *b;
    243 	u64int a;
    244 	u32int blocksize, off, m;
    245 	long nn;
    246 	int ok;
    247 
    248 	if(n == 0)
    249 		return -1;
    250 
    251 	qlock(&arena->lock);
    252 	a = arena->size - arenadirsize(arena, arena->memstats.clumps);
    253 	if(aa >= a || aa + n > a){
    254 		qunlock(&arena->lock);
    255 		seterr(EOk, "writing beyond arena clump storage");
    256 		return -1;
    257 	}
    258 
    259 	blocksize = arena->blocksize;
    260 	a = arena->base + aa;
    261 	off = a & (blocksize - 1);
    262 	a -= off;
    263 	nn = 0;
    264 	for(;;){
    265 		b = getdblock(arena->part, a, off != 0 || off + n < blocksize ? ORDWR : OWRITE);
    266 		if(b == nil){
    267 			qunlock(&arena->lock);
    268 			return -1;
    269 		}
    270 		dirtydblock(b, DirtyArena);
    271 		m = blocksize - off;
    272 		if(m > n - nn)
    273 			m = n - nn;
    274 		memmove(&b->data[off], &clbuf[nn], m);
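        		/* ok appears vestigial: putdblock no longer returns a status, so the check below never fires */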
    275 		ok = 0;
    276 		putdblock(b);
    277 		if(ok < 0){
    278 			qunlock(&arena->lock);
    279 			return -1;
    280 		}
    281 		nn += m;
    282 		if(nn == n)
    283 			break;
    284 		off = 0;
    285 		a += blocksize;
    286 	}
    287 	qunlock(&arena->lock);
    288 	return n;
    289 }
    290 
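        /*
         * layout assumed below: clump data grows up from arena->base while
         * the clump directory occupies the last arenadirsize() bytes of the
         * arena, growing down toward the data.  the arena is sealed in
         * memory once a new clump plus the enlarged directory would no
         * longer fit between the two.
         */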
    291 /*
    292  * allocate space for the clump and write it,
    293  * updating the arena directory
    294  * ZZZ question: should this distinguish between an arena
    295  * filling up and real errors writing the clump?
    296  */
    297 u64int
    298 writeaclump(Arena *arena, Clump *c, u8int *clbuf)
    299 {
    300 	DBlock *b;
    301 	u64int a, aa;
    302 	u32int clump, n, nn, m, off, blocksize;
    303 	int ok;
    304 
    305 	n = c->info.size + ClumpSize + U32Size;
    306 	qlock(&arena->lock);
    307 	aa = arena->memstats.used;
    308 	if(arena->memstats.sealed
    309 	|| aa + n + U32Size + arenadirsize(arena, arena->memstats.clumps + 1) > arena->size){
    310 		if(!arena->memstats.sealed){
    311 			logerr(EOk, "seal memstats %s", arena->name);
    312 			arena->memstats.sealed = 1;
    313 			wbarena(arena);
    314 		}
    315 		qunlock(&arena->lock);
    316 		return TWID64;
    317 	}
    318 	if(packclump(c, &clbuf[0], arena->clumpmagic) < 0){
    319 		qunlock(&arena->lock);
    320 		return TWID64;
    321 	}
    322 
    323 	/*
    324 	 * write the data out one block at a time
    325 	 */
    326 	blocksize = arena->blocksize;
    327 	a = arena->base + aa;
    328 	off = a & (blocksize - 1);
    329 	a -= off;
    330 	nn = 0;
    331 	for(;;){
    332 		b = getdblock(arena->part, a, off != 0 ? ORDWR : OWRITE);
    333 		if(b == nil){
    334 			qunlock(&arena->lock);
    335 			return TWID64;
    336 		}
    337 		dirtydblock(b, DirtyArena);
    338 		m = blocksize - off;
    339 		if(m > n - nn)
    340 			m = n - nn;
    341 		memmove(&b->data[off], &clbuf[nn], m);
    342 		ok = 0;
    343 		putdblock(b);
    344 		if(ok < 0){
    345 			qunlock(&arena->lock);
    346 			return TWID64;
    347 		}
    348 		nn += m;
    349 		if(nn == n)
    350 			break;
    351 		off = 0;
    352 		a += blocksize;
    353 	}
    354 
    355 	arena->memstats.used += c->info.size + ClumpSize;
    356 	arena->memstats.uncsize += c->info.uncsize;
    357 	if(c->info.size < c->info.uncsize)
    358 		arena->memstats.cclumps++;
    359 
    360 	clump = arena->memstats.clumps;
    361 	if(clump % ArenaCIGSize == 0){
    362 		if(arena->cig == nil){
    363 			loadcig(arena);
    364 			if(arena->cig == nil)
    365 				goto NoCIG;
    366 		}
    367 		/* add aa as start of next cig */
    368 		if(clump/ArenaCIGSize != arena->ncig){
    369 			fprint(2, "bad arena cig computation %s: writing clump %d but %d cigs\n",
    370 				arena->name, clump, arena->ncig);
    371 			arena->ncig = -1;
    372 			vtfree(arena->cig);
    373 			arena->cig = nil;
    374 			goto NoCIG;
    375 		}
    376 		arena->cig = vtrealloc(arena->cig, (arena->ncig+1)*sizeof arena->cig[0]);
    377 		arena->cig[arena->ncig++].offset = aa;
    378 	}
    379 NoCIG:
    380 	arena->memstats.clumps++;
    381 
    382 	if(arena->memstats.clumps == 0)
    383 		sysfatal("clumps wrapped");
    384 	arena->wtime = now();
    385 	if(arena->ctime == 0)
    386 		arena->ctime = arena->wtime;
    387 
    388 	writeclumpinfo(arena, clump, &c->info);
    389 	wbarena(arena);
    390 
    391 	qunlock(&arena->lock);
    392 
    393 	return aa;
    394 }
    395 
    396 int
    397 atailcmp(ATailStats *a, ATailStats *b)
    398 {
    399 	/* good test */
    400 	if(a->used < b->used)
    401 		return -1;
    402 	if(a->used > b->used)
    403 		return 1;
    404 
    405 	/* suspect tests - why order this way? (no one cares) */
    406 	if(a->clumps < b->clumps)
    407 		return -1;
    408 	if(a->clumps > b->clumps)
    409 		return 1;
    410 	if(a->cclumps < b->cclumps)
    411 		return -1;
    412 	if(a->cclumps > b->cclumps)
    413 		return 1;
    414 	if(a->uncsize < b->uncsize)
    415 		return -1;
    416 	if(a->uncsize > b->uncsize)
    417 		return 1;
    418 	if(a->sealed < b->sealed)
    419 		return -1;
    420 	if(a->sealed > b->sealed)
    421 		return 1;
    422 
    423 	/* everything matches */
    424 	return 0;
    425 }
    426 
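        /*
         * flush tail statistics to disk through (as->arena, as->aa):
         * arenas before it get their in-memory stats written out, the
         * named arena gets the snapshot in as->stats, and any arena whose
         * sealed flag just reached disk is queued for checksumming.
         */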
    427 void
    428 setatailstate(AState *as)
    429 {
    430 	int i, j, osealed;
    431 	Arena *a;
    432 	Index *ix;
    433 
    434 	trace(0, "setatailstate %s 0x%llux clumps %d", as->arena->name, as->aa, as->stats.clumps);
    435 
    436 	/*
    437 	 * Look up as->arena to find index.
    438 	 */
    439 	needmainindex();	/* OS X linker */
    440 	ix = mainindex;
    441 	for(i=0; i<ix->narenas; i++)
    442 		if(ix->arenas[i] == as->arena)
    443 			break;
    444 	if(i==ix->narenas || as->aa < ix->amap[i].start || as->aa >= ix->amap[i].stop || as->arena != ix->arenas[i]){
    445 		fprint(2, "funny setatailstate 0x%llux\n", as->aa);
    446 		return;
    447 	}
    448 
    449 	for(j=0; j<=i; j++){
    450 		a = ix->arenas[j];
    451 		if(atailcmp(&a->diskstats, &a->memstats) == 0)
    452 			continue;
    453 		qlock(&a->lock);
    454 		osealed = a->diskstats.sealed;
    455 		if(j == i)
    456 			a->diskstats = as->stats;
    457 		else
    458 			a->diskstats = a->memstats;
    459 		wbarena(a);
    460 		if(a->diskstats.sealed != osealed && !a->inqueue)
    461 			sealarena(a);
    462 		qunlock(&a->lock);
    463 	}
    464 }
    465 
    466 /*
    467  * once sealed, an arena never has any data added to it.
    468  * it should only be changed to fix errors.
    469  * this also syncs the clump directory.
    470  */
    471 static void
    472 sealarena(Arena *arena)
    473 {
    474 	arena->inqueue = 1;
    475 	backsumarena(arena);
    476 }
    477 
    478 void
    479 backsumarena(Arena *arena)
    480 {
    481 	ASum *as;
    482 
    483 	as = MK(ASum);
    484 	if(as == nil)
    485 		return;
    486 	qlock(&sumlock);
    487 	as->arena = arena;
    488 	as->next = nil;
    489 	if(sumq)
    490 		sumqtail->next = as;
    491 	else
    492 		sumq = as;
    493 	sumqtail = as;
    494 	/*
    495 	 * Might get here while initializing arenas,
    496 	 * before initarenasum has been called.
    497 	 */
    498 	if(sumwait.l)
    499 		rwakeup(&sumwait);
    500 	qunlock(&sumlock);
    501 }
    502 
    503 static void
    504 sumproc(void *unused)
    505 {
    506 	ASum *as;
    507 	Arena *arena;
    508 
    509 	USED(unused);
    510 
    511 	for(;;){
    512 		qlock(&sumlock);
    513 		while(sumq == nil)
    514 			rsleep(&sumwait);
    515 		as = sumq;
    516 		sumq = as->next;
    517 		qunlock(&sumlock);
    518 		arena = as->arena;
    519 		free(as);
    520 		sumarena(arena);
    521 	}
    522 }
    523 
    524 void
    525 sumarena(Arena *arena)
    526 {
    527 	ZBlock *b;
    528 	DigestState s;
    529 	u64int a, e;
    530 	u32int bs;
    531 	int t;
    532 	u8int score[VtScoreSize];
    533 
    534 	bs = MaxIoSize;
    535 	if(bs < arena->blocksize)
    536 		bs = arena->blocksize;
    537 
    538 	/*
    539 	 * read & sum all blocks except the last one
    540 	 */
    541 	flushdcache();
    542 	memset(&s, 0, sizeof s);
    543 	b = alloczblock(bs, 0, arena->part->blocksize);
    544 	e = arena->base + arena->size;
    545 	for(a = arena->base - arena->blocksize; a + arena->blocksize <= e; a += bs){
    546 		disksched();
    547 		while((t=arenasumsleeptime) == SleepForever){
    548 			sleep(1000);
    549 			disksched();
    550 		}
    551 		sleep(t);
    552 		if(a + bs > e)
    553 			bs = arena->blocksize;
    554 		if(readpart(arena->part, a, b->data, bs) < 0)
    555 			goto ReadErr;
    556 		addstat(StatSumRead, 1);
    557 		addstat(StatSumReadBytes, bs);
    558 		sha1(b->data, bs, nil, &s);
    559 	}
    560 
    561 	/*
    562 	 * the last one is special, since it may already have the checksum included
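        	 * (it is summed with the score area replaced by zeroscore, so the
        	 * result does not depend on any previously stored score)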
    563 	 */
    564 	bs = arena->blocksize;
    565 	if(readpart(arena->part, e, b->data, bs) < 0){
    566 ReadErr:
    567 		logerr(EOk, "sumarena can't sum %s, read at %lld failed: %r", arena->name, a);
    568 		freezblock(b);
    569 		return;
    570 	}
    571 	addstat(StatSumRead, 1);
    572 	addstat(StatSumReadBytes, bs);
    573 
    574 	sha1(b->data, bs-VtScoreSize, nil, &s);
    575 	sha1(zeroscore, VtScoreSize, nil, &s);
    576 	sha1(nil, 0, score, &s);
    577 
    578 	/*
    579 	 * check for no checksum or the same
    580 	 */
    581 	if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0
    582 	&& scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0)
    583 		logerr(EOk, "overwriting mismatched checksums for arena=%s, found=%V calculated=%V",
    584 			arena->name, &b->data[bs - VtScoreSize], score);
    585 	freezblock(b);
    586 
    587 	qlock(&arena->lock);
    588 	scorecp(arena->score, score);
    589 	wbarena(arena);
    590 	qunlock(&arena->lock);
    591 }
    592 
    593 /*
    594  * write the arena trailer block to the partition
    595  */
    596 int
    597 wbarena(Arena *arena)
    598 {
    599 	DBlock *b;
    600 	int bad;
    601 
    602 	if((b = getdblock(arena->part, arena->base + arena->size, OWRITE)) == nil){
    603 		logerr(EAdmin, "can't write arena trailer: %r");
    604 		return -1;
    605 	}
    606 	dirtydblock(b, DirtyArenaTrailer);
    607 	bad = okarena(arena)<0 || packarena(arena, b->data)<0;
    608 	scorecp(b->data + arena->blocksize - VtScoreSize, arena->score);
    609 	putdblock(b);
    610 	if(bad)
    611 		return -1;
    612 	return 0;
    613 }
    614 
    615 int
    616 wbarenahead(Arena *arena)
    617 {
    618 	ZBlock *b;
    619 	ArenaHead head;
    620 	int bad;
    621 
    622 	namecp(head.name, arena->name);
    623 	head.version = arena->version;
    624 	head.size = arena->size + 2 * arena->blocksize;
    625 	head.blocksize = arena->blocksize;
    626 	head.clumpmagic = arena->clumpmagic;
    627 	b = alloczblock(arena->blocksize, 1, arena->part->blocksize);
    628 	if(b == nil){
    629 		logerr(EAdmin, "can't write arena header: %r");
    630 /* ZZZ add error message? */
    631 		return -1;
    632 	}
    633 	/*
    634 	 * this writepart is okay because it only happens
    635 	 * during initialization.
    636 	 */
    637 	bad = packarenahead(&head, b->data)<0 ||
    638 	      writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0 ||
    639 	      flushpart(arena->part)<0;
    640 	freezblock(b);
    641 	if(bad)
    642 		return -1;
    643 	return 0;
    644 }
    645 
    646 /*
    647  * read the arena header and trailer blocks from disk
    648  */
    649 static int
    650 loadarena(Arena *arena)
    651 {
    652 	ArenaHead head;
    653 	ZBlock *b;
    654 
    655 	b = alloczblock(arena->blocksize, 0, arena->part->blocksize);
    656 	if(b == nil)
    657 		return -1;
    658 	if(readpart(arena->part, arena->base + arena->size, b->data, arena->blocksize) < 0){
    659 		freezblock(b);
    660 		return -1;
    661 	}
    662 	if(unpackarena(arena, b->data) < 0){
    663 		freezblock(b);
    664 		return -1;
    665 	}
    666 	if(arena->version != ArenaVersion4 && arena->version != ArenaVersion5){
    667 		seterr(EAdmin, "unknown arena version %d", arena->version);
    668 		freezblock(b);
    669 		return -1;
    670 	}
    671 	scorecp(arena->score, &b->data[arena->blocksize - VtScoreSize]);
    672 
    673 	if(readpart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize) < 0){
    674 		logerr(EAdmin, "can't read arena header: %r");
    675 		freezblock(b);
    676 		return 0;
    677 	}
    678 	if(unpackarenahead(&head, b->data) < 0)
    679 		logerr(ECorrupt, "corrupted arena header: %r");
    680 	else if(namecmp(arena->name, head.name)!=0
    681 	     || arena->clumpmagic != head.clumpmagic
    682 	     || arena->version != head.version
    683 	     || arena->blocksize != head.blocksize
    684 	     || arena->size + 2 * arena->blocksize != head.size){
    685 		if(namecmp(arena->name, head.name)!=0)
    686 			logerr(ECorrupt, "arena tail name %s head %s",
    687 				arena->name, head.name);
    688 		else if(arena->clumpmagic != head.clumpmagic)
    689 			logerr(ECorrupt, "arena tail clumpmagic 0x%lux head 0x%lux",
    690 				(ulong)arena->clumpmagic, (ulong)head.clumpmagic);
    691 		else if(arena->version != head.version)
    692 			logerr(ECorrupt, "arena tail version %d head version %d",
    693 				arena->version, head.version);
    694 		else if(arena->blocksize != head.blocksize)
    695 			logerr(ECorrupt, "arena tail block size %d head %d",
    696 				arena->blocksize, head.blocksize);
    697 		else if(arena->size+2*arena->blocksize != head.size)
    698 			logerr(ECorrupt, "arena tail size %lud head %lud",
    699 				(ulong)arena->size+2*arena->blocksize, head.size);
    700 		else
    701 			logerr(ECorrupt, "arena header inconsistent with arena data");
    702 	}
    703 	freezblock(b);
    704 
    705 	return 0;
    706 }
    707 
    708 static int
    709 okarena(Arena *arena)
    710 {
    711 	u64int dsize;
    712 	int ok;
    713 
    714 	ok = 0;
    715 	dsize = arenadirsize(arena, arena->diskstats.clumps);
    716 	if(arena->diskstats.used + dsize > arena->size){
    717 		seterr(ECorrupt, "arena %s used > size", arena->name);
    718 		ok = -1;
    719 	}
    720 
    721 	if(arena->diskstats.cclumps > arena->diskstats.clumps)
    722 		logerr(ECorrupt, "arena %s has more compressed clumps than total clumps", arena->name);
    723 
    724 	/*
    725 	 * This need not be true if some of the disk is corrupted.
    726 	 *
    727 	if(arena->diskstats.uncsize + arena->diskstats.clumps * ClumpSize + arena->blocksize < arena->diskstats.used)
    728 		logerr(ECorrupt, "arena %s uncompressed size inconsistent with used space %lld %d %lld", arena->name, arena->diskstats.uncsize, arena->diskstats.clumps, arena->diskstats.used);
    729 	 */
    730 
    731 	/*
    732 	 * this happens; it's harmless.
    733 	 *
    734 	if(arena->ctime > arena->wtime)
    735 		logerr(ECorrupt, "arena %s creation time after last write time", arena->name);
    736 	 */
    737 	return ok;
    738 }
    739 
    740 static CIBlock*
    741 getcib(Arena *arena, int clump, int writing, CIBlock *rock)
    742 {
    743 	int mode;
    744 	CIBlock *cib;
    745 	u32int block, off;
    746 
    747 	if(clump >= arena->memstats.clumps){
    748 		seterr(EOk, "clump directory access out of range");
    749 		return nil;
    750 	}
    751 	block = clump / arena->clumpmax;
    752 	off = (clump - block * arena->clumpmax) * ClumpInfoSize;
    753 	cib = rock;
    754 	cib->block = block;
    755 	cib->offset = off;
    756 
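        	/*
        	 * a write that begins a brand-new directory block (first entry in
        	 * the block, newest clump) can open it OWRITE; otherwise the block
        	 * must be read first so the existing entries survive.
        	 */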
    757 	if(writing){
    758 		if(off == 0 && clump == arena->memstats.clumps-1)
    759 			mode = OWRITE;
    760 		else
    761 			mode = ORDWR;
    762 	}else
    763 		mode = OREAD;
    764 
    765 	cib->data = getdblock(arena->part,
    766 		arena->base + arena->size - (block + 1) * arena->blocksize, mode);
    767 	if(cib->data == nil)
    768 		return nil;
    769 	return cib;
    770 }
    771 
    772 static void
    773 putcib(Arena *arena, CIBlock *cib)
    774 {
    775 	USED(arena);
    776 
    777 	putdblock(cib->data);
    778 	cib->data = nil;
    779 }
    780 
    781 
    782 /*
    783  * For index entry readahead purposes, the arenas are
    784  * broken into smaller subpieces, called clump info groups
    785  * or cigs.  Each cig has ArenaCIGSize clumps (ArenaCIGSize
    786  * is chosen to make the index entries take up about half
    787  * a megabyte).  The index entries do not contain enough
    788  * information to determine what the clump index is for
    789  * a given address in an arena.  That info is needed both for
    790  * figuring out which clump group an address belongs to
    791  * and for prefetching a clump group's index entries from
    792  * the arena table of contents.  The first time clump groups
    793  * are accessed, we scan the entire arena table of contents
    794  * (which might be 10s of megabytes), recording the data
    795  * offset of each clump group.
    796  */
    797 
    798 /*
    799  * load clump info group information by scanning entire toc.
    800  */
    801 static void
    802 loadcig(Arena *arena)
    803 {
    804 	u32int i, j, ncig, nci;
    805 	ArenaCIG *cig;
    806 	ClumpInfo *ci;
    807 	u64int offset;
    808 	int ms;
    809 
    810 	if(arena->cig || arena->ncig < 0)
    811 		return;
    812 
    813 //	fprint(2, "loadcig %s\n", arena->name);
    814 
    815 	ncig = (arena->memstats.clumps+ArenaCIGSize-1) / ArenaCIGSize;
    816 	if(ncig == 0){
    817 		arena->cig = vtmalloc(1);
    818 		arena->ncig = 0;
    819 		return;
    820 	}
    821 
    822 	ms = msec();
    823 	cig = vtmalloc(ncig*sizeof cig[0]);
    824 	ci = vtmalloc(ArenaCIGSize*sizeof ci[0]);
    825 	offset = 0;
    826 	for(i=0; i<ncig; i++){
    827 		nci = readclumpinfos(arena, i*ArenaCIGSize, ci, ArenaCIGSize);
    828 		cig[i].offset = offset;
    829 		for(j=0; j<nci; j++)
    830 			offset += ClumpSize + ci[j].size;
    831 		if(nci < ArenaCIGSize){
    832 			if(i != ncig-1){
    833 				vtfree(ci);
    834 				vtfree(cig);
    835 				arena->ncig = -1;
    836 				fprint(2, "loadcig %s: got %ud cigs, expected %ud\n", arena->name, i+1, ncig);
    837 				goto out;
    838 			}
    839 		}
    840 	}
    841 	vtfree(ci);
    842 
    843 	arena->ncig = ncig;
    844 	arena->cig = cig;
    845 
    846 out:
    847 	ms = msec() - ms;
    848 	addstat2(StatCigLoad, 1, StatCigLoadTime, ms);
    849 }
    850 
    851 /*
    852  * convert arena address into arena group + data boundaries.
    853  */
    854 int
    855 arenatog(Arena *arena, u64int addr, u64int *gstart, u64int *glimit, int *g)
    856 {
    857 	int r, l, m;
    858 
    859 	qlock(&arena->lock);
    860 	if(arena->cig == nil)
    861 		loadcig(arena);
    862 	if(arena->cig == nil || arena->ncig == 0){
    863 		qunlock(&arena->lock);
    864 		return -1;
    865 	}
    866 
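        	/* binary search for the last group with offset <= addr; cig[0].offset is always 0, so start at 1 */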
    867 	l = 1;
    868 	r = arena->ncig - 1;
    869 	while(l <= r){
    870 		m = (r + l) / 2;
    871 		if(arena->cig[m].offset <= addr)
    872 			l = m + 1;
    873 		else
    874 			r = m - 1;
    875 	}
    876 	l--;
    877 
    878 	*g = l;
    879 	*gstart = arena->cig[l].offset;
    880 	if(l+1 < arena->ncig)
    881 		*glimit = arena->cig[l+1].offset;
    882 	else
    883 		*glimit = arena->memstats.used;
    884 	qunlock(&arena->lock);
    885 	return 0;
    886 }
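        /*
         * a minimal usage sketch (not part of the original source): map an
         * arena offset to its clump group, then load that group's index
         * entries; asumload needs room for at least ArenaCIGSize entries.
         *
         *	u64int gstart, glimit;
         *	int g, n;
         *	if(arenatog(arena, addr, &gstart, &glimit, &g) == 0)
         *		n = asumload(arena, g, entries, ArenaCIGSize);
         */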
    887 
    888 /*
    889  * load the clump info for group g into the index entries.
    890  */
    891 int
    892 asumload(Arena *arena, int g, IEntry *entries, int nentries)
    893 {
    894 	int i, base, limit;
    895 	u64int addr;
    896 	ClumpInfo ci;
    897 	IEntry *ie;
    898 
    899 	if(nentries < ArenaCIGSize){
    900 		fprint(2, "asking for too few entries\n");
    901 		return -1;
    902 	}
    903 
    904 	qlock(&arena->lock);
    905 	if(arena->cig == nil)
    906 		loadcig(arena);
    907 	if(arena->cig == nil || arena->ncig == 0 || g >= arena->ncig){
    908 		qunlock(&arena->lock);
    909 		return -1;
    910 	}
    911 
    912 	addr = 0;
    913 	base = g*ArenaCIGSize;
    914 	limit = base + ArenaCIGSize;
    915 	if(base > arena->memstats.clumps)
    916 		base = arena->memstats.clumps;
    917 	ie = entries;
    918 	for(i=base; i<limit; i++){
    919 		if(readclumpinfo(arena, i, &ci) < 0)
    920 			break;
    921 		if(ci.type != VtCorruptType){
    922 			scorecp(ie->score, ci.score);
    923 			ie->ia.type = ci.type;
    924 			ie->ia.size = ci.uncsize;
    925 			ie->ia.blocks = (ci.size + ClumpSize + (1<<ABlockLog) - 1) >> ABlockLog;
    926 			ie->ia.addr = addr;
    927 			ie++;
    928 		}
    929 		addr += ClumpSize + ci.size;
    930 	}
    931 	qunlock(&arena->lock);
    932 	return ie - entries;
    933 }