disk.c (7036B)
1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include "error.h" 5 6 static void diskThread(void *a); 7 8 enum { 9 /* 10 * disable measurement since it gets alignment faults on BG 11 * and the guts used to be commented out. 12 */ 13 Timing = 0, /* flag */ 14 QueueSize = 100, /* maximum block to queue */ 15 }; 16 17 struct Disk { 18 QLock lk; 19 int ref; 20 21 int fd; 22 Header h; 23 24 Rendez flow; 25 Rendez starve; 26 Rendez flush; 27 Rendez die; 28 29 int nqueue; 30 31 Block *cur; /* block to do on current scan */ 32 Block *next; /* blocks to do next scan */ 33 }; 34 35 /* keep in sync with Part* enum in dat.h */ 36 static char *partname[] = { 37 [PartError] = "error", 38 [PartSuper] = "super", 39 [PartLabel] = "label", 40 [PartData] = "data", 41 [PartVenti] = "venti", 42 }; 43 44 Disk * 45 diskAlloc(int fd) 46 { 47 u8int buf[HeaderSize]; 48 Header h; 49 Disk *disk; 50 51 if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){ 52 werrstr("short read: %r"); 53 return nil; 54 } 55 56 if(!headerUnpack(&h, buf)){ 57 werrstr("bad disk header"); 58 return nil; 59 } 60 disk = vtmallocz(sizeof(Disk)); 61 disk->starve.l = &disk->lk; 62 disk->flow.l = &disk->lk; 63 disk->flush.l = &disk->lk; 64 disk->fd = fd; 65 disk->h = h; 66 67 disk->ref = 2; 68 proccreate(diskThread, disk, STACK); 69 70 return disk; 71 } 72 73 void 74 diskFree(Disk *disk) 75 { 76 diskFlush(disk); 77 78 /* kill slave */ 79 qlock(&disk->lk); 80 disk->die.l = &disk->lk; 81 rwakeup(&disk->starve); 82 while(disk->ref > 1) 83 rsleep(&disk->die); 84 qunlock(&disk->lk); 85 close(disk->fd); 86 vtfree(disk); 87 } 88 89 static u32int 90 partStart(Disk *disk, int part) 91 { 92 switch(part){ 93 default: 94 assert(0); 95 case PartSuper: 96 return disk->h.super; 97 case PartLabel: 98 return disk->h.label; 99 case PartData: 100 return disk->h.data; 101 } 102 } 103 104 105 static u32int 106 partEnd(Disk *disk, int part) 107 { 108 switch(part){ 109 default: 110 assert(0); 111 case PartSuper: 112 return disk->h.super+1; 113 case PartLabel: 114 return disk->h.data; 115 case PartData: 116 return disk->h.end; 117 } 118 } 119 120 int 121 diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf) 122 { 123 ulong start, end; 124 u64int offset; 125 int n, nn; 126 127 start = partStart(disk, part); 128 end = partEnd(disk, part); 129 130 if(addr >= end-start){ 131 werrstr(EBadAddr); 132 return 0; 133 } 134 135 offset = ((u64int)(addr + start))*disk->h.blockSize; 136 n = disk->h.blockSize; 137 while(n > 0){ 138 nn = pread(disk->fd, buf, n, offset); 139 if(nn < 0){ 140 werrstr("%r"); 141 return 0; 142 } 143 if(nn == 0){ 144 werrstr("eof reading disk"); 145 return 0; 146 } 147 n -= nn; 148 offset += nn; 149 buf += nn; 150 } 151 return 1; 152 } 153 154 int 155 diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf) 156 { 157 ulong start, end; 158 u64int offset; 159 int n; 160 161 start = partStart(disk, part); 162 end = partEnd(disk, part); 163 164 if(addr >= end - start){ 165 werrstr(EBadAddr); 166 return 0; 167 } 168 169 offset = ((u64int)(addr + start))*disk->h.blockSize; 170 n = pwrite(disk->fd, buf, disk->h.blockSize, offset); 171 if(n < 0){ 172 werrstr("%r"); 173 return 0; 174 } 175 if(n < disk->h.blockSize) { 176 werrstr("short write"); 177 return 0; 178 } 179 180 return 1; 181 } 182 183 static void 184 diskQueue(Disk *disk, Block *b) 185 { 186 Block **bp, *bb; 187 188 qlock(&disk->lk); 189 while(disk->nqueue >= QueueSize) 190 rsleep(&disk->flow); 191 if(disk->cur == nil || b->addr > disk->cur->addr) 192 bp = &disk->cur; 193 else 194 bp = &disk->next; 195 196 for(bb=*bp; bb; bb=*bp){ 197 if(b->addr < bb->addr) 198 break; 199 bp = &bb->ionext; 200 } 201 b->ionext = bb; 202 *bp = b; 203 if(disk->nqueue == 0) 204 rwakeup(&disk->starve); 205 disk->nqueue++; 206 qunlock(&disk->lk); 207 } 208 209 210 void 211 diskRead(Disk *disk, Block *b) 212 { 213 assert(b->iostate == BioEmpty || b->iostate == BioLabel); 214 blockSetIOState(b, BioReading); 215 diskQueue(disk, b); 216 } 217 218 void 219 diskWrite(Disk *disk, Block *b) 220 { 221 assert(b->nlock == 1); 222 assert(b->iostate == BioDirty); 223 blockSetIOState(b, BioWriting); 224 diskQueue(disk, b); 225 } 226 227 void 228 diskWriteAndWait(Disk *disk, Block *b) 229 { 230 int nlock; 231 232 /* 233 * If b->nlock > 1, the block is aliased within 234 * a single thread. That thread is us. 235 * DiskWrite does some funny stuff with QLock 236 * and blockPut that basically assumes b->nlock==1. 237 * We humor diskWrite by temporarily setting 238 * nlock to 1. This needs to be revisited. 239 */ 240 nlock = b->nlock; 241 if(nlock > 1) 242 b->nlock = 1; 243 diskWrite(disk, b); 244 while(b->iostate != BioClean) 245 rsleep(&b->ioready); 246 b->nlock = nlock; 247 } 248 249 int 250 diskBlockSize(Disk *disk) 251 { 252 return disk->h.blockSize; /* immuttable */ 253 } 254 255 int 256 diskFlush(Disk *disk) 257 { 258 Dir dir; 259 260 qlock(&disk->lk); 261 while(disk->nqueue > 0) 262 rsleep(&disk->flush); 263 qunlock(&disk->lk); 264 265 /* there really should be a cleaner interface to flush an fd */ 266 nulldir(&dir); 267 if(dirfwstat(disk->fd, &dir) < 0){ 268 werrstr("%r"); 269 return 0; 270 } 271 return 1; 272 } 273 274 u32int 275 diskSize(Disk *disk, int part) 276 { 277 return partEnd(disk, part) - partStart(disk, part); 278 } 279 280 static uintptr 281 mypc(int x) 282 { 283 return getcallerpc(&x); 284 } 285 286 static char * 287 disk2file(Disk *disk) 288 { 289 static char buf[256]; 290 291 #ifndef PLAN9PORT 292 if (fd2path(disk->fd, buf, sizeof buf) < 0) 293 strncpy(buf, "GOK", sizeof buf); 294 #endif 295 return buf; 296 } 297 298 static void 299 diskThread(void *a) 300 { 301 Disk *disk = a; 302 Block *b; 303 uchar *buf, *p; 304 double t; 305 int nio; 306 307 threadsetname("disk"); 308 309 //fprint(2, "diskThread %d\n", getpid()); 310 311 buf = vtmalloc(disk->h.blockSize); 312 313 qlock(&disk->lk); 314 if (Timing) { 315 nio = 0; 316 t = -nsec(); 317 } 318 for(;;){ 319 while(disk->nqueue == 0){ 320 if (Timing) { 321 t += nsec(); 322 if(nio >= 10000){ 323 fprint(2, "disk: io=%d at %.3fms\n", 324 nio, t*1e-6/nio); 325 nio = 0; 326 t = 0; 327 } 328 } 329 if(disk->die.l != nil) 330 goto Done; 331 rsleep(&disk->starve); 332 if (Timing) 333 t -= nsec(); 334 } 335 assert(disk->cur != nil || disk->next != nil); 336 337 if(disk->cur == nil){ 338 disk->cur = disk->next; 339 disk->next = nil; 340 } 341 b = disk->cur; 342 disk->cur = b->ionext; 343 qunlock(&disk->lk); 344 345 /* 346 * no one should hold onto blocking in the 347 * reading or writing state, so this lock should 348 * not cause deadlock. 349 */ 350 if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr); 351 bwatchLock(b); 352 qlock(&b->lk); 353 b->pc = mypc(0); 354 assert(b->nlock == 1); 355 switch(b->iostate){ 356 default: 357 abort(); 358 case BioReading: 359 if(!diskReadRaw(disk, b->part, b->addr, b->data)){ 360 fprint(2, "fossil: diskReadRaw failed: %s: " 361 "score %V: part=%s block %ud: %r\n", 362 disk2file(disk), b->score, 363 partname[b->part], b->addr); 364 blockSetIOState(b, BioReadError); 365 }else 366 blockSetIOState(b, BioClean); 367 break; 368 case BioWriting: 369 p = blockRollback(b, buf); 370 /* NB: ctime result ends with a newline */ 371 if(!diskWriteRaw(disk, b->part, b->addr, p)){ 372 fprint(2, "fossil: diskWriteRaw failed: %s: " 373 "score %V: date %s part=%s block %ud: %r\n", 374 disk2file(disk), b->score, 375 ctime(time(0)), 376 partname[b->part], b->addr); 377 break; 378 } 379 if(p != buf) 380 blockSetIOState(b, BioClean); 381 else 382 blockSetIOState(b, BioDirty); 383 break; 384 } 385 386 blockPut(b); /* remove extra reference, unlock */ 387 qlock(&disk->lk); 388 disk->nqueue--; 389 if(disk->nqueue == QueueSize-1) 390 rwakeup(&disk->flow); 391 if(disk->nqueue == 0) 392 rwakeup(&disk->flush); 393 if(Timing) 394 nio++; 395 } 396 Done: 397 //fprint(2, "diskThread done\n"); 398 disk->ref--; 399 rwakeup(&disk->die); 400 qunlock(&disk->lk); 401 vtfree(buf); 402 }