plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

buildindex.c (22561B)


      1 /*
      2  * Rebuild the index from scratch, in place.
      3  */
      4 #include "stdinc.h"
      5 #include "dat.h"
      6 #include "fns.h"
      7 
/* Limits for the per-section spray buffers sized in isectproc. */
enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};
     13 
/*
 * Batch of index entries sent from arenapartproc to an index
 * section's writechan, amortizing channel traffic over 100 entries.
 */
typedef struct IEntryBuf IEntryBuf;
struct IEntryBuf
{
	IEntry ie[100];	/* batched entries */
	int nie;	/* number of entries in use */
};
     20 
/*
 * Batch of scores accumulated per arena proc and flushed to the
 * bloom filter 100 at a time via markbloomfiltern.
 */
typedef struct ScoreBuf ScoreBuf;
struct ScoreBuf
{
	uchar score[100][VtScoreSize];	/* batched scores */
	int nscore;			/* number of scores in use */
};
     27 
int		dumb;		/* -d: force all three passes even when pass 2 could be skipped */
int		errors;		/* set when any I/O error is seen; exit status here does not reflect it */
char		**isect;	/* -i: names of index sections to rebuild; matched slots are set to nil */
int		nisect;		/* number of -i options given */
int		bloom;		/* -b (or implied): rebuild the bloom filter too */
int		zero;		/* when set, zero index buckets that receive no entries (no visible setter here) */

u32int	isectmem;	/* memory budget per index section (imem / nsects) */
u64int	totalbuckets;	/* sum of blocks over all index sections */
u64int	totalclumps;	/* sum of clumps over all arenas */
Channel	*arenadonechan;	/* arenapartproc completion notifications */
Channel	*isectdonechan;	/* isectproc completion notifications */
Index	*ix;		/* the main index being rebuilt */

/* shared counters, updated under a lock via add() */
u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;
     45 
     46 static int shouldprocess(ISect*);
     47 static void	isectproc(void*);
     48 static void	arenapartproc(void*);
     49 
/* Print the usage message and terminate all threads. */
void
usage(void)
{
	fprint(2, "usage: buildindex [-bd] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}
     56 
/*
 * Parse arguments, reopen the arenas read-only, and rebuild the
 * index: one proc per arena partition sprays entries to one proc
 * per index section; completion is coordinated over channels.
 */
void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;	/* default total index memory budget */
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	/* rebuilding everything implies rebuilding the bloom filter, if present */
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("loadbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;	/* split the memory budget across sections */

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			/* replace the existing (writable) fd with the read-only one */
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 1);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			/* a non-nil writechan marks a section as being rebuilt */
			ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	/* -i names that never matched a section are left non-nil by shouldprocess */
	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			/* throttle: with -m, wait for one to finish before starting more */
			if(++napart >= maxdisks){
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}
    187 
    188 static int
    189 shouldprocess(ISect *is)
    190 {
    191 	int i;
    192 
    193 	if(nisect == 0)
    194 		return 1;
    195 
    196 	for(i=0; i<nisect; i++)
    197 		if(isect[i] && strcmp(isect[i], is->name) == 0){
    198 			isect[i] = nil;
    199 			return 1;
    200 		}
    201 	return 0;
    202 }
    203 
/*
 * Atomically add n to the shared counter *a.
 * A single static lock serializes all callers.
 */
static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}
    213 
    214 /*
    215  * Read through an arena partition and send each of its IEntries
    216  * to the appropriate index section.  When finished, send on
    217  * arenadonechan.
    218  */
    219 enum
    220 {
    221 	ClumpChunks = 32*1024,
    222 };
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;
	IEntryBuf *buf, *b;
	uchar *score;
	ScoreBuf sb;

	p = v;
	threadsetname("arenaproc %s", p->name);
	/* one entry batch per index section; flushed whenever it fills */
	buf = MKNZ(IEntryBuf, ix->nsects);

	nskip = 0;
	tot = 0;
	sb.nscore = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	/* visit only the arenas that live on this partition */
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			/* walk the chunk backwards so addr decreases with clump number */
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				/* round clump size up to whole arena blocks */
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					/* sections filtered out by -i have a nil writechan */
					if(ix->sects[x]->writechan) {
						b = &buf[x];
						b->ie[b->nie] = ie;
						b->nie++;
						if(b->nie == nelem(b->ie)) {
							send(ix->sects[x]->writechan, b);
							b->nie = 0;
						}
					}
					if(ix->bloom) {
						score = sb.score[sb.nscore++];
						scorecp(score, ie.score);
						if(sb.nscore == nelem(sb.score)) {
							markbloomfiltern(ix->bloom, sb.score, sb.nscore);
							sb.nscore = 0;
						}
					}
				}
			}
		}
		/* addr should have walked exactly back to the arena's start */
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);

	/* flush partial batches (send copies the IEntryBuf, so freeing buf is safe) */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan && buf[i].nie > 0)
			send(ix->sects[i]->writechan, &buf[i]);
	free(buf);
	free(cis);
	if(ix->bloom && sb.nscore > 0)
		markbloomfiltern(ix->bloom, sb.score, sb.nscore);
	sendp(arenadonechan, p);
}
    317 
    318 /*
    319  * Convert score into relative bucket number in isect.
    320  * Can pass a packed ientry instead of score - score is first.
    321  */
    322 static u32int
    323 score2bucket(ISect *is, uchar *score)
    324 {
    325 	u32int b;
    326 
    327 	b = hashbits(score, 32)/ix->div;
    328 	if(b < is->start || b >= is->stop){
    329 		fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n",
    330 			score, ix->div, b, is->start, is->stop);
    331 	}
    332 	assert(is->start <= b && b < is->stop);
    333 	return b - is->start;
    334 }
    335 
    336 /*
    337  * Convert offset in index section to bucket number.
    338  */
    339 static u32int
    340 offset2bucket(ISect *is, u64int offset)
    341 {
    342 	u32int b;
    343 
    344 	assert(is->blockbase <= offset);
    345 	offset -= is->blockbase;
    346 	b = offset/is->blocksize;
    347 	assert(b < is->stop-is->start);
    348 	return b;
    349 }
    350 
    351 /*
    352  * Convert bucket number to offset.
    353  */
    354 static u64int
    355 bucket2offset(ISect *is, u32int b)
    356 {
    357 	assert(b <= is->stop-is->start);
    358 	return is->blockbase + (u64int)b*is->blocksize;
    359 }
    360 
    361 /*
    362  * IEntry buffers to hold initial round of spraying.
    363  */
    364 typedef struct Buf Buf;
    365 struct Buf
    366 {
    367 	Part *part;			/* partition being written */
    368 	uchar *bp;		/* current block */
    369 	uchar *ep;		/* end of block */
    370 	uchar *wp;		/* write position in block */
    371 	u64int boffset;		/* start offset */
    372 	u64int woffset;		/* next write offset */
    373 	u64int eoffset;		/* end offset */
    374 	u32int nentry;		/* number of entries written */
    375 };
    376 
    377 static void
    378 bflush(Buf *buf)
    379 {
    380 	u32int bufsize;
    381 
    382 	if(buf->woffset >= buf->eoffset)
    383 		sysfatal("buf index chunk overflow - need bigger index");
    384 	bufsize = buf->ep - buf->bp;
    385 	if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){
    386 		fprint(2, "write %s: %r\n", buf->part->name);
    387 		errors = 1;
    388 	}
    389 	buf->woffset += bufsize;
    390 	memset(buf->bp, 0, bufsize);
    391 	buf->wp = buf->bp;
    392 }
    393 
    394 static void
    395 bwrite(Buf *buf, IEntry *ie)
    396 {
    397 	if(buf->wp+IEntrySize > buf->ep)
    398 		bflush(buf);
    399 	assert(buf->bp <= buf->wp && buf->wp < buf->ep);
    400 	packientry(ie, buf->wp);
    401 	buf->wp += IEntrySize;
    402 	assert(buf->bp <= buf->wp && buf->wp <= buf->ep);
    403 	buf->nentry++;
    404 }
    405 
    406 /*
    407  * Minibuffer.  In-memory data structure holds our place
    408  * in the buffer but has no block data.  We are writing and
    409  * reading the minibuffers at the same time.  (Careful!)
    410  */
    411 typedef struct Minibuf Minibuf;
    412 struct Minibuf
    413 {
    414 	u64int boffset;		/* start offset */
    415 	u64int roffset;		/* read offset */
    416 	u64int woffset;		/* write offset */
    417 	u64int eoffset;		/* end offset */
    418 	u32int nentry;		/* # entries left to read */
    419 	u32int nwentry;	/* # entries written */
    420 };
    421 
    422 /*
    423  * Index entry pool.  Used when trying to shuffle around
    424  * the entries in a big buffer into the corresponding M minibuffers.
    425  * Sized to hold M*EntriesPerBlock entries, so that there will always
    426  * either be room in the pool for another block worth of entries
    427  * or there will be an entire block worth of sorted entries to
    428  * write out.
    429  */
    430 typedef struct IEntryLink IEntryLink;
    431 typedef struct IPool IPool;
    432 
    433 struct IEntryLink
    434 {
    435 	uchar ie[IEntrySize];		/* raw IEntry */
    436 	IEntryLink *next;		/* next in chain */
    437 };
    438 
    439 struct IPool
    440 {
    441 	ISect *isect;
    442 	u32int buck0;			/* first bucket in pool */
    443 	u32int mbufbuckets;	/* buckets per minibuf */
    444 	IEntryLink *entry;		/* all IEntryLinks */
    445 	u32int nentry;			/* # of IEntryLinks */
    446 	IEntryLink *free;		/* free list */
    447 	u32int nfree;			/* # on free list */
    448 	Minibuf *mbuf;			/* all minibufs */
    449 	u32int nmbuf;			/* # of minibufs */
    450 	IEntryLink **mlist;		/* lists for each minibuf */
    451 	u32int *mcount;		/* # on each mlist[i] */
    452 	u32int bufsize;			/* block buffer size */
    453 	uchar *rbuf;			/* read buffer */
    454 	uchar *wbuf;			/* write buffer */
    455 	u32int epbuf;			/* entries per block buffer */
    456 };
    457 
    458 /*
    459 static int
    460 countsokay(IPool *p)
    461 {
    462 	int i;
    463 	u64int n;
    464 
    465 	n = 0;
    466 	for(i=0; i<p->nmbuf; i++)
    467 		n += p->mcount[i];
    468 	n += p->nfree;
    469 	if(n != p->nentry){
    470 		print("free %ud:", p->nfree);
    471 		for(i=0; i<p->nmbuf; i++)
    472 			print(" %ud", p->mcount[i]);
    473 		print(" = %lld nentry: %ud\n", n, p->nentry);
    474 	}
    475 	return n == p->nentry;
    476 }
    477 */
    478 
/*
 * Allocate and initialize an IPool for isect covering the nmbuf
 * minibuffers in mbuf.  A single allocation holds the pool header,
 * the IEntryLink array, the per-minibuf list heads and counts, and
 * the bufsize-aligned read and write block buffers.
 */
static IPool*
mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf,
	u32int mbufbuckets, u32int bufsize)
{
	u32int i, nentry;
	uchar *data;
	IPool *p;
	IEntryLink *l;

	/* a block worth of entries per minibuf, plus one spare block */
	nentry = (nmbuf+1)*bufsize / IEntrySize;
	/*
	 * NOTE(review): sized with sizeof(IEntry) but the space is used
	 * as an array of IEntryLink (p->entry below) - this relies on
	 * sizeof(IEntry) >= sizeof(IEntryLink); verify against dat.h.
	 */
	p = ezmalloc(sizeof(IPool)
		+nentry*sizeof(IEntry)
		+nmbuf*sizeof(IEntryLink*)
		+nmbuf*sizeof(u32int)
		+3*bufsize);

	p->isect = isect;
	p->mbufbuckets = mbufbuckets;
	p->bufsize = bufsize;
	p->entry = (IEntryLink*)(p+1);
	p->nentry = nentry;
	p->mlist = (IEntryLink**)(p->entry+nentry);
	p->mcount = (u32int*)(p->mlist+nmbuf);
	p->nmbuf = nmbuf;
	p->mbuf = mbuf;
	data = (uchar*)(p->mcount+nmbuf);
	/* align to a bufsize boundary; the 3*bufsize slack leaves room for both buffers */
	data += bufsize - (uintptr)data%bufsize;
	p->rbuf = data;
	p->wbuf = data+bufsize;
	p->epbuf = bufsize/IEntrySize;

	/* chain every IEntryLink onto the free list */
	for(i=0; i<p->nentry; i++){
		l = &p->entry[i];
		l->next = p->free;
		p->free = l;
		p->nfree++;
	}
	return p;
}
    518 
    519 /*
    520  * Add the index entry ie to the pool p.
    521  * Caller must know there is room.
    522  */
    523 static void
    524 ipoolinsert(IPool *p, uchar *ie)
    525 {
    526 	u32int buck, x;
    527 	IEntryLink *l;
    528 
    529 	assert(p->free != nil);
    530 
    531 	buck = score2bucket(p->isect, ie);
    532 	x = (buck-p->buck0) / p->mbufbuckets;
    533 	if(x >= p->nmbuf){
    534 		fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n",
    535 			buck, p->mbufbuckets, x);
    536 	}
    537 	assert(x < p->nmbuf);
    538 
    539 	l = p->free;
    540 	p->free = l->next;
    541 	p->nfree--;
    542 	memmove(l->ie, ie, IEntrySize);
    543 	l->next = p->mlist[x];
    544 	p->mlist[x] = l;
    545 	p->mcount[x]++;
    546 }
    547 
    548 /*
    549  * Pull out a block containing as many
    550  * entries as possible for minibuffer x.
    551  */
    552 static u32int
    553 ipoolgetbuf(IPool *p, u32int x)
    554 {
    555 	uchar *bp, *ep, *wp;
    556 	IEntryLink *l;
    557 	u32int n;
    558 
    559 	bp = p->wbuf;
    560 	ep = p->wbuf + p->bufsize;
    561 	n = 0;
    562 	assert(x < p->nmbuf);
    563 	for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){
    564 		l = p->mlist[x];
    565 		p->mlist[x] = l->next;
    566 		p->mcount[x]--;
    567 		memmove(wp, l->ie, IEntrySize);
    568 		l->next = p->free;
    569 		p->free = l;
    570 		p->nfree++;
    571 		n++;
    572 	}
    573 	memset(wp, 0, ep-wp);
    574 	return n;
    575 }
    576 
    577 /*
    578  * Read a block worth of entries from the minibuf
    579  * into the pool.  Caller must know there is room.
    580  */
    581 static void
    582 ipoolloadblock(IPool *p, Minibuf *mb)
    583 {
    584 	u32int i, n;
    585 
    586 	assert(mb->nentry > 0);
    587 	assert(mb->roffset >= mb->woffset);
    588 	assert(mb->roffset < mb->eoffset);
    589 
    590 	n = p->bufsize/IEntrySize;
    591 	if(n > mb->nentry)
    592 		n = mb->nentry;
    593 	if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0)
    594 		fprint(2, "readpart %s: %r\n", p->isect->part->name);
    595 	else{
    596 		for(i=0; i<n; i++)
    597 			ipoolinsert(p, p->rbuf+i*IEntrySize);
    598 	}
    599 	mb->nentry -= n;
    600 	mb->roffset += p->bufsize;
    601 }
    602 
    603 /*
    604  * Write out a block worth of entries to minibuffer x.
    605  * If necessary, pick up the data there before overwriting it.
    606  */
    607 static void
    608 ipoolflush0(IPool *pool, u32int x)
    609 {
    610 	u32int bufsize;
    611 	Minibuf *mb;
    612 
    613 	mb = pool->mbuf+x;
    614 	bufsize = pool->bufsize;
    615 	mb->nwentry += ipoolgetbuf(pool, x);
    616 	if(mb->nentry > 0 && mb->roffset == mb->woffset){
    617 		assert(pool->nfree >= pool->bufsize/IEntrySize);
    618 		/*
    619 		 * There will be room in the pool -- we just
    620 		 * removed a block worth.
    621 		 */
    622 		ipoolloadblock(pool, mb);
    623 	}
    624 	if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0)
    625 		fprint(2, "writepart %s: %r\n", pool->isect->part->name);
    626 	mb->woffset += bufsize;
    627 }
    628 
    629 /*
    630  * Write out some full block of entries.
    631  * (There must be one -- the pool is almost full!)
    632  */
    633 static void
    634 ipoolflush1(IPool *pool)
    635 {
    636 	u32int i;
    637 
    638 	assert(pool->nfree <= pool->epbuf);
    639 
    640 	for(i=0; i<pool->nmbuf; i++){
    641 		if(pool->mcount[i] >= pool->epbuf){
    642 			ipoolflush0(pool, i);
    643 			return;
    644 		}
    645 	}
    646 	/* can't be reached - someone must be full */
    647 	sysfatal("ipoolflush1");
    648 }
    649 
    650 /*
    651  * Flush all the entries in the pool out to disk.
    652  * Nothing more to read from disk.
    653  */
    654 static void
    655 ipoolflush(IPool *pool)
    656 {
    657 	u32int i;
    658 
    659 	for(i=0; i<pool->nmbuf; i++)
    660 		while(pool->mlist[i])
    661 			ipoolflush0(pool, i);
    662 	assert(pool->nfree == pool->nentry);
    663 }
    664 
    665 /*
    666  * Third pass.  Pick up each minibuffer from disk into
    667  * memory and then write out the buckets.
    668  */
    669 
    670 /*
    671  * Compare two packed index entries.
    672  * Usual ordering except break ties by putting higher
    673  * index addresses first (assumes have duplicates
    674  * due to corruption in the lower addresses).
    675  */
    676 static int
    677 ientrycmpaddr(const void *va, const void *vb)
    678 {
    679 	int i;
    680 	uchar *a, *b;
    681 
    682 	a = (uchar*)va;
    683 	b = (uchar*)vb;
    684 	i = ientrycmp(a, b);
    685 	if(i)
    686 		return i;
    687 	return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8);
    688 }
    689 
    690 static void
    691 zerorange(Part *p, u64int o, u64int e)
    692 {
    693 	static uchar zero[MaxIoSize];
    694 	u32int n;
    695 
    696 	for(; o<e; o+=n){
    697 		n = sizeof zero;
    698 		if(o+n > e)
    699 			n = e-o;
    700 		if(writepart(p, o, zero, n) < 0)
    701 			fprint(2, "writepart %s: %r\n", p->name);
    702 	}
    703 }
    704 
    705 /*
    706  * Load a minibuffer into memory and write out the
    707  * corresponding buckets.
    708  */
    709 static void
    710 sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize)
    711 {
    712 	uchar *buckdata, *p, *q, *ep;
    713 	u32int b, lastb, memsize, n;
    714 	u64int o;
    715 	IBucket ib;
    716 	Part *part;
    717 
    718 	part = is->part;
    719 	buckdata = emalloc(is->blocksize);
    720 
    721 	if(mb->nwentry == 0)
    722 		return;
    723 
    724 	/*
    725 	 * read entire buffer.
    726 	 */
    727 	assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset);
    728 	assert(mb->woffset-mb->boffset <= nbuf);
    729 	if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){
    730 		fprint(2, "readpart %s: %r\n", part->name);
    731 		errors = 1;
    732 		return;
    733 	}
    734 	assert(*(uint*)buf != 0xa5a5a5a5);
    735 
    736 	/*
    737 	 * remove fragmentation due to IEntrySize
    738 	 * not evenly dividing Bufsize
    739 	 */
    740 	memsize = (bufsize/IEntrySize)*IEntrySize;
    741 	for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){
    742 		memmove(p, q, memsize);
    743 		p += memsize;
    744 		q += bufsize;
    745 	}
    746 	ep = buf + mb->nwentry*IEntrySize;
    747 	assert(ep <= buf+nbuf);
    748 
    749 	/*
    750 	 * sort entries
    751 	 */
    752 	qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr);
    753 
    754 	/*
    755 	 * write buckets out
    756 	 */
    757 	n = 0;
    758 	lastb = offset2bucket(is, mb->boffset);
    759 	for(p=buf; p<ep; p=q){
    760 		b = score2bucket(is, p);
    761 		for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize)
    762 			;
    763 		if(lastb+1 < b && zero)
    764 			zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b));
    765 		if(IBucketSize+(q-p) > is->blocksize)
    766 			sysfatal("bucket overflow - make index bigger");
    767 		memmove(buckdata+IBucketSize, p, q-p);
    768 		ib.n = (q-p)/IEntrySize;
    769 		n += ib.n;
    770 		packibucket(&ib, buckdata, is->bucketmagic);
    771 		if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0)
    772 			fprint(2, "write %s: %r\n", part->name);
    773 		lastb = b;
    774 	}
    775 	if(lastb+1 < is->stop-is->start && zero)
    776 		zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start));
    777 
    778 	if(n != mb->nwentry)
    779 		fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize);
    780 
    781 	free(buckdata);
    782 }
    783 
/*
 * Per-index-section proc: receive IEntryBufs over is->writechan
 * and build this section's buckets using up to three passes
 * (described below).  Signals completion on isectdonechan.
 */
static void
isectproc(void *v)
{
	u32int buck, bufbuckets, bufsize, epbuf, i, j;
	u32int mbufbuckets, n, nbucket, nn, space;
	u32int nbuf, nminibuf, xminiclump, prod;
	u64int blocksize, offset, xclump;
	uchar *data, *p;
	Buf *buf;
	IEntry ie;
	IEntryBuf ieb;
	IPool *ipool;
	ISect *is;
	Minibuf *mbuf, *mb;

	is = v;
	blocksize = is->blocksize;
	nbucket = is->stop - is->start;

	/*
	 * Three passes:
	 *	pass 1 - write index entries from arenas into
	 *		large sequential sections on index disk.
	 *		requires nbuf * bufsize memory.
	 *
	 *	pass 2 - split each section into minibufs.
	 *		requires nminibuf * bufsize memory.
	 *
	 *	pass 3 - read each minibuf into memory and
	 *		write buckets out.
	 *		requires entries/minibuf * IEntrySize memory.
	 *
	 * The larger we set bufsize the less seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the less
	 * seeking hurts us.
	 *
	 * The fewer sections and minibufs we have, the
	 * more entries we end up with in each minibuf
	 * at the end.
	 *
	 * Shoot for using half our memory to hold each
	 * minibuf.  The chance of a random distribution
	 * getting off by 2x is quite low.
	 *
	 * Once that is decided, figure out the smallest
	 * nminibuf and nsection/biggest bufsize we can use
	 * and still fit in the memory constraints.
	 */

	/* expected number of clump index entries we'll see */
	xclump = nbucket * (double)totalclumps/totalbuckets;

	/* number of clumps we want to see in a minibuf */
	xminiclump = isectmem/2/IEntrySize;

	/* total number of minibufs we need */
	prod = (xclump+xminiclump-1) / xminiclump;

	/* if possible, skip second pass */
	if(!dumb && prod*MinBufSize < isectmem){
		nbuf = prod;
		nminibuf = 1;
	}else{
		/* otherwise use nsection = sqrt(nmini) */
		for(nbuf=1; nbuf*nbuf<prod; nbuf++)
			;
		if(nbuf*MinBufSize > isectmem)
			sysfatal("not enough memory");
		nminibuf = nbuf;
	}
	/* no expected entries at all (empty arenas) - nothing to write */
	if (nbuf == 0) {
		fprint(2, "%s: brand-new index, no work to do\n", argv0);
		threadexitsall(nil);
	}

	/* size buffer to use extra memory */
	bufsize = MinBufSize;
	while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize)
		bufsize *= 2;
	data = emalloc(nbuf*bufsize);
	epbuf = bufsize/IEntrySize;
	fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n",
		is->part->name, nbucket, nbuf, nminibuf, bufsize);
	/*
	 * Accept index entries from arena procs.
	 */
	buf = MKNZ(Buf, nbuf);
	p = data;
	offset = is->blockbase;
	bufbuckets = (nbucket+nbuf-1)/nbuf;
	/* carve the section's disk range and the in-memory blocks among the Bufs */
	for(i=0; i<nbuf; i++){
		buf[i].part = is->part;
		buf[i].bp = p;
		buf[i].wp = p;
		p += bufsize;
		buf[i].ep = p;
		buf[i].boffset = offset;
		buf[i].woffset = offset;
		if(i < nbuf-1){
			offset += bufbuckets*blocksize;
			buf[i].eoffset = offset;
		}else{
			/* last Buf absorbs any rounding slack */
			offset = is->blockbase + nbucket*blocksize;
			buf[i].eoffset = offset;
		}
	}
	assert(p == data+nbuf*bufsize);

	/* pass 1: spray incoming entries into the Buf covering their bucket */
	n = 0;
	while(recv(is->writechan, &ieb) == 1){
		if(ieb.nie == 0)	/* nil/empty batch from threadmain = finish */
			break;
		for(j=0; j<ieb.nie; j++){
			ie = ieb.ie[j];
			buck = score2bucket(is, ie.score);
			i = buck/bufbuckets;
			assert(i < nbuf);
			bwrite(&buf[i], &ie);
			n++;
		}
	}
	add(&indexentries, n);

	nn = 0;
	for(i=0; i<nbuf; i++){
		bflush(&buf[i]);
		buf[i].bp = nil;
		buf[i].ep = nil;
		buf[i].wp = nil;
		nn += buf[i].nentry;
	}
	if(n != nn)
		fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn);

	free(data);

	fprint(2, "%T %s: reordering\n", is->part->name);

	/*
	 * Rearrange entries into minibuffers and then
	 * split each minibuffer into buckets.
	 * The minibuffer must be sized so that it is
	 * a multiple of blocksize -- ipoolloadblock assumes
	 * that each minibuf starts aligned on a blocksize
	 * boundary.
	 */
	mbuf = MKN(Minibuf, nminibuf);
	mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf;
	/* round up so each minibuf covers whole bufsize units */
	while(mbufbuckets*blocksize % bufsize)
		mbufbuckets++;
	for(i=0; i<nbuf; i++){
		/*
		 * Set up descriptors.
		 */
		n = buf[i].nentry;
		nn = 0;
		offset = buf[i].boffset;
		memset(mbuf, 0, nminibuf*sizeof(mbuf[0]));
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			mb->boffset = offset;
			offset += mbufbuckets*blocksize;
			if(offset > buf[i].eoffset)
				offset = buf[i].eoffset;
			mb->eoffset = offset;
			mb->roffset = mb->boffset;
			mb->woffset = mb->boffset;
			/* entries that pass 1 wrote into this minibuf's range */
			mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize;
			if(mb->nentry > buf[i].nentry)
				mb->nentry = buf[i].nentry;
			buf[i].nentry -= mb->nentry;
			nn += mb->nentry;
		}
		if(n != nn)
			fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);;
		/*
		 * Rearrange.
		 */
		if(!dumb && nminibuf == 1){
			/* pass 2 unnecessary: the whole Buf is one minibuf already */
			mbuf[0].nwentry = mbuf[0].nentry;
			mbuf[0].woffset = buf[i].woffset;
		}else{
			ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize);
			ipool->buck0 = bufbuckets*i;
			for(j=0; j<nminibuf; j++){
				mb = &mbuf[j];
				while(mb->nentry > 0){
					if(ipool->nfree < epbuf){
						ipoolflush1(ipool);
						/* ipoolflush1 might change mb->nentry */
						continue;
					}
					assert(ipool->nfree >= epbuf);
					ipoolloadblock(ipool, mb);
				}
			}
			ipoolflush(ipool);
			nn = 0;
			for(j=0; j<nminibuf; j++)
				nn += mbuf[j].nwentry;
			if(n != nn)
				fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i);
			free(ipool);
		}

		/*
		 * Make buckets.
		 */
		space = 0;
		for(j=0; j<nminibuf; j++)
			if(space < mbuf[j].woffset - mbuf[j].boffset)
				space = mbuf[j].woffset - mbuf[j].boffset;

		data = emalloc(space);
		for(j=0; j<nminibuf; j++){
			mb = &mbuf[j];
			sortminibuffer(is, mb, data, space, bufsize);
		}
		free(data);
	}

	sendp(isectdonechan, is);
}
   1007 }