icachewrite.c (7471B)
1 /* 2 * Write the dirty icache entries to disk. Random seeks are 3 * so expensive that it makes sense to wait until we have 4 * a lot and then just make a sequential pass over the disk. 5 */ 6 #include "stdinc.h" 7 #include "dat.h" 8 #include "fns.h" 9 10 static void icachewriteproc(void*); 11 static void icachewritecoord(void*); 12 static IEntry *iesort(IEntry*); 13 14 int icachesleeptime = 1000; /* milliseconds */ 15 int minicachesleeptime = 0; 16 17 enum 18 { 19 Bufsize = 8*1024*1024 20 }; 21 22 typedef struct IWrite IWrite; 23 struct IWrite 24 { 25 Round round; 26 AState as; 27 }; 28 29 static IWrite iwrite; 30 31 void 32 initicachewrite(void) 33 { 34 int i; 35 Index *ix; 36 37 initround(&iwrite.round, "icache", 120*60*1000); 38 ix = mainindex; 39 for(i=0; i<ix->nsects; i++){ 40 ix->sects[i]->writechan = chancreate(sizeof(ulong), 1); 41 ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1); 42 vtproc(icachewriteproc, ix->sects[i]); 43 } 44 vtproc(icachewritecoord, nil); 45 vtproc(delaykickroundproc, &iwrite.round); 46 } 47 48 static u64int 49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie) 50 { 51 u64int bucket, addr; 52 53 bucket = hashbits(ie->score, 32)/ix->div; 54 addr = is->blockbase + ((bucket - is->start) << is->blocklog); 55 return addr; 56 } 57 58 static IEntry* 59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf) 60 { 61 u64int addr, naddr; 62 uint nbuf; 63 int bsize; 64 IEntry *iefirst, *ie, **l; 65 66 bsize = 1<<is->blocklog; 67 iefirst = *pie; 68 addr = ie2diskaddr(ix, is, iefirst); 69 nbuf = 0; 70 for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){ 71 naddr = ie2diskaddr(ix, is, ie); 72 if(naddr - addr >= Bufsize) 73 break; 74 nbuf = naddr - addr; 75 } 76 nbuf += bsize; 77 78 *l = nil; 79 *pie = ie; 80 *paddr = addr; 81 *pnbuf = nbuf; 82 return iefirst; 83 } 84 85 static int 86 icachewritesect(Index *ix, ISect *is, u8int *buf) 87 { 88 int err, i, werr, h, bsize, t; 89 u32int lo, hi; 90 u64int addr, naddr; 91 uint nbuf, off; 92 DBlock *b; 93 IBucket ib; 94 IEntry *ie, *iedirty, **l, *chunk; 95 96 lo = is->start * ix->div; 97 if(TWID32/ix->div < is->stop) 98 hi = TWID32; 99 else 100 hi = is->stop * ix->div - 1; 101 102 trace(TraceProc, "icachewritesect enter %ud %ud %llud", 103 lo, hi, iwrite.as.aa); 104 105 iedirty = icachedirty(lo, hi, iwrite.as.aa); 106 iedirty = iesort(iedirty); 107 bsize = 1 << is->blocklog; 108 err = 0; 109 110 while(iedirty){ 111 disksched(); 112 while((t = icachesleeptime) == SleepForever){ 113 sleep(1000); 114 disksched(); 115 } 116 if(t < minicachesleeptime) 117 t = minicachesleeptime; 118 if(t > 0) 119 sleep(t); 120 trace(TraceProc, "icachewritesect nextchunk"); 121 chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf); 122 123 trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux", 124 addr, nbuf); 125 if(readpart(is->part, addr, buf, nbuf) < 0){ 126 fprint(2, "%s: part %s addr 0x%llux: icachewritesect " 127 "readpart: %r\n", argv0, is->part->name, addr); 128 err = -1; 129 continue; 130 } 131 trace(TraceProc, "icachewritesect updatebuf"); 132 addstat(StatIsectReadBytes, nbuf); 133 addstat(StatIsectRead, 1); 134 135 for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){ 136 again: 137 naddr = ie2diskaddr(ix, is, ie); 138 off = naddr - addr; 139 if(off+bsize > nbuf){ 140 fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud " 141 "addr+nbuf=0x%llux naddr=0x%llux\n", 142 argv0, addr, nbuf, addr+nbuf, naddr); 143 assert(off+bsize <= nbuf); 144 } 145 unpackibucket(&ib, buf+off, is->bucketmagic); 146 if(okibucket(&ib, is) < 0){ 147 fprint(2, "%s: bad bucket XXX\n", argv0); 148 goto skipit; 149 } 150 trace(TraceProc, "icachewritesect add %V at 0x%llux", 151 ie->score, naddr); 152 h = bucklook(ie->score, ie->ia.type, ib.data, ib.n); 153 if(h & 1){ 154 h ^= 1; 155 packientry(ie, &ib.data[h]); 156 }else if(ib.n < is->buckmax){ 157 memmove(&ib.data[h + IEntrySize], &ib.data[h], 158 ib.n*IEntrySize - h); 159 ib.n++; 160 packientry(ie, &ib.data[h]); 161 }else{ 162 fprint(2, "%s: bucket overflow XXX\n", argv0); 163 skipit: 164 err = -1; 165 *l = ie->nextdirty; 166 ie = *l; 167 if(ie) 168 goto again; 169 else 170 break; 171 } 172 packibucket(&ib, buf+off, is->bucketmagic); 173 } 174 175 diskaccess(1); 176 177 trace(TraceProc, "icachewritesect writepart", addr, nbuf); 178 werr = 0; 179 if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0) 180 werr = -1; 181 182 for(i=0; i<nbuf; i+=bsize){ 183 if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){ 184 memmove(b->data, buf+i, bsize); 185 putdblock(b); 186 } 187 } 188 189 if(werr < 0){ 190 fprint(2, "%s: part %s addr 0x%llux: icachewritesect " 191 "writepart: %r\n", argv0, is->part->name, addr); 192 err = -1; 193 continue; 194 } 195 196 addstat(StatIsectWriteBytes, nbuf); 197 addstat(StatIsectWrite, 1); 198 icacheclean(chunk); 199 } 200 201 trace(TraceProc, "icachewritesect done"); 202 return err; 203 } 204 205 static void 206 icachewriteproc(void *v) 207 { 208 int ret; 209 uint bsize; 210 ISect *is; 211 Index *ix; 212 u8int *buf; 213 214 ix = mainindex; 215 is = v; 216 threadsetname("icachewriteproc:%s", is->part->name); 217 218 bsize = 1<<is->blocklog; 219 buf = emalloc(Bufsize+bsize); 220 buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1)); 221 222 for(;;){ 223 trace(TraceProc, "icachewriteproc recv"); 224 recv(is->writechan, 0); 225 trace(TraceWork, "start"); 226 ret = icachewritesect(ix, is, buf); 227 trace(TraceProc, "icachewriteproc send"); 228 trace(TraceWork, "finish"); 229 sendul(is->writedonechan, ret); 230 } 231 } 232 233 static void 234 icachewritecoord(void *v) 235 { 236 int i, err; 237 Index *ix; 238 AState as; 239 240 USED(v); 241 242 threadsetname("icachewritecoord"); 243 244 ix = mainindex; 245 iwrite.as = icachestate(); 246 247 for(;;){ 248 trace(TraceProc, "icachewritecoord sleep"); 249 waitforkick(&iwrite.round); 250 trace(TraceWork, "start"); 251 as = icachestate(); 252 if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){ 253 /* will not be able to do anything more than last flush - kick disk */ 254 trace(TraceProc, "icachewritecoord kick dcache"); 255 kickdcache(); 256 trace(TraceProc, "icachewritecoord kicked dcache"); 257 goto SkipWork; /* won't do anything; don't bother rewriting bloom filter */ 258 } 259 iwrite.as = as; 260 261 trace(TraceProc, "icachewritecoord start flush"); 262 if(iwrite.as.arena){ 263 for(i=0; i<ix->nsects; i++) 264 send(ix->sects[i]->writechan, 0); 265 if(ix->bloom) 266 send(ix->bloom->writechan, 0); 267 268 err = 0; 269 for(i=0; i<ix->nsects; i++) 270 err |= recvul(ix->sects[i]->writedonechan); 271 if(ix->bloom) 272 err |= recvul(ix->bloom->writedonechan); 273 274 trace(TraceProc, "icachewritecoord donewrite err=%d", err); 275 if(err == 0){ 276 setatailstate(&iwrite.as); 277 } 278 } 279 SkipWork: 280 icacheclean(nil); /* wake up anyone waiting */ 281 trace(TraceWork, "finish"); 282 addstat(StatIcacheFlush, 1); 283 } 284 } 285 286 void 287 flushicache(void) 288 { 289 trace(TraceProc, "flushicache enter"); 290 kickround(&iwrite.round, 1); 291 trace(TraceProc, "flushicache exit"); 292 } 293 294 void 295 kickicache(void) 296 { 297 kickround(&iwrite.round, 0); 298 } 299 300 void 301 delaykickicache(void) 302 { 303 delaykickround(&iwrite.round); 304 } 305 306 static IEntry* 307 iesort(IEntry *ie) 308 { 309 int cmp; 310 IEntry **l; 311 IEntry *ie1, *ie2, *sorted; 312 313 if(ie == nil || ie->nextdirty == nil) 314 return ie; 315 316 /* split the lists */ 317 ie1 = ie; 318 ie2 = ie; 319 if(ie2) 320 ie2 = ie2->nextdirty; 321 if(ie2) 322 ie2 = ie2->nextdirty; 323 while(ie1 && ie2){ 324 ie1 = ie1->nextdirty; 325 ie2 = ie2->nextdirty; 326 if(ie2) 327 ie2 = ie2->nextdirty; 328 } 329 if(ie1){ 330 ie2 = ie1->nextdirty; 331 ie1->nextdirty = nil; 332 } 333 334 /* sort the lists */ 335 ie1 = iesort(ie); 336 ie2 = iesort(ie2); 337 338 /* merge the lists */ 339 sorted = nil; 340 l = &sorted; 341 cmp = 0; 342 while(ie1 || ie2){ 343 if(ie1 && ie2) 344 cmp = scorecmp(ie1->score, ie2->score); 345 if(ie1==nil || (ie2 && cmp > 0)){ 346 *l = ie2; 347 l = &ie2->nextdirty; 348 ie2 = ie2->nextdirty; 349 }else{ 350 *l = ie1; 351 l = &ie1->nextdirty; 352 ie1 = ie1->nextdirty; 353 } 354 } 355 *l = nil; 356 return sorted; 357 }