plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

disk.c (7036B)


      1 #include "stdinc.h"
      2 #include "dat.h"
      3 #include "fns.h"
      4 #include "error.h"
      5 
      6 static void diskThread(void *a);
      7 
      8 enum {
      9 	/*
     10 	 * disable measurement since it gets alignment faults on BG
     11 	 * and the guts used to be commented out.
     12 	 */
     13 	Timing	= 0,			/* flag: nonzero makes diskThread time its I/O and print an average every 10000 operations */
     14 	QueueSize = 100,		/* maximum number of blocks queued; diskQueue sleeps once nqueue reaches this */
     15 };
     16 
     17 struct Disk {
     18 	QLock lk;		/* held while the queue fields below are examined or changed */
     19 	int ref;		/* set to 2 by diskAlloc; diskThread drops one when it exits */
     20 
     21 	int fd;			/* descriptor used for pread/pwrite; closed by diskFree */
     22 	Header h;		/* header unpacked by diskAlloc: partition bounds and block size */
     23 
     24 	Rendez flow;		/* diskQueue sleeps here while the queue is full */
     25 	Rendez starve;		/* diskThread sleeps here while the queue is empty */
     26 	Rendez flush;		/* diskFlush sleeps here until the queue drains */
     27 	Rendez die;		/* diskFree sleeps here until diskThread exits; a non-nil die.l tells diskThread to exit */
     28 
     29 	int nqueue;		/* blocks queued plus the one diskThread is working on */
     30 
     31 	Block *cur;		/* block to do on current scan */
     32 	Block *next;		/* blocks to do next scan */
     33 };
     34 
     35 /* partition names printed in diskThread's I/O error messages, indexed by part number; keep in sync with Part* enum in dat.h */
     36 static char *partname[] = {
     37 	[PartError]	= "error",
     38 	[PartSuper]	= "super",
     39 	[PartLabel]	= "label",
     40 	[PartData]	= "data",
     41 	[PartVenti]	= "venti",
     42 };
     43 
     44 Disk *
     45 diskAlloc(int fd)
     46 {
     47 	u8int buf[HeaderSize];
     48 	Header h;
     49 	Disk *disk;
     50 
     51 	if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){
     52 		werrstr("short read: %r");
     53 		return nil;
     54 	}
     55 
     56 	if(!headerUnpack(&h, buf)){
     57 		werrstr("bad disk header");
     58 		return nil;
     59 	}
     60 	disk = vtmallocz(sizeof(Disk));
     61 	disk->starve.l = &disk->lk;
     62 	disk->flow.l = &disk->lk;
     63 	disk->flush.l = &disk->lk;
     64 	disk->fd = fd;
     65 	disk->h = h;
     66 
     67 	disk->ref = 2;
     68 	proccreate(diskThread, disk, STACK);
     69 
     70 	return disk;
     71 }
     72 
     73 void
     74 diskFree(Disk *disk)
     75 {
     76 	diskFlush(disk);
     77 
     78 	/* kill slave */
     79 	qlock(&disk->lk);
     80 	disk->die.l = &disk->lk;
     81 	rwakeup(&disk->starve);
     82 	while(disk->ref > 1)
     83 		rsleep(&disk->die);
     84 	qunlock(&disk->lk);
     85 	close(disk->fd);
     86 	vtfree(disk);
     87 }
     88 
     89 static u32int
     90 partStart(Disk *disk, int part)
     91 {
     92 	switch(part){
     93 	default:
     94 		assert(0);
     95 	case PartSuper:
     96 		return disk->h.super;
     97 	case PartLabel:
     98 		return disk->h.label;
     99 	case PartData:
    100 		return disk->h.data;
    101 	}
    102 }
    103 
    104 
    105 static u32int
    106 partEnd(Disk *disk, int part)
    107 {
    108 	switch(part){
    109 	default:
    110 		assert(0);
    111 	case PartSuper:
    112 		return disk->h.super+1;
    113 	case PartLabel:
    114 		return disk->h.data;
    115 	case PartData:
    116 		return disk->h.end;
    117 	}
    118 }
    119 
    120 int
    121 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf)
    122 {
    123 	ulong start, end;
    124 	u64int offset;
    125 	int n, nn;
    126 
    127 	start = partStart(disk, part);
    128 	end = partEnd(disk, part);
    129 
    130 	if(addr >= end-start){
    131 		werrstr(EBadAddr);
    132 		return 0;
    133 	}
    134 
    135 	offset = ((u64int)(addr + start))*disk->h.blockSize;
    136 	n = disk->h.blockSize;
    137 	while(n > 0){
    138 		nn = pread(disk->fd, buf, n, offset);
    139 		if(nn < 0){
    140 			werrstr("%r");
    141 			return 0;
    142 		}
    143 		if(nn == 0){
    144 			werrstr("eof reading disk");
    145 			return 0;
    146 		}
    147 		n -= nn;
    148 		offset += nn;
    149 		buf += nn;
    150 	}
    151 	return 1;
    152 }
    153 
    154 int
    155 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf)
    156 {
    157 	ulong start, end;
    158 	u64int offset;
    159 	int n;
    160 
    161 	start = partStart(disk, part);
    162 	end = partEnd(disk, part);
    163 
    164 	if(addr >= end - start){
    165 		werrstr(EBadAddr);
    166 		return 0;
    167 	}
    168 
    169 	offset = ((u64int)(addr + start))*disk->h.blockSize;
    170 	n = pwrite(disk->fd, buf, disk->h.blockSize, offset);
    171 	if(n < 0){
    172 		werrstr("%r");
    173 		return 0;
    174 	}
    175 	if(n < disk->h.blockSize) {
    176 		werrstr("short write");
    177 		return 0;
    178 	}
    179 
    180 	return 1;
    181 }
    182 
    183 static void
    184 diskQueue(Disk *disk, Block *b)
    185 {
    186 	Block **bp, *bb;
    187 
    188 	qlock(&disk->lk);
    189 	while(disk->nqueue >= QueueSize)
    190 		rsleep(&disk->flow);
    191 	if(disk->cur == nil || b->addr > disk->cur->addr)
    192 		bp = &disk->cur;
    193 	else
    194 		bp = &disk->next;
    195 
    196 	for(bb=*bp; bb; bb=*bp){
    197 		if(b->addr < bb->addr)
    198 			break;
    199 		bp = &bb->ionext;
    200 	}
    201 	b->ionext = bb;
    202 	*bp = b;
    203 	if(disk->nqueue == 0)
    204 		rwakeup(&disk->starve);
    205 	disk->nqueue++;
    206 	qunlock(&disk->lk);
    207 }
    208 
    209 
    210 void
    211 diskRead(Disk *disk, Block *b)	/* queue b for diskThread to read from disk; does not wait for the read */
    212 {
    213 	assert(b->iostate == BioEmpty || b->iostate == BioLabel);
    214 	blockSetIOState(b, BioReading);	/* diskThread dispatches on this state */
    215 	diskQueue(disk, b);	/* may sleep while the queue is full */
    216 }
    217 
    218 void
    219 diskWrite(Disk *disk, Block *b)	/* queue dirty block b for diskThread to write; does not wait for the write */
    220 {
    221 	assert(b->nlock == 1);
    222 	assert(b->iostate == BioDirty);
    223 	blockSetIOState(b, BioWriting);	/* diskThread dispatches on this state */
    224 	diskQueue(disk, b);	/* may sleep while the queue is full */
    225 }
    226 
    227 void
    228 diskWriteAndWait(Disk *disk, Block *b)
    229 {
    230 	int nlock;
    231 
    232 	/*
    233 	 * If b->nlock > 1, the block is aliased within
    234 	 * a single thread.  That thread is us.
    235 	 * DiskWrite does some funny stuff with QLock
    236 	 * and blockPut that basically assumes b->nlock==1.
    237 	 * We humor diskWrite by temporarily setting
    238 	 * nlock to 1.  This needs to be revisited.
    239 	 */
    240 	nlock = b->nlock;
    241 	if(nlock > 1)
    242 		b->nlock = 1;
    243 	diskWrite(disk, b);
    244 	while(b->iostate != BioClean)
    245 		rsleep(&b->ioready);
    246 	b->nlock = nlock;
    247 }
    248 
    249 int
    250 diskBlockSize(Disk *disk)
    251 {
    252 	return disk->h.blockSize;	/* immuttable */
    253 }
    254 
    255 int
    256 diskFlush(Disk *disk)
    257 {
    258 	Dir dir;
    259 
    260 	qlock(&disk->lk);
    261 	while(disk->nqueue > 0)
    262 		rsleep(&disk->flush);
    263 	qunlock(&disk->lk);
    264 
    265 	/* there really should be a cleaner interface to flush an fd */
    266 	nulldir(&dir);
    267 	if(dirfwstat(disk->fd, &dir) < 0){
    268 		werrstr("%r");
    269 		return 0;
    270 	}
    271 	return 1;
    272 }
    273 
    274 u32int
    275 diskSize(Disk *disk, int part)
    276 {
    277 	return partEnd(disk, part) - partStart(disk, part);
    278 }
    279 
    280 static uintptr
    281 mypc(int x)
    282 {
    283 	return getcallerpc(&x);
    284 }
    285 
    286 static char *
    287 disk2file(Disk *disk)
    288 {
    289 	static char buf[256];
    290 
    291 #ifndef PLAN9PORT
    292 	if (fd2path(disk->fd, buf, sizeof buf) < 0)
    293 		strncpy(buf, "GOK", sizeof buf);
    294 #endif
    295 	return buf;
    296 }
    297 
    298 static void
    299 diskThread(void *a)	/* I/O worker started by diskAlloc: performs queued block reads and writes until diskFree asks it to exit */
    300 {
    301 	Disk *disk = a;
    302 	Block *b;
    303 	uchar *buf, *p;		/* buf: scratch block handed to blockRollback; p: the data actually written */
    304 	double t;		/* accumulated busy time in nsec; used only when Timing is set */
    305 	int nio;		/* I/Os completed since the last report; used only when Timing is set */
    306 
    307 	threadsetname("disk");
    308 
    309 //fprint(2, "diskThread %d\n", getpid());
    310 
    311 	buf = vtmalloc(disk->h.blockSize);
    312 
    313 	qlock(&disk->lk);
    314 	if (Timing) {
    315 		nio = 0;
    316 		t = -nsec();
    317 	}
    318 	for(;;){
    319 		while(disk->nqueue == 0){	/* nothing to do: report timing, honor an exit request, or sleep */
    320 			if (Timing) {
    321 				t += nsec();
    322 				if(nio >= 10000){
    323 					fprint(2, "disk: io=%d at %.3fms\n",
    324 						nio, t*1e-6/nio);
    325 					nio = 0;
    326 					t = 0;
    327 				}
    328 			}
    329 			if(disk->die.l != nil)	/* diskFree sets die.l to request exit; only honored once the queue is empty */
    330 				goto Done;
    331 			rsleep(&disk->starve);
    332 			if (Timing)
    333 				t -= nsec();
    334 		}
    335 		assert(disk->cur != nil || disk->next != nil);
    336 
    337 		if(disk->cur == nil){	/* current scan is finished: the next scan becomes the current one */
    338 			disk->cur = disk->next;
    339 			disk->next = nil;
    340 		}
    341 		b = disk->cur;
    342 		disk->cur = b->ionext;
    343 		qunlock(&disk->lk);	/* the disk lock is not held across the I/O itself */
    344 
    345 		/*
    346 		 * no one should hold onto blocks in the
    347 		 * reading or writing state, so this lock should
    348 		 * not cause deadlock.
    349 		 */
    350 if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr);
    351 		bwatchLock(b);
    352 		qlock(&b->lk);
    353 		b->pc = mypc(0);
    354 		assert(b->nlock == 1);
    355 		switch(b->iostate){	/* the state was set by diskRead or diskWrite before queueing */
    356 		default:
    357 			abort();
    358 		case BioReading:
    359 			if(!diskReadRaw(disk, b->part, b->addr, b->data)){
    360 				fprint(2, "fossil: diskReadRaw failed: %s: "
    361 					"score %V: part=%s block %ud: %r\n",
    362 					disk2file(disk), b->score,
    363 					partname[b->part], b->addr);
    364 				blockSetIOState(b, BioReadError);
    365 			}else
    366 				blockSetIOState(b, BioClean);
    367 			break;
    368 		case BioWriting:
    369 			p = blockRollback(b, buf);
    370 			/* NB: ctime result ends with a newline */
    371 			if(!diskWriteRaw(disk, b->part, b->addr, p)){
    372 				fprint(2, "fossil: diskWriteRaw failed: %s: "
    373 				    "score %V: date %s part=%s block %ud: %r\n",
    374 					disk2file(disk), b->score,
    375 					ctime(time(0)),
    376 					partname[b->part], b->addr);
    377 				break;	/* on failure the block's iostate is left as BioWriting */
    378 			}
    379 			if(p != buf)	/* NOTE(review): presumably p == buf means a rolled-back copy was written, so the block must stay dirty -- confirm against blockRollback */
    380 				blockSetIOState(b, BioClean);
    381 			else
    382 				blockSetIOState(b, BioDirty);
    383 			break;
    384 		}
    385 
    386 		blockPut(b);		/* remove extra reference, unlock */
    387 		qlock(&disk->lk);
    388 		disk->nqueue--;
    389 		if(disk->nqueue == QueueSize-1)	/* queue just dropped below its limit: wake a producer sleeping in diskQueue */
    390 			rwakeup(&disk->flow);
    391 		if(disk->nqueue == 0)	/* queue drained: wake diskFlush */
    392 			rwakeup(&disk->flush);
    393 		if(Timing)
    394 			nio++;
    395 	}
    396 Done:
    397 //fprint(2, "diskThread done\n");
    398 	disk->ref--;	/* diskFree sleeps on die until ref drops to 1 */
    399 	rwakeup(&disk->die);
    400 	qunlock(&disk->lk);
    401 	vtfree(buf);
    402 }