buildindex.c (22561B)
/*
 * Rebuild the index from scratch, in place.
 */
#include "stdinc.h"
#include "dat.h"
#include "fns.h"

enum
{
	MinBufSize = 64*1024,
	MaxBufSize = 4*1024*1024,
};

typedef struct IEntryBuf IEntryBuf;
struct IEntryBuf
{
	IEntry	ie[100];
	int	nie;
};

typedef struct ScoreBuf ScoreBuf;
struct ScoreBuf
{
	uchar	score[100][VtScoreSize];
	int	nscore;
};

int	dumb;
int	errors;
char	**isect;
int	nisect;
int	bloom;
int	zero;

u32int	isectmem;
u64int	totalbuckets;
u64int	totalclumps;
Channel	*arenadonechan;
Channel	*isectdonechan;
Index	*ix;

u64int	arenaentries;
u64int	skipentries;
u64int	indexentries;

static int	shouldprocess(ISect*);
static void	isectproc(void*);
static void	arenapartproc(void*);

void
usage(void)
{
	fprint(2, "usage: buildindex [-bd] [-i isect]... [-M imem] venti.conf\n");
	threadexitsall("usage");
}

void
threadmain(int argc, char *argv[])
{
	int fd, i, napart, nfinish, maxdisks;
	u32int bcmem, imem;
	Config conf;
	Part *p;

	maxdisks = 100000;
	ventifmtinstall();
	imem = 256*1024*1024;
	ARGBEGIN{
	case 'b':
		bloom = 1;
		break;
	case 'd':	/* debugging - make sure to run all 3 passes */
		dumb = 1;
		break;
	case 'i':
		isect = vtrealloc(isect, (nisect+1)*sizeof(isect[0]));
		isect[nisect++] = EARGF(usage());
		break;
	case 'M':
		imem = unittoull(EARGF(usage()));
		break;
	case 'm':	/* temporary - might go away */
		maxdisks = atoi(EARGF(usage()));
		break;
	default:
		usage();
		break;
	}ARGEND

	if(argc != 1)
		usage();

	if(initventi(argv[0], &conf) < 0)
		sysfatal("can't init venti: %r");
	ix = mainindex;
	if(nisect == 0 && ix->bloom)
		bloom = 1;
	if(bloom && ix->bloom && resetbloom(ix->bloom) < 0)
		sysfatal("loadbloom: %r");
	if(bloom && !ix->bloom)
		sysfatal("-b specified but no bloom filter");
	if(!bloom)
		ix->bloom = nil;
	isectmem = imem/ix->nsects;

	/*
	 * safety first - only need read access to arenas
	 */
	p = nil;
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			if((fd = open(p->filename, OREAD)) < 0)
				sysfatal("cannot reopen %s: %r", p->filename);
			dup(fd, p->fd);
			close(fd);
		}
	}

	/*
	 * need a block for every arena
	 */
	bcmem = maxblocksize * (mainindex->narenas + 16);
	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
	initdcache(bcmem);

	totalclumps = 0;
	for(i=0; i<ix->narenas; i++)
		totalclumps += ix->arenas[i]->diskstats.clumps;

	totalbuckets = 0;
	for(i=0; i<ix->nsects; i++)
		totalbuckets += ix->sects[i]->blocks;
	fprint(2, "%,lld clumps, %,lld buckets\n", totalclumps, totalbuckets);

	/* start index procs */
	fprint(2, "%T read index\n");
	isectdonechan = chancreate(sizeof(void*), 1);
	for(i=0; i<ix->nsects; i++){
		if(shouldprocess(ix->sects[i])){
			ix->sects[i]->writechan = chancreate(sizeof(IEntryBuf), 1);
			vtproc(isectproc, ix->sects[i]);
		}
	}

	for(i=0; i<nisect; i++)
		if(isect[i])
			fprint(2, "warning: did not find index section %s\n", isect[i]);

	/* start arena procs */
	p = nil;
	napart = 0;
	nfinish = 0;
	arenadonechan = chancreate(sizeof(void*), 0);
	for(i=0; i<ix->narenas; i++){
		if(ix->arenas[i]->part != p){
			p = ix->arenas[i]->part;
			vtproc(arenapartproc, p);
			if(++napart >= maxdisks){
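/*
 * Shape of the computation, for orientation: threadmain starts one
 * arenapartproc per arena disk and one isectproc per index section.
 * Arena procs stream clump directories and spray IEntries over each
 * section's writechan; section procs build their buckets in the three
 * passes described at the top of isectproc. Completion is signaled
 * on arenadonechan and isectdonechan.
 */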
				recvp(arenadonechan);
				nfinish++;
			}
		}
	}

	/* wait for arena procs to finish */
	for(; nfinish<napart; nfinish++)
		recvp(arenadonechan);

	/* tell index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			send(ix->sects[i]->writechan, nil);

	/* wait for index procs to finish */
	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan)
			recvp(isectdonechan);

	if(ix->bloom && writebloom(ix->bloom) < 0)
		fprint(2, "writing bloom filter: %r\n");

	fprint(2, "%T done arenaentries=%,lld indexed=%,lld (nskip=%,lld)\n",
		arenaentries, indexentries, skipentries);
	threadexitsall(nil);
}

static int
shouldprocess(ISect *is)
{
	int i;

	if(nisect == 0)
		return 1;

	for(i=0; i<nisect; i++)
		if(isect[i] && strcmp(isect[i], is->name) == 0){
			isect[i] = nil;
			return 1;
		}
	return 0;
}

static void
add(u64int *a, u64int n)
{
	static Lock l;

	lock(&l);
	*a += n;
	unlock(&l);
}

/*
 * Read through an arena partition and send each of its IEntries
 * to the appropriate index section. When finished, send on
 * arenadonechan.
 */
enum
{
	ClumpChunks = 32*1024,
};
static void
arenapartproc(void *v)
{
	int i, j, n, nskip, x;
	u32int clump;
	u64int addr, tot;
	Arena *a;
	ClumpInfo *ci, *cis;
	IEntry ie;
	Part *p;
	IEntryBuf *buf, *b;
	uchar *score;
	ScoreBuf sb;

	p = v;
	threadsetname("arenaproc %s", p->name);
	buf = MKNZ(IEntryBuf, ix->nsects);

	nskip = 0;
	tot = 0;
	sb.nscore = 0;
	cis = MKN(ClumpInfo, ClumpChunks);
	for(i=0; i<ix->narenas; i++){
		a = ix->arenas[i];
		if(a->part != p)
			continue;
		if(a->memstats.clumps)
			fprint(2, "%T arena %s: %d entries\n",
				a->name, a->memstats.clumps);
		/*
		 * Running the loop backwards accesses the
		 * clump info blocks forwards, since they are
		 * stored in reverse order at the end of the arena.
		 * This speeds things slightly.
		 */
		addr = ix->amap[i].start + a->memstats.used;
		for(clump=a->memstats.clumps; clump > 0; clump-=n){
			n = ClumpChunks;
			if(n > clump)
				n = clump;
			if(readclumpinfos(a, clump-n, cis, n) != n){
				fprint(2, "%T arena %s: directory read: %r\n", a->name);
				errors = 1;
				break;
			}
			for(j=n-1; j>=0; j--){
				ci = &cis[j];
				ie.ia.type = ci->type;
				ie.ia.size = ci->uncsize;
				addr -= ci->size + ClumpSize;
				ie.ia.addr = addr;
				ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog;
				scorecp(ie.score, ci->score);
				if(ci->type == VtCorruptType)
					nskip++;
				else{
					tot++;
					x = indexsect(ix, ie.score);
					assert(0 <= x && x < ix->nsects);
					if(ix->sects[x]->writechan) {
						b = &buf[x];
						b->ie[b->nie] = ie;
						b->nie++;
						if(b->nie == nelem(b->ie)) {
							send(ix->sects[x]->writechan, b);
							b->nie = 0;
						}
					}
					if(ix->bloom) {
						score = sb.score[sb.nscore++];
						scorecp(score, ie.score);
						if(sb.nscore == nelem(sb.score)) {
							markbloomfiltern(ix->bloom, sb.score, sb.nscore);
							sb.nscore = 0;
						}
					}
				}
			}
		}
		if(addr != ix->amap[i].start)
			fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n",
				a->name, addr, ix->amap[i].start);
	}
	add(&arenaentries, tot);
	add(&skipentries, nskip);

	for(i=0; i<ix->nsects; i++)
		if(ix->sects[i]->writechan && buf[i].nie > 0)
			send(ix->sects[i]->writechan, &buf[i]);
	free(buf);
	free(cis);
	if(ix->bloom && sb.nscore > 0)
		markbloomfiltern(ix->bloom, sb.score, sb.nscore);
	sendp(arenadonechan, p);
}
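/*
 * Note that everything above is batched: IEntries travel 100 at a
 * time (one IEntryBuf per channel send) and, when the bloom filter
 * is being rebuilt, scores are marked 100 at a time (one ScoreBuf
 * per markbloomfiltern call), rather than one channel or filter
 * operation per clump.
 */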
257 */ 258 addr = ix->amap[i].start + a->memstats.used; 259 for(clump=a->memstats.clumps; clump > 0; clump-=n){ 260 n = ClumpChunks; 261 if(n > clump) 262 n = clump; 263 if(readclumpinfos(a, clump-n, cis, n) != n){ 264 fprint(2, "%T arena %s: directory read: %r\n", a->name); 265 errors = 1; 266 break; 267 } 268 for(j=n-1; j>=0; j--){ 269 ci = &cis[j]; 270 ie.ia.type = ci->type; 271 ie.ia.size = ci->uncsize; 272 addr -= ci->size + ClumpSize; 273 ie.ia.addr = addr; 274 ie.ia.blocks = (ci->size + ClumpSize + (1<<ABlockLog)-1) >> ABlockLog; 275 scorecp(ie.score, ci->score); 276 if(ci->type == VtCorruptType) 277 nskip++; 278 else{ 279 tot++; 280 x = indexsect(ix, ie.score); 281 assert(0 <= x && x < ix->nsects); 282 if(ix->sects[x]->writechan) { 283 b = &buf[x]; 284 b->ie[b->nie] = ie; 285 b->nie++; 286 if(b->nie == nelem(b->ie)) { 287 send(ix->sects[x]->writechan, b); 288 b->nie = 0; 289 } 290 } 291 if(ix->bloom) { 292 score = sb.score[sb.nscore++]; 293 scorecp(score, ie.score); 294 if(sb.nscore == nelem(sb.score)) { 295 markbloomfiltern(ix->bloom, sb.score, sb.nscore); 296 sb.nscore = 0; 297 } 298 } 299 } 300 } 301 } 302 if(addr != ix->amap[i].start) 303 fprint(2, "%T arena %s: clump miscalculation %lld != %lld\n", a->name, addr, ix->amap[i].start); 304 } 305 add(&arenaentries, tot); 306 add(&skipentries, nskip); 307 308 for(i=0; i<ix->nsects; i++) 309 if(ix->sects[i]->writechan && buf[i].nie > 0) 310 send(ix->sects[i]->writechan, &buf[i]); 311 free(buf); 312 free(cis); 313 if(ix->bloom && sb.nscore > 0) 314 markbloomfiltern(ix->bloom, sb.score, sb.nscore); 315 sendp(arenadonechan, p); 316 } 317 318 /* 319 * Convert score into relative bucket number in isect. 320 * Can pass a packed ientry instead of score - score is first. 321 */ 322 static u32int 323 score2bucket(ISect *is, uchar *score) 324 { 325 u32int b; 326 327 b = hashbits(score, 32)/ix->div; 328 if(b < is->start || b >= is->stop){ 329 fprint(2, "score2bucket: score=%V div=%d b=%ud start=%ud stop=%ud\n", 330 score, ix->div, b, is->start, is->stop); 331 } 332 assert(is->start <= b && b < is->stop); 333 return b - is->start; 334 } 335 336 /* 337 * Convert offset in index section to bucket number. 338 */ 339 static u32int 340 offset2bucket(ISect *is, u64int offset) 341 { 342 u32int b; 343 344 assert(is->blockbase <= offset); 345 offset -= is->blockbase; 346 b = offset/is->blocksize; 347 assert(b < is->stop-is->start); 348 return b; 349 } 350 351 /* 352 * Convert bucket number to offset. 353 */ 354 static u64int 355 bucket2offset(ISect *is, u32int b) 356 { 357 assert(b <= is->stop-is->start); 358 return is->blockbase + (u64int)b*is->blocksize; 359 } 360 361 /* 362 * IEntry buffers to hold initial round of spraying. 
363 */ 364 typedef struct Buf Buf; 365 struct Buf 366 { 367 Part *part; /* partition being written */ 368 uchar *bp; /* current block */ 369 uchar *ep; /* end of block */ 370 uchar *wp; /* write position in block */ 371 u64int boffset; /* start offset */ 372 u64int woffset; /* next write offset */ 373 u64int eoffset; /* end offset */ 374 u32int nentry; /* number of entries written */ 375 }; 376 377 static void 378 bflush(Buf *buf) 379 { 380 u32int bufsize; 381 382 if(buf->woffset >= buf->eoffset) 383 sysfatal("buf index chunk overflow - need bigger index"); 384 bufsize = buf->ep - buf->bp; 385 if(writepart(buf->part, buf->woffset, buf->bp, bufsize) < 0){ 386 fprint(2, "write %s: %r\n", buf->part->name); 387 errors = 1; 388 } 389 buf->woffset += bufsize; 390 memset(buf->bp, 0, bufsize); 391 buf->wp = buf->bp; 392 } 393 394 static void 395 bwrite(Buf *buf, IEntry *ie) 396 { 397 if(buf->wp+IEntrySize > buf->ep) 398 bflush(buf); 399 assert(buf->bp <= buf->wp && buf->wp < buf->ep); 400 packientry(ie, buf->wp); 401 buf->wp += IEntrySize; 402 assert(buf->bp <= buf->wp && buf->wp <= buf->ep); 403 buf->nentry++; 404 } 405 406 /* 407 * Minibuffer. In-memory data structure holds our place 408 * in the buffer but has no block data. We are writing and 409 * reading the minibuffers at the same time. (Careful!) 410 */ 411 typedef struct Minibuf Minibuf; 412 struct Minibuf 413 { 414 u64int boffset; /* start offset */ 415 u64int roffset; /* read offset */ 416 u64int woffset; /* write offset */ 417 u64int eoffset; /* end offset */ 418 u32int nentry; /* # entries left to read */ 419 u32int nwentry; /* # entries written */ 420 }; 421 422 /* 423 * Index entry pool. Used when trying to shuffle around 424 * the entries in a big buffer into the corresponding M minibuffers. 425 * Sized to hold M*EntriesPerBlock entries, so that there will always 426 * either be room in the pool for another block worth of entries 427 * or there will be an entire block worth of sorted entries to 428 * write out. 
429 */ 430 typedef struct IEntryLink IEntryLink; 431 typedef struct IPool IPool; 432 433 struct IEntryLink 434 { 435 uchar ie[IEntrySize]; /* raw IEntry */ 436 IEntryLink *next; /* next in chain */ 437 }; 438 439 struct IPool 440 { 441 ISect *isect; 442 u32int buck0; /* first bucket in pool */ 443 u32int mbufbuckets; /* buckets per minibuf */ 444 IEntryLink *entry; /* all IEntryLinks */ 445 u32int nentry; /* # of IEntryLinks */ 446 IEntryLink *free; /* free list */ 447 u32int nfree; /* # on free list */ 448 Minibuf *mbuf; /* all minibufs */ 449 u32int nmbuf; /* # of minibufs */ 450 IEntryLink **mlist; /* lists for each minibuf */ 451 u32int *mcount; /* # on each mlist[i] */ 452 u32int bufsize; /* block buffer size */ 453 uchar *rbuf; /* read buffer */ 454 uchar *wbuf; /* write buffer */ 455 u32int epbuf; /* entries per block buffer */ 456 }; 457 458 /* 459 static int 460 countsokay(IPool *p) 461 { 462 int i; 463 u64int n; 464 465 n = 0; 466 for(i=0; i<p->nmbuf; i++) 467 n += p->mcount[i]; 468 n += p->nfree; 469 if(n != p->nentry){ 470 print("free %ud:", p->nfree); 471 for(i=0; i<p->nmbuf; i++) 472 print(" %ud", p->mcount[i]); 473 print(" = %lld nentry: %ud\n", n, p->nentry); 474 } 475 return n == p->nentry; 476 } 477 */ 478 479 static IPool* 480 mkipool(ISect *isect, Minibuf *mbuf, u32int nmbuf, 481 u32int mbufbuckets, u32int bufsize) 482 { 483 u32int i, nentry; 484 uchar *data; 485 IPool *p; 486 IEntryLink *l; 487 488 nentry = (nmbuf+1)*bufsize / IEntrySize; 489 p = ezmalloc(sizeof(IPool) 490 +nentry*sizeof(IEntry) 491 +nmbuf*sizeof(IEntryLink*) 492 +nmbuf*sizeof(u32int) 493 +3*bufsize); 494 495 p->isect = isect; 496 p->mbufbuckets = mbufbuckets; 497 p->bufsize = bufsize; 498 p->entry = (IEntryLink*)(p+1); 499 p->nentry = nentry; 500 p->mlist = (IEntryLink**)(p->entry+nentry); 501 p->mcount = (u32int*)(p->mlist+nmbuf); 502 p->nmbuf = nmbuf; 503 p->mbuf = mbuf; 504 data = (uchar*)(p->mcount+nmbuf); 505 data += bufsize - (uintptr)data%bufsize; 506 p->rbuf = data; 507 p->wbuf = data+bufsize; 508 p->epbuf = bufsize/IEntrySize; 509 510 for(i=0; i<p->nentry; i++){ 511 l = &p->entry[i]; 512 l->next = p->free; 513 p->free = l; 514 p->nfree++; 515 } 516 return p; 517 } 518 519 /* 520 * Add the index entry ie to the pool p. 521 * Caller must know there is room. 522 */ 523 static void 524 ipoolinsert(IPool *p, uchar *ie) 525 { 526 u32int buck, x; 527 IEntryLink *l; 528 529 assert(p->free != nil); 530 531 buck = score2bucket(p->isect, ie); 532 x = (buck-p->buck0) / p->mbufbuckets; 533 if(x >= p->nmbuf){ 534 fprint(2, "buck=%ud mbufbucket=%ud x=%ud\n", 535 buck, p->mbufbuckets, x); 536 } 537 assert(x < p->nmbuf); 538 539 l = p->free; 540 p->free = l->next; 541 p->nfree--; 542 memmove(l->ie, ie, IEntrySize); 543 l->next = p->mlist[x]; 544 p->mlist[x] = l; 545 p->mcount[x]++; 546 } 547 548 /* 549 * Pull out a block containing as many 550 * entries as possible for minibuffer x. 551 */ 552 static u32int 553 ipoolgetbuf(IPool *p, u32int x) 554 { 555 uchar *bp, *ep, *wp; 556 IEntryLink *l; 557 u32int n; 558 559 bp = p->wbuf; 560 ep = p->wbuf + p->bufsize; 561 n = 0; 562 assert(x < p->nmbuf); 563 for(wp=bp; wp+IEntrySize<=ep && p->mlist[x]; wp+=IEntrySize){ 564 l = p->mlist[x]; 565 p->mlist[x] = l->next; 566 p->mcount[x]--; 567 memmove(wp, l->ie, IEntrySize); 568 l->next = p->free; 569 p->free = l; 570 p->nfree++; 571 n++; 572 } 573 memset(wp, 0, ep-wp); 574 return n; 575 } 576 577 /* 578 * Read a block worth of entries from the minibuf 579 * into the pool. Caller must know there is room. 
580 */ 581 static void 582 ipoolloadblock(IPool *p, Minibuf *mb) 583 { 584 u32int i, n; 585 586 assert(mb->nentry > 0); 587 assert(mb->roffset >= mb->woffset); 588 assert(mb->roffset < mb->eoffset); 589 590 n = p->bufsize/IEntrySize; 591 if(n > mb->nentry) 592 n = mb->nentry; 593 if(readpart(p->isect->part, mb->roffset, p->rbuf, p->bufsize) < 0) 594 fprint(2, "readpart %s: %r\n", p->isect->part->name); 595 else{ 596 for(i=0; i<n; i++) 597 ipoolinsert(p, p->rbuf+i*IEntrySize); 598 } 599 mb->nentry -= n; 600 mb->roffset += p->bufsize; 601 } 602 603 /* 604 * Write out a block worth of entries to minibuffer x. 605 * If necessary, pick up the data there before overwriting it. 606 */ 607 static void 608 ipoolflush0(IPool *pool, u32int x) 609 { 610 u32int bufsize; 611 Minibuf *mb; 612 613 mb = pool->mbuf+x; 614 bufsize = pool->bufsize; 615 mb->nwentry += ipoolgetbuf(pool, x); 616 if(mb->nentry > 0 && mb->roffset == mb->woffset){ 617 assert(pool->nfree >= pool->bufsize/IEntrySize); 618 /* 619 * There will be room in the pool -- we just 620 * removed a block worth. 621 */ 622 ipoolloadblock(pool, mb); 623 } 624 if(writepart(pool->isect->part, mb->woffset, pool->wbuf, bufsize) < 0) 625 fprint(2, "writepart %s: %r\n", pool->isect->part->name); 626 mb->woffset += bufsize; 627 } 628 629 /* 630 * Write out some full block of entries. 631 * (There must be one -- the pool is almost full!) 632 */ 633 static void 634 ipoolflush1(IPool *pool) 635 { 636 u32int i; 637 638 assert(pool->nfree <= pool->epbuf); 639 640 for(i=0; i<pool->nmbuf; i++){ 641 if(pool->mcount[i] >= pool->epbuf){ 642 ipoolflush0(pool, i); 643 return; 644 } 645 } 646 /* can't be reached - someone must be full */ 647 sysfatal("ipoolflush1"); 648 } 649 650 /* 651 * Flush all the entries in the pool out to disk. 652 * Nothing more to read from disk. 653 */ 654 static void 655 ipoolflush(IPool *pool) 656 { 657 u32int i; 658 659 for(i=0; i<pool->nmbuf; i++) 660 while(pool->mlist[i]) 661 ipoolflush0(pool, i); 662 assert(pool->nfree == pool->nentry); 663 } 664 665 /* 666 * Third pass. Pick up each minibuffer from disk into 667 * memory and then write out the buckets. 668 */ 669 670 /* 671 * Compare two packed index entries. 672 * Usual ordering except break ties by putting higher 673 * index addresses first (assumes have duplicates 674 * due to corruption in the lower addresses). 675 */ 676 static int 677 ientrycmpaddr(const void *va, const void *vb) 678 { 679 int i; 680 uchar *a, *b; 681 682 a = (uchar*)va; 683 b = (uchar*)vb; 684 i = ientrycmp(a, b); 685 if(i) 686 return i; 687 return -memcmp(a+IEntryAddrOff, b+IEntryAddrOff, 8); 688 } 689 690 static void 691 zerorange(Part *p, u64int o, u64int e) 692 { 693 static uchar zero[MaxIoSize]; 694 u32int n; 695 696 for(; o<e; o+=n){ 697 n = sizeof zero; 698 if(o+n > e) 699 n = e-o; 700 if(writepart(p, o, zero, n) < 0) 701 fprint(2, "writepart %s: %r\n", p->name); 702 } 703 } 704 705 /* 706 * Load a minibuffer into memory and write out the 707 * corresponding buckets. 708 */ 709 static void 710 sortminibuffer(ISect *is, Minibuf *mb, uchar *buf, u32int nbuf, u32int bufsize) 711 { 712 uchar *buckdata, *p, *q, *ep; 713 u32int b, lastb, memsize, n; 714 u64int o; 715 IBucket ib; 716 Part *part; 717 718 part = is->part; 719 buckdata = emalloc(is->blocksize); 720 721 if(mb->nwentry == 0) 722 return; 723 724 /* 725 * read entire buffer. 
726 */ 727 assert(mb->nwentry*IEntrySize <= mb->woffset-mb->boffset); 728 assert(mb->woffset-mb->boffset <= nbuf); 729 if(readpart(part, mb->boffset, buf, mb->woffset-mb->boffset) < 0){ 730 fprint(2, "readpart %s: %r\n", part->name); 731 errors = 1; 732 return; 733 } 734 assert(*(uint*)buf != 0xa5a5a5a5); 735 736 /* 737 * remove fragmentation due to IEntrySize 738 * not evenly dividing Bufsize 739 */ 740 memsize = (bufsize/IEntrySize)*IEntrySize; 741 for(o=mb->boffset, p=q=buf; o<mb->woffset; o+=bufsize){ 742 memmove(p, q, memsize); 743 p += memsize; 744 q += bufsize; 745 } 746 ep = buf + mb->nwentry*IEntrySize; 747 assert(ep <= buf+nbuf); 748 749 /* 750 * sort entries 751 */ 752 qsort(buf, mb->nwentry, IEntrySize, ientrycmpaddr); 753 754 /* 755 * write buckets out 756 */ 757 n = 0; 758 lastb = offset2bucket(is, mb->boffset); 759 for(p=buf; p<ep; p=q){ 760 b = score2bucket(is, p); 761 for(q=p; q<ep && score2bucket(is, q)==b; q+=IEntrySize) 762 ; 763 if(lastb+1 < b && zero) 764 zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, b)); 765 if(IBucketSize+(q-p) > is->blocksize) 766 sysfatal("bucket overflow - make index bigger"); 767 memmove(buckdata+IBucketSize, p, q-p); 768 ib.n = (q-p)/IEntrySize; 769 n += ib.n; 770 packibucket(&ib, buckdata, is->bucketmagic); 771 if(writepart(part, bucket2offset(is, b), buckdata, is->blocksize) < 0) 772 fprint(2, "write %s: %r\n", part->name); 773 lastb = b; 774 } 775 if(lastb+1 < is->stop-is->start && zero) 776 zerorange(part, bucket2offset(is, lastb+1), bucket2offset(is, is->stop - is->start)); 777 778 if(n != mb->nwentry) 779 fprint(2, "sortminibuffer bug: n=%ud nwentry=%ud have=%ld\n", n, mb->nwentry, (ep-buf)/IEntrySize); 780 781 free(buckdata); 782 } 783 784 static void 785 isectproc(void *v) 786 { 787 u32int buck, bufbuckets, bufsize, epbuf, i, j; 788 u32int mbufbuckets, n, nbucket, nn, space; 789 u32int nbuf, nminibuf, xminiclump, prod; 790 u64int blocksize, offset, xclump; 791 uchar *data, *p; 792 Buf *buf; 793 IEntry ie; 794 IEntryBuf ieb; 795 IPool *ipool; 796 ISect *is; 797 Minibuf *mbuf, *mb; 798 799 is = v; 800 blocksize = is->blocksize; 801 nbucket = is->stop - is->start; 802 803 /* 804 * Three passes: 805 * pass 1 - write index entries from arenas into 806 * large sequential sections on index disk. 807 * requires nbuf * bufsize memory. 808 * 809 * pass 2 - split each section into minibufs. 810 * requires nminibuf * bufsize memory. 811 * 812 * pass 3 - read each minibuf into memory and 813 * write buckets out. 814 * requires entries/minibuf * IEntrySize memory. 815 * 816 * The larger we set bufsize the less seeking hurts us. 817 * 818 * The fewer sections and minibufs we have, the less 819 * seeking hurts us. 820 * 821 * The fewer sections and minibufs we have, the 822 * more entries we end up with in each minibuf 823 * at the end. 824 * 825 * Shoot for using half our memory to hold each 826 * minibuf. The chance of a random distribution 827 * getting off by 2x is quite low. 828 * 829 * Once that is decided, figure out the smallest 830 * nminibuf and nsection/biggest bufsize we can use 831 * and still fit in the memory constraints. 
832 */ 833 834 /* expected number of clump index entries we'll see */ 835 xclump = nbucket * (double)totalclumps/totalbuckets; 836 837 /* number of clumps we want to see in a minibuf */ 838 xminiclump = isectmem/2/IEntrySize; 839 840 /* total number of minibufs we need */ 841 prod = (xclump+xminiclump-1) / xminiclump; 842 843 /* if possible, skip second pass */ 844 if(!dumb && prod*MinBufSize < isectmem){ 845 nbuf = prod; 846 nminibuf = 1; 847 }else{ 848 /* otherwise use nsection = sqrt(nmini) */ 849 for(nbuf=1; nbuf*nbuf<prod; nbuf++) 850 ; 851 if(nbuf*MinBufSize > isectmem) 852 sysfatal("not enough memory"); 853 nminibuf = nbuf; 854 } 855 if (nbuf == 0) { 856 fprint(2, "%s: brand-new index, no work to do\n", argv0); 857 threadexitsall(nil); 858 } 859 860 /* size buffer to use extra memory */ 861 bufsize = MinBufSize; 862 while(bufsize*2*nbuf <= isectmem && bufsize < MaxBufSize) 863 bufsize *= 2; 864 data = emalloc(nbuf*bufsize); 865 epbuf = bufsize/IEntrySize; 866 fprint(2, "%T %s: %,ud buckets, %,ud groups, %,ud minigroups, %,ud buffer\n", 867 is->part->name, nbucket, nbuf, nminibuf, bufsize); 868 /* 869 * Accept index entries from arena procs. 870 */ 871 buf = MKNZ(Buf, nbuf); 872 p = data; 873 offset = is->blockbase; 874 bufbuckets = (nbucket+nbuf-1)/nbuf; 875 for(i=0; i<nbuf; i++){ 876 buf[i].part = is->part; 877 buf[i].bp = p; 878 buf[i].wp = p; 879 p += bufsize; 880 buf[i].ep = p; 881 buf[i].boffset = offset; 882 buf[i].woffset = offset; 883 if(i < nbuf-1){ 884 offset += bufbuckets*blocksize; 885 buf[i].eoffset = offset; 886 }else{ 887 offset = is->blockbase + nbucket*blocksize; 888 buf[i].eoffset = offset; 889 } 890 } 891 assert(p == data+nbuf*bufsize); 892 893 n = 0; 894 while(recv(is->writechan, &ieb) == 1){ 895 if(ieb.nie == 0) 896 break; 897 for(j=0; j<ieb.nie; j++){ 898 ie = ieb.ie[j]; 899 buck = score2bucket(is, ie.score); 900 i = buck/bufbuckets; 901 assert(i < nbuf); 902 bwrite(&buf[i], &ie); 903 n++; 904 } 905 } 906 add(&indexentries, n); 907 908 nn = 0; 909 for(i=0; i<nbuf; i++){ 910 bflush(&buf[i]); 911 buf[i].bp = nil; 912 buf[i].ep = nil; 913 buf[i].wp = nil; 914 nn += buf[i].nentry; 915 } 916 if(n != nn) 917 fprint(2, "isectproc bug: n=%ud nn=%ud\n", n, nn); 918 919 free(data); 920 921 fprint(2, "%T %s: reordering\n", is->part->name); 922 923 /* 924 * Rearrange entries into minibuffers and then 925 * split each minibuffer into buckets. 926 * The minibuffer must be sized so that it is 927 * a multiple of blocksize -- ipoolloadblock assumes 928 * that each minibuf starts aligned on a blocksize 929 * boundary. 930 */ 931 mbuf = MKN(Minibuf, nminibuf); 932 mbufbuckets = (bufbuckets+nminibuf-1)/nminibuf; 933 while(mbufbuckets*blocksize % bufsize) 934 mbufbuckets++; 935 for(i=0; i<nbuf; i++){ 936 /* 937 * Set up descriptors. 938 */ 939 n = buf[i].nentry; 940 nn = 0; 941 offset = buf[i].boffset; 942 memset(mbuf, 0, nminibuf*sizeof(mbuf[0])); 943 for(j=0; j<nminibuf; j++){ 944 mb = &mbuf[j]; 945 mb->boffset = offset; 946 offset += mbufbuckets*blocksize; 947 if(offset > buf[i].eoffset) 948 offset = buf[i].eoffset; 949 mb->eoffset = offset; 950 mb->roffset = mb->boffset; 951 mb->woffset = mb->boffset; 952 mb->nentry = epbuf * (mb->eoffset - mb->boffset)/bufsize; 953 if(mb->nentry > buf[i].nentry) 954 mb->nentry = buf[i].nentry; 955 buf[i].nentry -= mb->nentry; 956 nn += mb->nentry; 957 } 958 if(n != nn) 959 fprint(2, "isectproc bug2: n=%ud nn=%ud (i=%d)\n", n, nn, i);; 960 /* 961 * Rearrange. 
962 */ 963 if(!dumb && nminibuf == 1){ 964 mbuf[0].nwentry = mbuf[0].nentry; 965 mbuf[0].woffset = buf[i].woffset; 966 }else{ 967 ipool = mkipool(is, mbuf, nminibuf, mbufbuckets, bufsize); 968 ipool->buck0 = bufbuckets*i; 969 for(j=0; j<nminibuf; j++){ 970 mb = &mbuf[j]; 971 while(mb->nentry > 0){ 972 if(ipool->nfree < epbuf){ 973 ipoolflush1(ipool); 974 /* ipoolflush1 might change mb->nentry */ 975 continue; 976 } 977 assert(ipool->nfree >= epbuf); 978 ipoolloadblock(ipool, mb); 979 } 980 } 981 ipoolflush(ipool); 982 nn = 0; 983 for(j=0; j<nminibuf; j++) 984 nn += mbuf[j].nwentry; 985 if(n != nn) 986 fprint(2, "isectproc bug3: n=%ud nn=%ud (i=%d)\n", n, nn, i); 987 free(ipool); 988 } 989 990 /* 991 * Make buckets. 992 */ 993 space = 0; 994 for(j=0; j<nminibuf; j++) 995 if(space < mbuf[j].woffset - mbuf[j].boffset) 996 space = mbuf[j].woffset - mbuf[j].boffset; 997 998 data = emalloc(space); 999 for(j=0; j<nminibuf; j++){ 1000 mb = &mbuf[j]; 1001 sortminibuffer(is, mb, data, space, bufsize); 1002 } 1003 free(data); 1004 } 1005 1006 sendp(isectdonechan, is); 1007 }