plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

dcache.c (15772B)


      1 /*
      2  * Disk cache.
      3  *
      4  * Caches raw disk blocks.  Getdblock() gets a block, putdblock puts it back.
      5  * Getdblock has a mode parameter that determines i/o and access to a block:
      6  * if mode is OREAD or ORDWR, it is read from disk if not already in memory.
      7  * If mode is ORDWR or OWRITE, it is locked for exclusive use before being returned.
      8  * It is *not* marked dirty -- once changes have been made, they should be noted
      9  * by using dirtydblock() before putdblock().
     10  *
     11  * There is a global cache lock as well as a lock on each block.
     12  * Within a thread, the cache lock can be acquired while holding a block lock,
     13  * but not vice versa; and a block cannot be locked if you already hold the lock
     14  * on another block.
     15  *
     16  * The flush proc writes out dirty blocks in batches, one batch per dirty tag.
     17  * For example, the DirtyArena blocks are all written to disk before any of the
     18  * DirtyArenaCib blocks.
     19  *
     20  * This code used to be in charge of flushing the dirty index blocks out to
     21  * disk, but updating the index turned out to benefit from extra care.
     22  * Now cached index blocks are never marked dirty.  The index.c code takes
     23  * care of updating them behind our back, and uses _getdblock to update any
     24  * cached copies of the blocks as it changes them on disk.
     25  */
     26 
     27 #include "stdinc.h"
     28 #include "dat.h"
     29 #include "fns.h"
     30 
     31 typedef struct DCache	DCache;
     32 
     33 enum
     34 {
     35 	HashLog		= 9,
     36 	HashSize	= 1<<HashLog,
     37 	HashMask	= HashSize - 1,
     38 };
     39 
     40 struct DCache
     41 {
     42 	QLock		lock;
     43 	RWLock		dirtylock;		/* must be held to inspect or set b->dirty */
     44 	Rendez		full;
     45 	Round		round;
     46 	DBlock		*free;			/* list of available lumps */
     47 	u32int		now;			/* ticks for usage timestamps */
     48 	int		size;			/* max. size of any block; allocated to each block */
     49 	DBlock		**heads;		/* hash table for finding address */
     50 	int		nheap;			/* number of available victims */
     51 	DBlock		**heap;			/* heap for locating victims */
     52 	int		nblocks;		/* number of blocks allocated */
     53 	DBlock		*blocks;		/* array of block descriptors */
     54 	DBlock		**write;		/* array of block pointers to be written */
     55 	u8int		*mem;			/* memory for all block descriptors */
     56 	int		ndirty;			/* number of dirty blocks */
     57 	int		maxdirty;		/* max. number of dirty blocks */
     58 };
     59 
     60 typedef struct Ra Ra;
     61 struct Ra
     62 {
     63 	Part *part;
     64 	u64int addr;
     65 };
     66 
     67 static DCache	dcache;
     68 
     69 static int	downheap(int i, DBlock *b);
     70 static int	upheap(int i, DBlock *b);
     71 static DBlock	*bumpdblock(void);
     72 static void	delheap(DBlock *db);
     73 static void	fixheap(int i, DBlock *b);
     74 static void	flushproc(void*);
     75 static void	writeproc(void*);
     76 
     77 void
     78 initdcache(u32int mem)
     79 {
     80 	DBlock *b, *last;
     81 	u32int nblocks, blocksize;
     82 	int i;
     83 	u8int *p;
     84 
     85 	if(mem < maxblocksize * 2)
     86 		sysfatal("need at least %d bytes for the disk cache", maxblocksize * 2);
     87 	if(maxblocksize == 0)
     88 		sysfatal("no max. block size given for disk cache");
     89 	blocksize = maxblocksize;
     90 	nblocks = mem / blocksize;
     91 	dcache.full.l = &dcache.lock;
     92 	dcache.nblocks = nblocks;
     93 	dcache.maxdirty = (nblocks * 2) / 3;
     94 	trace(TraceProc, "initialize disk cache with %d blocks of %d bytes, maximum %d dirty blocks\n",
     95 			nblocks, blocksize, dcache.maxdirty);
     96 	dcache.size = blocksize;
     97 	dcache.heads = MKNZ(DBlock*, HashSize);
     98 	dcache.heap = MKNZ(DBlock*, nblocks);
     99 	dcache.blocks = MKNZ(DBlock, nblocks);
    100 	dcache.write = MKNZ(DBlock*, nblocks);
    101 	dcache.mem = MKNZ(u8int, (nblocks+1+128) * blocksize);
    102 
    103 	last = nil;
    104 	p = (u8int*)(((uintptr)dcache.mem+blocksize-1)&~(uintptr)(blocksize-1));
    105 	for(i = 0; i < nblocks; i++){
    106 		b = &dcache.blocks[i];
    107 		b->data = &p[i * blocksize];
    108 		b->heap = TWID32;
    109 		b->writedonechan = chancreate(sizeof(void*), 1);
    110 		b->next = last;
    111 		last = b;
    112 	}
    113 
    114 	dcache.free = last;
    115 	dcache.nheap = 0;
    116 	setstat(StatDcacheSize, nblocks);
    117 	initround(&dcache.round, "dcache", 120*1000);
    118 
    119 	vtproc(flushproc, nil);
    120 	vtproc(delaykickroundproc, &dcache.round);
    121 }
    122 
    123 static u32int
    124 pbhash(u64int addr)
    125 {
    126 	u32int h;
    127 
    128 #define hashit(c)	((((c) * 0x6b43a9b5) >> (32 - HashLog)) & HashMask)
    129 	h = (addr >> 32) ^ addr;
    130 	return hashit(h);
    131 }
    132 
    133 DBlock*
    134 getdblock(Part *part, u64int addr, int mode)
    135 {
    136 	DBlock *b;
    137 
    138 	b = _getdblock(part, addr, mode, 1);
    139 	if(mode == OREAD || mode == ORDWR)
    140 		addstat(StatDcacheRead, 1);
    141 	if(mode == OWRITE || mode == ORDWR)
    142 		addstat(StatDcacheWrite, 1);
    143 	return b;
    144 }
    145 
    146 DBlock*
    147 _getdblock(Part *part, u64int addr, int mode, int load)
    148 {
    149 	DBlock *b;
    150 	u32int h, size, ms;
    151 
    152 	ms = 0;
    153 	trace(TraceBlock, "getdblock enter %s 0x%llux", part->name, addr);
    154 	size = part->blocksize;
    155 	if(size > dcache.size){
    156 		seterr(EAdmin, "block size %d too big for cache with size %d", size, dcache.size);
    157 		if(load)
    158 			addstat(StatDcacheLookup, 1);
    159 		return nil;
    160 	}
    161 	h = pbhash(addr);
    162 
    163 	/*
    164 	 * look for the block in the cache
    165 	 */
    166 	qlock(&dcache.lock);
    167 again:
    168 	for(b = dcache.heads[h]; b != nil; b = b->next){
    169 		if(b->part == part && b->addr == addr){
    170 			if(load)
    171 				addstat2(StatDcacheHit, 1, StatDcacheLookup, 1);
    172 			goto found;
    173 		}
    174 	}
    175 
    176 	/*
    177 	 * missed: locate the block with the oldest second to last use.
    178 	 * remove it from the heap, and fix up the heap.
    179 	 */
    180 	if(!load){
    181 		qunlock(&dcache.lock);
    182 		return nil;
    183 	}
    184 
    185 	/*
    186 	 * Only start timer here, on cache miss - calling msec() on plain cache hits
    187 	 * makes cache hits system-call bound.
    188 	 */
    189 	ms = msec();
    190 	addstat2(StatDcacheLookup, 1, StatDcacheMiss, 1);
    191 
    192 	b = bumpdblock();
    193 	if(b == nil){
    194 		trace(TraceBlock, "all disk cache blocks in use");
    195 		addstat(StatDcacheStall, 1);
    196 		rsleep(&dcache.full);
    197 		addstat(StatDcacheStall, -1);
    198 		goto again;
    199 	}
    200 
    201 	assert(!b->dirty);
    202 
    203 	/*
    204 	 * the new block has no last use, so assume it happens sometime in the middle
    205 ZZZ this is not reasonable
    206 	 */
    207 	b->used = (b->used2 + dcache.now) / 2;
    208 
    209 	/*
    210 	 * rechain the block on the correct hash chain
    211 	 */
    212 	b->next = dcache.heads[h];
    213 	dcache.heads[h] = b;
    214 	if(b->next != nil)
    215 		b->next->prev = b;
    216 	b->prev = nil;
    217 
    218 	b->addr = addr;
    219 	b->part = part;
    220 	b->size = 0;
    221 
    222 found:
    223 	b->ref++;
    224 	b->used2 = b->used;
    225 	b->used = dcache.now++;
    226 	if(b->heap != TWID32)
    227 		fixheap(b->heap, b);
    228 
    229 	if((mode == ORDWR || mode == OWRITE) && part->writechan == nil){
    230 		trace(TraceBlock, "getdblock allocwriteproc %s", part->name);
    231 		part->writechan = chancreate(sizeof(DBlock*), dcache.nblocks);
    232 		vtproc(writeproc, part);
    233 	}
    234 	qunlock(&dcache.lock);
    235 
    236 	trace(TraceBlock, "getdblock lock");
    237 	addstat(StatDblockStall, 1);
    238 	if(mode == OREAD)
    239 		rlock(&b->lock);
    240 	else
    241 		wlock(&b->lock);
    242 	addstat(StatDblockStall, -1);
    243 	trace(TraceBlock, "getdblock locked");
    244 
    245 	if(b->size != size){
    246 		if(mode == OREAD){
    247 			addstat(StatDblockStall, 1);
    248 			runlock(&b->lock);
    249 			wlock(&b->lock);
    250 			addstat(StatDblockStall, -1);
    251 		}
    252 		if(b->size < size){
    253 			if(mode == OWRITE)
    254 				memset(&b->data[b->size], 0, size - b->size);
    255 			else{
    256 				trace(TraceBlock, "getdblock readpart %s 0x%llux", part->name, addr);
    257 				diskaccess(0);
    258 				if(readpart(part, addr + b->size, &b->data[b->size], size - b->size) < 0){
    259 					b->mode = ORDWR;	/* so putdblock wunlocks */
    260 					putdblock(b);
    261 					return nil;
    262 				}
    263 				trace(TraceBlock, "getdblock readpartdone");
    264 				addstat(StatApartRead, 1);
    265 				addstat(StatApartReadBytes, size-b->size);
    266 			}
    267 		}
    268 		b->size = size;
    269 		if(mode == OREAD){
    270 			addstat(StatDblockStall, 1);
    271 			wunlock(&b->lock);
    272 			rlock(&b->lock);
    273 			addstat(StatDblockStall, -1);
    274 		}
    275 	}
    276 
    277 	b->mode = mode;
    278 	trace(TraceBlock, "getdblock exit");
    279 	if(ms)
    280 		addstat(StatDcacheLookupTime, msec() - ms);
    281 	return b;
    282 }
    283 
    284 void
    285 putdblock(DBlock *b)
    286 {
    287 	if(b == nil)
    288 		return;
    289 
    290 	trace(TraceBlock, "putdblock %s 0x%llux", b->part->name, b->addr);
    291 
    292 	if(b->mode == OREAD)
    293 		runlock(&b->lock);
    294 	else
    295 		wunlock(&b->lock);
    296 
    297 	qlock(&dcache.lock);
    298 	if(--b->ref == 0 && !b->dirty){
    299 		if(b->heap == TWID32)
    300 			upheap(dcache.nheap++, b);
    301 		rwakeupall(&dcache.full);
    302 	}
    303 	qunlock(&dcache.lock);
    304 }
    305 
    306 void
    307 dirtydblock(DBlock *b, int dirty)
    308 {
    309 	int odirty;
    310 
    311 	trace(TraceBlock, "dirtydblock enter %s 0x%llux %d from 0x%lux",
    312 		b->part->name, b->addr, dirty, getcallerpc(&b));
    313 	assert(b->ref != 0);
    314 	assert(b->mode==ORDWR || b->mode==OWRITE);
    315 
    316 	odirty = b->dirty;
    317 	if(b->dirty)
    318 		assert(b->dirty == dirty);
    319 	else
    320 		b->dirty = dirty;
    321 
    322 	qlock(&dcache.lock);
    323 	if(!odirty){
    324 		dcache.ndirty++;
    325 		setstat(StatDcacheDirty, dcache.ndirty);
    326 		if(dcache.ndirty >= dcache.maxdirty)
    327 			kickround(&dcache.round, 0);
    328 		else
    329 			delaykickround(&dcache.round);
    330 	}
    331 	qunlock(&dcache.lock);
    332 }
    333 
    334 static void
    335 unchain(DBlock *b)
    336 {
    337 	ulong h;
    338 
    339 	/*
    340 	 * unchain the block
    341 	 */
    342 	if(b->prev == nil){
    343 		h = pbhash(b->addr);
    344 		if(dcache.heads[h] != b)
    345 			sysfatal("bad hash chains in disk cache");
    346 		dcache.heads[h] = b->next;
    347 	}else
    348 		b->prev->next = b->next;
    349 	if(b->next != nil)
    350 		b->next->prev = b->prev;
    351 }
    352 
    353 /*
    354  * remove some block from use and update the free list and counters
    355  */
    356 static DBlock*
    357 bumpdblock(void)
    358 {
    359 	DBlock *b;
    360 
    361 	trace(TraceBlock, "bumpdblock enter");
    362 	b = dcache.free;
    363 	if(b != nil){
    364 		dcache.free = b->next;
    365 		return b;
    366 	}
    367 
    368 	if(dcache.ndirty >= dcache.maxdirty)
    369 		kickdcache();
    370 
    371 	/*
    372 	 * remove blocks until we find one that is unused
    373 	 * referenced blocks are left in the heap even though
    374 	 * they can't be scavenged; this is simple a speed optimization
    375 	 */
    376 	for(;;){
    377 		if(dcache.nheap == 0){
    378 			kickdcache();
    379 			trace(TraceBlock, "bumpdblock gotnothing");
    380 			return nil;
    381 		}
    382 		b = dcache.heap[0];
    383 		delheap(b);
    384 		if(!b->ref && !b->dirty)
    385 			break;
    386 	}
    387 
    388 	trace(TraceBlock, "bumpdblock bumping %s 0x%llux", b->part->name, b->addr);
    389 
    390 	unchain(b);
    391 	return b;
    392 }
    393 
    394 void
    395 emptydcache(void)
    396 {
    397 	DBlock *b;
    398 
    399 	qlock(&dcache.lock);
    400 	while(dcache.nheap > 0){
    401 		b = dcache.heap[0];
    402 		delheap(b);
    403 		if(!b->ref && !b->dirty){
    404 			unchain(b);
    405 			b->next = dcache.free;
    406 			dcache.free = b;
    407 		}
    408 	}
    409 	qunlock(&dcache.lock);
    410 }
    411 
    412 /*
    413  * delete an arbitrary block from the heap
    414  */
    415 static void
    416 delheap(DBlock *db)
    417 {
    418 	if(db->heap == TWID32)
    419 		return;
    420 	fixheap(db->heap, dcache.heap[--dcache.nheap]);
    421 	db->heap = TWID32;
    422 }
    423 
    424 /*
    425  * push an element up or down to it's correct new location
    426  */
    427 static void
    428 fixheap(int i, DBlock *b)
    429 {
    430 	if(upheap(i, b) == i)
    431 		downheap(i, b);
    432 }
    433 
    434 static int
    435 upheap(int i, DBlock *b)
    436 {
    437 	DBlock *bb;
    438 	u32int now;
    439 	int p;
    440 
    441 	now = dcache.now;
    442 	for(; i != 0; i = p){
    443 		p = (i - 1) >> 1;
    444 		bb = dcache.heap[p];
    445 		if(b->used2 - now >= bb->used2 - now)
    446 			break;
    447 		dcache.heap[i] = bb;
    448 		bb->heap = i;
    449 	}
    450 
    451 	dcache.heap[i] = b;
    452 	b->heap = i;
    453 	return i;
    454 }
    455 
    456 static int
    457 downheap(int i, DBlock *b)
    458 {
    459 	DBlock *bb;
    460 	u32int now;
    461 	int k;
    462 
    463 	now = dcache.now;
    464 	for(; ; i = k){
    465 		k = (i << 1) + 1;
    466 		if(k >= dcache.nheap)
    467 			break;
    468 		if(k + 1 < dcache.nheap && dcache.heap[k]->used2 - now > dcache.heap[k + 1]->used2 - now)
    469 			k++;
    470 		bb = dcache.heap[k];
    471 		if(b->used2 - now <= bb->used2 - now)
    472 			break;
    473 		dcache.heap[i] = bb;
    474 		bb->heap = i;
    475 	}
    476 
    477 	dcache.heap[i] = b;
    478 	b->heap = i;
    479 	return i;
    480 }
    481 
    482 static void
    483 findblock(DBlock *bb)
    484 {
    485 	DBlock *b, *last;
    486 	int h;
    487 
    488 	last = nil;
    489 	h = pbhash(bb->addr);
    490 	for(b = dcache.heads[h]; b != nil; b = b->next){
    491 		if(last != b->prev)
    492 			sysfatal("bad prev link");
    493 		if(b == bb)
    494 			return;
    495 		last = b;
    496 	}
    497 	sysfatal("block missing from hash table");
    498 }
    499 
    500 void
    501 checkdcache(void)
    502 {
    503 	DBlock *b;
    504 	u32int size, now;
    505 	int i, k, refed, nfree;
    506 
    507 	qlock(&dcache.lock);
    508 	size = dcache.size;
    509 	now = dcache.now;
    510 	for(i = 0; i < dcache.nheap; i++){
    511 		if(dcache.heap[i]->heap != i)
    512 			sysfatal("dc: mis-heaped at %d: %d", i, dcache.heap[i]->heap);
    513 		if(i > 0 && dcache.heap[(i - 1) >> 1]->used2 - now > dcache.heap[i]->used2 - now)
    514 			sysfatal("dc: bad heap ordering");
    515 		k = (i << 1) + 1;
    516 		if(k < dcache.nheap && dcache.heap[i]->used2 - now > dcache.heap[k]->used2 - now)
    517 			sysfatal("dc: bad heap ordering");
    518 		k++;
    519 		if(k < dcache.nheap && dcache.heap[i]->used2 - now > dcache.heap[k]->used2 - now)
    520 			sysfatal("dc: bad heap ordering");
    521 	}
    522 
    523 	refed = 0;
    524 	for(i = 0; i < dcache.nblocks; i++){
    525 		b = &dcache.blocks[i];
    526 		if(b->data != &dcache.mem[i * size])
    527 			sysfatal("dc: mis-blocked at %d", i);
    528 		if(b->ref && b->heap == TWID32)
    529 			refed++;
    530 		if(b->addr)
    531 			findblock(b);
    532 		if(b->heap != TWID32
    533 		&& dcache.heap[b->heap] != b)
    534 			sysfatal("dc: spurious heap value");
    535 	}
    536 
    537 	nfree = 0;
    538 	for(b = dcache.free; b != nil; b = b->next){
    539 		if(b->addr != 0 || b->heap != TWID32)
    540 			sysfatal("dc: bad free list");
    541 		nfree++;
    542 	}
    543 
    544 	if(dcache.nheap + nfree + refed != dcache.nblocks)
    545 		sysfatal("dc: missing blocks: %d %d %d", dcache.nheap, refed, dcache.nblocks);
    546 	qunlock(&dcache.lock);
    547 }
    548 
    549 void
    550 flushdcache(void)
    551 {
    552 	trace(TraceProc, "flushdcache enter");
    553 	kickround(&dcache.round, 1);
    554 	trace(TraceProc, "flushdcache exit");
    555 }
    556 
    557 void
    558 kickdcache(void)
    559 {
    560 	kickround(&dcache.round, 0);
    561 }
    562 
    563 static int
    564 parallelwrites(DBlock **b, DBlock **eb, int dirty)
    565 {
    566 	DBlock **p, **q;
    567 	Part *part;
    568 
    569 	for(p=b; p<eb && (*p)->dirty == dirty; p++){
    570 		assert(b<=p && p<eb);
    571 		sendp((*p)->part->writechan, *p);
    572 	}
    573 	q = p;
    574 	for(p=b; p<q; p++){
    575 		assert(b<=p && p<eb);
    576 		recvp((*p)->writedonechan);
    577 	}
    578 
    579 	/*
    580 	 * Flush the partitions that have been written to.
    581 	 */
    582 	part = nil;
    583 	for(p=b; p<q; p++){
    584 		if(part != (*p)->part){
    585 			part = (*p)->part;
    586 			flushpart(part);	/* what if it fails? */
    587 		}
    588 	}
    589 
    590 	return p-b;
    591 }
    592 
    593 /*
    594  * Sort first by dirty flag, then by partition, then by address in partition.
    595  */
    596 static int
    597 writeblockcmp(const void *va, const void *vb)
    598 {
    599 	DBlock *a, *b;
    600 
    601 	a = *(DBlock**)va;
    602 	b = *(DBlock**)vb;
    603 
    604 	if(a->dirty != b->dirty)
    605 		return a->dirty - b->dirty;
    606 	if(a->part != b->part){
    607 		if(a->part < b->part)
    608 			return -1;
    609 		if(a->part > b->part)
    610 			return 1;
    611 	}
    612 	if(a->addr < b->addr)
    613 		return -1;
    614 	return 1;
    615 }
    616 
    617 static void
    618 flushproc(void *v)
    619 {
    620 	int i, j, n;
    621 	ulong t0;
    622 	DBlock *b, **write;
    623 
    624 	USED(v);
    625 	threadsetname("flushproc");
    626 	for(;;){
    627 		waitforkick(&dcache.round);
    628 
    629 		trace(TraceWork, "start");
    630 		t0 = nsec()/1000;
    631 		trace(TraceProc, "build t=%lud", (ulong)(nsec()/1000)-t0);
    632 
    633 		write = dcache.write;
    634 		n = 0;
    635 		for(i=0; i<dcache.nblocks; i++){
    636 			b = &dcache.blocks[i];
    637 			if(b->dirty)
    638 				write[n++] = b;
    639 		}
    640 
    641 		qsort(write, n, sizeof(write[0]), writeblockcmp);
    642 
    643 		/* Write each stage of blocks out. */
    644 		trace(TraceProc, "writeblocks t=%lud", (ulong)(nsec()/1000)-t0);
    645 		i = 0;
    646 		for(j=1; j<DirtyMax; j++){
    647 			trace(TraceProc, "writeblocks.%d t=%lud",
    648 				j, (ulong)(nsec()/1000)-t0);
    649 			i += parallelwrites(write+i, write+n, j);
    650 		}
    651 		if(i != n){
    652 			fprint(2, "in flushproc i=%d n=%d\n", i, n);
    653 			for(i=0; i<n; i++)
    654 				fprint(2, "\tblock %d: dirty=%d\n",
    655 					i, write[i]->dirty);
    656 			abort();
    657 		}
    658 
    659 		/*
    660 		 * b->dirty is protected by b->lock while ndirty is protected
    661 		 * by dcache.lock, so the --ndirty below is the delayed one
    662 		 * from clearing b->dirty in the write proc.  It may happen
    663 		 * that some other proc has come along and redirtied b since
    664 		 * the write.  That's okay, it just means that ndirty may be
    665 		 * one too high until we catch up and do the decrement.
    666 		 */
    667 		trace(TraceProc, "undirty.%d t=%lud", j, (ulong)(nsec()/1000)-t0);
    668 		qlock(&dcache.lock);
    669 		for(i=0; i<n; i++){
    670 			b = write[i];
    671 			--dcache.ndirty;
    672 			if(b->ref == 0 && b->heap == TWID32){
    673 				upheap(dcache.nheap++, b);
    674 				rwakeupall(&dcache.full);
    675 			}
    676 		}
    677 		setstat(StatDcacheDirty, dcache.ndirty);
    678 		qunlock(&dcache.lock);
    679 		addstat(StatDcacheFlush, 1);
    680 		trace(TraceWork, "finish");
    681 	}
    682 }
    683 
    684 static void
    685 writeproc(void *v)
    686 {
    687 	DBlock *b;
    688 	Part *p;
    689 
    690 	p = v;
    691 
    692 	threadsetname("writeproc:%s", p->name);
    693 	for(;;){
    694 		b = recvp(p->writechan);
    695 		trace(TraceWork, "start");
    696 		assert(b->part == p);
    697 		trace(TraceProc, "wlock %s 0x%llux", p->name, b->addr);
    698 		wlock(&b->lock);
    699 		trace(TraceProc, "writepart %s 0x%llux", p->name, b->addr);
    700 		diskaccess(0);
    701 		if(writepart(p, b->addr, b->data, b->size) < 0)
    702 			fprint(2, "%s: writeproc: part %s addr 0x%llux: write error: %r\n",
    703 				argv0, p->name, b->addr);
    704 		addstat(StatApartWrite, 1);
    705 		addstat(StatApartWriteBytes, b->size);
    706 		b->dirty = 0;
    707 		wunlock(&b->lock);
    708 		trace(TraceProc, "finish %s 0x%llux", p->name, b->addr);
    709 		trace(TraceWork, "finish");
    710 		sendp(b->writedonechan, b);
    711 	}
    712 }