plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

icachewrite.c (7471B)


      1 /*
      2  * Write the dirty icache entries to disk.  Random seeks are
      3  * so expensive that it makes sense to wait until we have
      4  * a lot and then just make a sequential pass over the disk.
      5  */
      6 #include "stdinc.h"
      7 #include "dat.h"
      8 #include "fns.h"
      9 
     10 static void icachewriteproc(void*);
     11 static void icachewritecoord(void*);
     12 static IEntry *iesort(IEntry*);
     13 
     14 int icachesleeptime = 1000;	/* milliseconds */
     15 int minicachesleeptime = 0;
     16 
     17 enum
     18 {
     19 	Bufsize = 8*1024*1024
     20 };
     21 
     22 typedef struct IWrite IWrite;
     23 struct IWrite
     24 {
     25 	Round round;
     26 	AState as;
     27 };
     28 
     29 static IWrite iwrite;
     30 
     31 void
     32 initicachewrite(void)
     33 {
     34 	int i;
     35 	Index *ix;
     36 
     37 	initround(&iwrite.round, "icache", 120*60*1000);
     38 	ix = mainindex;
     39 	for(i=0; i<ix->nsects; i++){
     40 		ix->sects[i]->writechan = chancreate(sizeof(ulong), 1);
     41 		ix->sects[i]->writedonechan = chancreate(sizeof(ulong), 1);
     42 		vtproc(icachewriteproc, ix->sects[i]);
     43 	}
     44 	vtproc(icachewritecoord, nil);
     45 	vtproc(delaykickroundproc, &iwrite.round);
     46 }
     47 
     48 static u64int
     49 ie2diskaddr(Index *ix, ISect *is, IEntry *ie)
     50 {
     51 	u64int bucket, addr;
     52 
     53 	bucket = hashbits(ie->score, 32)/ix->div;
     54 	addr = is->blockbase + ((bucket - is->start) << is->blocklog);
     55 	return addr;
     56 }
     57 
     58 static IEntry*
     59 nextchunk(Index *ix, ISect *is, IEntry **pie, u64int *paddr, uint *pnbuf)
     60 {
     61 	u64int addr, naddr;
     62 	uint nbuf;
     63 	int bsize;
     64 	IEntry *iefirst, *ie, **l;
     65 
     66 	bsize = 1<<is->blocklog;
     67 	iefirst = *pie;
     68 	addr = ie2diskaddr(ix, is, iefirst);
     69 	nbuf = 0;
     70 	for(l = &iefirst->nextdirty; (ie = *l) != nil; l = &(*l)->nextdirty){
     71 		naddr = ie2diskaddr(ix, is, ie);
     72 		if(naddr - addr >= Bufsize)
     73 			break;
     74 		nbuf = naddr - addr;
     75 	}
     76 	nbuf += bsize;
     77 
     78 	*l = nil;
     79 	*pie = ie;
     80 	*paddr = addr;
     81 	*pnbuf = nbuf;
     82 	return iefirst;
     83 }
     84 
     85 static int
     86 icachewritesect(Index *ix, ISect *is, u8int *buf)
     87 {
     88 	int err, i, werr, h, bsize, t;
     89 	u32int lo, hi;
     90 	u64int addr, naddr;
     91 	uint nbuf, off;
     92 	DBlock *b;
     93 	IBucket ib;
     94 	IEntry *ie, *iedirty, **l, *chunk;
     95 
     96 	lo = is->start * ix->div;
     97 	if(TWID32/ix->div < is->stop)
     98 		hi = TWID32;
     99 	else
    100 		hi = is->stop * ix->div - 1;
    101 
    102 	trace(TraceProc, "icachewritesect enter %ud %ud %llud",
    103 		lo, hi, iwrite.as.aa);
    104 
    105 	iedirty = icachedirty(lo, hi, iwrite.as.aa);
    106 	iedirty = iesort(iedirty);
    107 	bsize = 1 << is->blocklog;
    108 	err = 0;
    109 
    110 	while(iedirty){
    111 		disksched();
    112 		while((t = icachesleeptime) == SleepForever){
    113 			sleep(1000);
    114 			disksched();
    115 		}
    116 		if(t < minicachesleeptime)
    117 			t = minicachesleeptime;
    118 		if(t > 0)
    119 			sleep(t);
    120 		trace(TraceProc, "icachewritesect nextchunk");
    121 		chunk = nextchunk(ix, is, &iedirty, &addr, &nbuf);
    122 
    123 		trace(TraceProc, "icachewritesect readpart 0x%llux+0x%ux",
    124 			addr, nbuf);
    125 		if(readpart(is->part, addr, buf, nbuf) < 0){
    126 			fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
    127 				"readpart: %r\n", argv0, is->part->name, addr);
    128 			err  = -1;
    129 			continue;
    130 		}
    131 		trace(TraceProc, "icachewritesect updatebuf");
    132 		addstat(StatIsectReadBytes, nbuf);
    133 		addstat(StatIsectRead, 1);
    134 
    135 		for(l=&chunk; (ie=*l)!=nil; l=&ie->nextdirty){
    136 again:
    137 			naddr = ie2diskaddr(ix, is, ie);
    138 			off = naddr - addr;
    139 			if(off+bsize > nbuf){
    140 				fprint(2, "%s: whoops! addr=0x%llux nbuf=%ud "
    141 					"addr+nbuf=0x%llux naddr=0x%llux\n",
    142 					argv0, addr, nbuf, addr+nbuf, naddr);
    143 				assert(off+bsize <= nbuf);
    144 			}
    145 			unpackibucket(&ib, buf+off, is->bucketmagic);
    146 			if(okibucket(&ib, is) < 0){
    147 				fprint(2, "%s: bad bucket XXX\n", argv0);
    148 				goto skipit;
    149 			}
    150 			trace(TraceProc, "icachewritesect add %V at 0x%llux",
    151 				ie->score, naddr);
    152 			h = bucklook(ie->score, ie->ia.type, ib.data, ib.n);
    153 			if(h & 1){
    154 				h ^= 1;
    155 				packientry(ie, &ib.data[h]);
    156 			}else if(ib.n < is->buckmax){
    157 				memmove(&ib.data[h + IEntrySize], &ib.data[h],
    158 					ib.n*IEntrySize - h);
    159 				ib.n++;
    160 				packientry(ie, &ib.data[h]);
    161 			}else{
    162 				fprint(2, "%s: bucket overflow XXX\n", argv0);
    163 skipit:
    164 				err = -1;
    165 				*l = ie->nextdirty;
    166 				ie = *l;
    167 				if(ie)
    168 					goto again;
    169 				else
    170 					break;
    171 			}
    172 			packibucket(&ib, buf+off, is->bucketmagic);
    173 		}
    174 
    175 		diskaccess(1);
    176 
    177 		trace(TraceProc, "icachewritesect writepart", addr, nbuf);
    178 		werr = 0;
    179 		if(writepart(is->part, addr, buf, nbuf) < 0 || flushpart(is->part) < 0)
    180 			werr = -1;
    181 
    182 		for(i=0; i<nbuf; i+=bsize){
    183 			if((b = _getdblock(is->part, addr+i, ORDWR, 0)) != nil){
    184 				memmove(b->data, buf+i, bsize);
    185 				putdblock(b);
    186 			}
    187 		}
    188 
    189 		if(werr < 0){
    190 			fprint(2, "%s: part %s addr 0x%llux: icachewritesect "
    191 				"writepart: %r\n", argv0, is->part->name, addr);
    192 			err = -1;
    193 			continue;
    194 		}
    195 
    196 		addstat(StatIsectWriteBytes, nbuf);
    197 		addstat(StatIsectWrite, 1);
    198 		icacheclean(chunk);
    199 	}
    200 
    201 	trace(TraceProc, "icachewritesect done");
    202 	return err;
    203 }
    204 
    205 static void
    206 icachewriteproc(void *v)
    207 {
    208 	int ret;
    209 	uint bsize;
    210 	ISect *is;
    211 	Index *ix;
    212 	u8int *buf;
    213 
    214 	ix = mainindex;
    215 	is = v;
    216 	threadsetname("icachewriteproc:%s", is->part->name);
    217 
    218 	bsize = 1<<is->blocklog;
    219 	buf = emalloc(Bufsize+bsize);
    220 	buf = (u8int*)(((uintptr)buf+bsize-1)&~(uintptr)(bsize-1));
    221 
    222 	for(;;){
    223 		trace(TraceProc, "icachewriteproc recv");
    224 		recv(is->writechan, 0);
    225 		trace(TraceWork, "start");
    226 		ret = icachewritesect(ix, is, buf);
    227 		trace(TraceProc, "icachewriteproc send");
    228 		trace(TraceWork, "finish");
    229 		sendul(is->writedonechan, ret);
    230 	}
    231 }
    232 
    233 static void
    234 icachewritecoord(void *v)
    235 {
    236 	int i, err;
    237 	Index *ix;
    238 	AState as;
    239 
    240 	USED(v);
    241 
    242 	threadsetname("icachewritecoord");
    243 
    244 	ix = mainindex;
    245 	iwrite.as = icachestate();
    246 
    247 	for(;;){
    248 		trace(TraceProc, "icachewritecoord sleep");
    249 		waitforkick(&iwrite.round);
    250 		trace(TraceWork, "start");
    251 		as = icachestate();
    252 		if(as.arena==iwrite.as.arena && as.aa==iwrite.as.aa){
    253 			/* will not be able to do anything more than last flush - kick disk */
    254 			trace(TraceProc, "icachewritecoord kick dcache");
    255 			kickdcache();
    256 			trace(TraceProc, "icachewritecoord kicked dcache");
    257 			goto SkipWork;	/* won't do anything; don't bother rewriting bloom filter */
    258 		}
    259 		iwrite.as = as;
    260 
    261 		trace(TraceProc, "icachewritecoord start flush");
    262 		if(iwrite.as.arena){
    263 			for(i=0; i<ix->nsects; i++)
    264 				send(ix->sects[i]->writechan, 0);
    265 			if(ix->bloom)
    266 				send(ix->bloom->writechan, 0);
    267 
    268 			err = 0;
    269 			for(i=0; i<ix->nsects; i++)
    270 				err |= recvul(ix->sects[i]->writedonechan);
    271 			if(ix->bloom)
    272 				err |= recvul(ix->bloom->writedonechan);
    273 
    274 			trace(TraceProc, "icachewritecoord donewrite err=%d", err);
    275 			if(err == 0){
    276 				setatailstate(&iwrite.as);
    277 			}
    278 		}
    279 	SkipWork:
    280 		icacheclean(nil);	/* wake up anyone waiting */
    281 		trace(TraceWork, "finish");
    282 		addstat(StatIcacheFlush, 1);
    283 	}
    284 }
    285 
    286 void
    287 flushicache(void)
    288 {
    289 	trace(TraceProc, "flushicache enter");
    290 	kickround(&iwrite.round, 1);
    291 	trace(TraceProc, "flushicache exit");
    292 }
    293 
    294 void
    295 kickicache(void)
    296 {
    297 	kickround(&iwrite.round, 0);
    298 }
    299 
    300 void
    301 delaykickicache(void)
    302 {
    303 	delaykickround(&iwrite.round);
    304 }
    305 
    306 static IEntry*
    307 iesort(IEntry *ie)
    308 {
    309 	int cmp;
    310 	IEntry **l;
    311 	IEntry *ie1, *ie2, *sorted;
    312 
    313 	if(ie == nil || ie->nextdirty == nil)
    314 		return ie;
    315 
    316 	/* split the lists */
    317 	ie1 = ie;
    318 	ie2 = ie;
    319 	if(ie2)
    320 		ie2 = ie2->nextdirty;
    321 	if(ie2)
    322 		ie2 = ie2->nextdirty;
    323 	while(ie1 && ie2){
    324 		ie1 = ie1->nextdirty;
    325 		ie2 = ie2->nextdirty;
    326 		if(ie2)
    327 			ie2 = ie2->nextdirty;
    328 	}
    329 	if(ie1){
    330 		ie2 = ie1->nextdirty;
    331 		ie1->nextdirty = nil;
    332 	}
    333 
    334 	/* sort the lists */
    335 	ie1 = iesort(ie);
    336 	ie2 = iesort(ie2);
    337 
    338 	/* merge the lists */
    339 	sorted = nil;
    340 	l = &sorted;
    341 	cmp = 0;
    342 	while(ie1 || ie2){
    343 		if(ie1 && ie2)
    344 			cmp = scorecmp(ie1->score, ie2->score);
    345 		if(ie1==nil || (ie2 && cmp > 0)){
    346 			*l = ie2;
    347 			l = &ie2->nextdirty;
    348 			ie2 = ie2->nextdirty;
    349 		}else{
    350 			*l = ie1;
    351 			l = &ie1->nextdirty;
    352 			ie1 = ie1->nextdirty;
    353 		}
    354 	}
    355 	*l = nil;
    356 	return sorted;
    357 }