sortientry.c (8324B)
1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 #include <bio.h> 5 6 typedef struct IEBuck IEBuck; 7 typedef struct IEBucks IEBucks; 8 9 enum 10 { 11 ClumpChunks = 32*1024 12 }; 13 14 struct IEBuck 15 { 16 u32int head; /* head of chain of chunks on the disk */ 17 u32int used; /* usage of the last chunk */ 18 u64int total; /* total number of bytes in this bucket */ 19 u8int *buf; /* chunk of entries for this bucket */ 20 }; 21 22 struct IEBucks 23 { 24 Part *part; 25 u64int off; /* offset for writing data in the partition */ 26 u32int chunks; /* total chunks written to fd */ 27 u64int max; /* max bytes entered in any one bucket */ 28 int bits; /* number of bits in initial bucket sort */ 29 int nbucks; /* 1 << bits, the number of buckets */ 30 u32int size; /* bytes in each of the buckets chunks */ 31 u32int usable; /* amount usable for IEntry data */ 32 u8int *buf; /* buffer for all chunks */ 33 u8int *xbuf; 34 IEBuck *bucks; 35 }; 36 37 #define U32GET(p) (((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3]) 38 #define U32PUT(p,v) (p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v) 39 40 static IEBucks *initiebucks(Part *part, int bits, u32int size); 41 static int flushiebuck(IEBucks *ib, int b, int reset); 42 static int flushiebucks(IEBucks *ib); 43 static u32int sortiebuck(IEBucks *ib, int b); 44 static u64int sortiebucks(IEBucks *ib); 45 static int sprayientry(IEBucks *ib, IEntry *ie); 46 static u32int readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b); 47 static u32int readiebuck(IEBucks *ib, int b); 48 static void freeiebucks(IEBucks *ib); 49 50 /* 51 * build a sorted file with all IEntries which should be in ix. 52 * assumes the arenas' directories are up to date. 53 * reads each, converts the entries to index entries, 54 * and sorts them. 55 */ 56 u64int 57 sortrawientries(Index *ix, Part *tmp, u64int *base, Bloom *bloom) 58 { 59 IEBucks *ib; 60 u64int clumps, sorted; 61 u32int n; 62 int i, ok; 63 64 /* ZZZ should allow configuration of bits, bucket size */ 65 ib = initiebucks(tmp, 8, 64*1024); 66 if(ib == nil){ 67 seterr(EOk, "can't create sorting buckets: %r"); 68 return TWID64; 69 } 70 ok = 0; 71 clumps = 0; 72 fprint(2, "constructing entry list\n"); 73 for(i = 0; i < ix->narenas; i++){ 74 n = readarenainfo(ib, ix->arenas[i], ix->amap[i].start, bloom); 75 if(n == TWID32){ 76 ok = -1; 77 break; 78 } 79 clumps += n; 80 } 81 fprint(2, "sorting %lld entries\n", clumps); 82 if(ok == 0){ 83 sorted = sortiebucks(ib); 84 *base = (u64int)ib->chunks * ib->size; 85 if(sorted != clumps){ 86 fprint(2, "sorting messed up: clumps=%lld sorted=%lld\n", clumps, sorted); 87 ok = -1; 88 } 89 } 90 freeiebucks(ib); 91 if(ok < 0) 92 return TWID64; 93 return clumps; 94 } 95 96 #define CHECK(cis) if(((ulong*)cis)[-4] != 0xA110C09) xabort(); 97 98 void 99 xabort(void) 100 { 101 int *x; 102 103 x = 0; 104 *x = 0; 105 } 106 107 /* 108 * read in all of the arena's clump directory, 109 * convert to IEntry format, and bucket sort based 110 * on the first few bits. 111 */ 112 static u32int 113 readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b) 114 { 115 IEntry ie; 116 ClumpInfo *ci, *cis; 117 u32int clump; 118 int i, n, ok, nskip; 119 120 if(arena->memstats.clumps) 121 fprint(2, "\tarena %s: %d entries\n", arena->name, arena->memstats.clumps); 122 else 123 fprint(2, "[%s] ", arena->name); 124 125 cis = MKN(ClumpInfo, ClumpChunks); 126 ok = 0; 127 nskip = 0; 128 memset(&ie, 0, sizeof(IEntry)); 129 for(clump = 0; clump < arena->memstats.clumps; clump += n){ 130 n = ClumpChunks; 131 if(n > arena->memstats.clumps - clump) 132 n = arena->memstats.clumps - clump; 133 if(readclumpinfos(arena, clump, cis, n) != n){ 134 seterr(EOk, "arena directory read failed: %r"); 135 ok = -1; 136 break; 137 } 138 139 for(i = 0; i < n; i++){ 140 ci = &cis[i]; 141 ie.ia.type = ci->type; 142 ie.ia.size = ci->uncsize; 143 ie.ia.addr = a; 144 a += ci->size + ClumpSize; 145 ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog; 146 scorecp(ie.score, ci->score); 147 if(ci->type == VtCorruptType){ 148 if(0) print("! %V %22lld %3d %5d %3d\n", 149 ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks); 150 nskip++; 151 }else 152 sprayientry(ib, &ie); 153 markbloomfilter(b, ie.score); 154 } 155 } 156 free(cis); 157 if(ok < 0) 158 return TWID32; 159 return clump - nskip; 160 } 161 162 /* 163 * initialize the external bucket sorting data structures 164 */ 165 static IEBucks* 166 initiebucks(Part *part, int bits, u32int size) 167 { 168 IEBucks *ib; 169 int i; 170 171 ib = MKZ(IEBucks); 172 if(ib == nil){ 173 seterr(EOk, "out of memory"); 174 return nil; 175 } 176 ib->bits = bits; 177 ib->nbucks = 1 << bits; 178 ib->size = size; 179 ib->usable = (size - U32Size) / IEntrySize * IEntrySize; 180 ib->bucks = MKNZ(IEBuck, ib->nbucks); 181 if(ib->bucks == nil){ 182 seterr(EOk, "out of memory allocation sorting buckets"); 183 freeiebucks(ib); 184 return nil; 185 } 186 ib->xbuf = MKN(u8int, size * ((1 << bits)+1)); 187 ib->buf = (u8int*)(((uintptr)ib->xbuf+size-1)&~(uintptr)(size-1)); 188 if(ib->buf == nil){ 189 seterr(EOk, "out of memory allocating sorting buckets' buffers"); 190 freeiebucks(ib); 191 return nil; 192 } 193 for(i = 0; i < ib->nbucks; i++){ 194 ib->bucks[i].head = TWID32; 195 ib->bucks[i].buf = &ib->buf[i * size]; 196 } 197 ib->part = part; 198 return ib; 199 } 200 201 static void 202 freeiebucks(IEBucks *ib) 203 { 204 if(ib == nil) 205 return; 206 free(ib->bucks); 207 free(ib->buf); 208 free(ib); 209 } 210 211 /* 212 * initial sort: put the entry into the correct bucket 213 */ 214 static int 215 sprayientry(IEBucks *ib, IEntry *ie) 216 { 217 u32int n; 218 int b; 219 220 b = hashbits(ie->score, ib->bits); 221 n = ib->bucks[b].used; 222 if(n + IEntrySize > ib->usable){ 223 /* should be flushed below, but if flush fails, this can happen */ 224 seterr(EOk, "out of space in bucket"); 225 return -1; 226 } 227 packientry(ie, &ib->bucks[b].buf[n]); 228 n += IEntrySize; 229 ib->bucks[b].used = n; 230 if(n + IEntrySize <= ib->usable) 231 return 0; 232 return flushiebuck(ib, b, 1); 233 } 234 235 /* 236 * finish sorting: 237 * for each bucket, read it in and sort it 238 * write out the the final file 239 */ 240 static u64int 241 sortiebucks(IEBucks *ib) 242 { 243 u64int tot; 244 u32int n; 245 int i; 246 247 if(flushiebucks(ib) < 0) 248 return TWID64; 249 for(i = 0; i < ib->nbucks; i++) 250 ib->bucks[i].buf = nil; 251 ib->off = (u64int)ib->chunks * ib->size; 252 free(ib->xbuf); 253 254 ib->buf = MKN(u8int, ib->max + U32Size); 255 if(ib->buf == nil){ 256 seterr(EOk, "out of memory allocating final sorting buffer; try more buckets"); 257 return TWID64; 258 } 259 tot = 0; 260 for(i = 0; i < ib->nbucks; i++){ 261 n = sortiebuck(ib, i); 262 if(n == TWID32) 263 return TWID64; 264 if(n != ib->bucks[i].total/IEntrySize) 265 fprint(2, "bucket %d changed count %d => %d\n", 266 i, (int)(ib->bucks[i].total/IEntrySize), n); 267 tot += n; 268 } 269 return tot; 270 } 271 272 /* 273 * sort from bucket b of ib into the output file to 274 */ 275 static u32int 276 sortiebuck(IEBucks *ib, int b) 277 { 278 u32int n; 279 280 n = readiebuck(ib, b); 281 if(n == TWID32) 282 return TWID32; 283 qsort(ib->buf, n, IEntrySize, ientrycmp); 284 if(writepart(ib->part, ib->off, ib->buf, n*IEntrySize) < 0){ 285 seterr(EOk, "can't write sorted bucket: %r"); 286 return TWID32; 287 } 288 ib->off += n * IEntrySize; 289 return n; 290 } 291 292 /* 293 * write out a single bucket 294 */ 295 static int 296 flushiebuck(IEBucks *ib, int b, int reset) 297 { 298 u32int n; 299 300 if(ib->bucks[b].used == 0) 301 return 0; 302 n = ib->bucks[b].used; 303 U32PUT(&ib->bucks[b].buf[n], ib->bucks[b].head); 304 n += U32Size; 305 USED(n); 306 if(writepart(ib->part, (u64int)ib->chunks * ib->size, ib->bucks[b].buf, ib->size) < 0){ 307 seterr(EOk, "can't write sorting bucket to file: %r"); 308 xabort(); 309 return -1; 310 } 311 ib->bucks[b].head = ib->chunks++; 312 ib->bucks[b].total += ib->bucks[b].used; 313 if(reset) 314 ib->bucks[b].used = 0; 315 return 0; 316 } 317 318 /* 319 * write out all of the buckets, and compute 320 * the maximum size of any bucket 321 */ 322 static int 323 flushiebucks(IEBucks *ib) 324 { 325 int i; 326 327 for(i = 0; i < ib->nbucks; i++){ 328 if(flushiebuck(ib, i, 0) < 0) 329 return -1; 330 if(ib->bucks[i].total > ib->max) 331 ib->max = ib->bucks[i].total; 332 } 333 return 0; 334 } 335 336 /* 337 * read in the chained buffers for bucket b, 338 * and return it's total number of IEntries 339 */ 340 static u32int 341 readiebuck(IEBucks *ib, int b) 342 { 343 u32int head, m, n; 344 345 head = ib->bucks[b].head; 346 n = 0; 347 m = ib->bucks[b].used; 348 if(m == 0) 349 m = ib->usable; 350 if(0) if(ib->bucks[b].total) 351 fprint(2, "\tbucket %d: %lld entries\n", b, ib->bucks[b].total/IEntrySize); 352 while(head != TWID32){ 353 if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m+U32Size) < 0){ 354 seterr(EOk, "can't read index sort bucket: %r"); 355 return TWID32; 356 } 357 n += m; 358 head = U32GET(&ib->buf[n]); 359 m = ib->usable; 360 } 361 if(n != ib->bucks[b].total) 362 fprint(2, "\tbucket %d: expected %d entries, got %d\n", 363 b, (int)ib->bucks[b].total/IEntrySize, n/IEntrySize); 364 return n / IEntrySize; 365 }