plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

sortientry.c (8324B)


      1 #include "stdinc.h"
      2 #include "dat.h"
      3 #include "fns.h"
      4 #include <bio.h>
      5 
/* forward typedefs for the sort bucket structures defined below */
typedef struct IEBuck	IEBuck;
typedef struct IEBucks	IEBucks;
      8 
enum
{
	/* number of ClumpInfo entries read from an arena directory per batch */
	ClumpChunks	= 32*1024
};
     13 
/*
 * one bucket of the external sort.
 * entries accumulate in buf; each time buf is written out it
 * becomes a chunk on disk that records the previous head,
 * so the chunks of a bucket form a chain.
 */
struct IEBuck
{
	u32int	head;		/* most recently written chunk of the chain on disk; TWID32 if none */
	u32int	used;		/* bytes of entry data currently held in buf */
	u64int	total;		/* total number of bytes written to disk for this bucket */
	u8int	*buf;		/* in-memory chunk of entries for this bucket */
};
     21 
/*
 * state of one external bucket sort.
 */
struct IEBucks
{
	Part	*part;		/* partition holding the bucket chunks and the sorted output */
	u64int	off;		/* offset for writing data in the partition */
	u32int	chunks;		/* total chunks written to the partition */
	u64int	max;		/* max bytes entered in any one bucket */
	int	bits;		/* number of bits in initial bucket sort */
	int	nbucks;		/* 1 << bits, the number of buckets */
	u32int	size;		/* bytes in each of the buckets chunks */
	u32int	usable;		/* amount usable for IEntry data */
	u8int	*buf;		/* buffer for all chunks; later the buffer one bucket is sorted in */
	u8int	*xbuf;		/* the allocation that buf is aligned within */
	IEBuck	*bucks;		/* the nbucks buckets */
};
     36 
     37 #define	U32GET(p)	(((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3])
     38 #define	U32PUT(p,v)	(p)[0]=(v)>>24;(p)[1]=(v)>>16;(p)[2]=(v)>>8;(p)[3]=(v)
     39 
/* helpers private to this file; all are defined below */
static IEBucks	*initiebucks(Part *part, int bits, u32int size);
static int	flushiebuck(IEBucks *ib, int b, int reset);
static int	flushiebucks(IEBucks *ib);
static u32int	sortiebuck(IEBucks *ib, int b);
static u64int	sortiebucks(IEBucks *ib);
static int	sprayientry(IEBucks *ib, IEntry *ie);
static u32int	readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b);
static u32int	readiebuck(IEBucks *ib, int b);
static void	freeiebucks(IEBucks *ib);
     49 
     50 /*
     51  * build a sorted file with all IEntries which should be in ix.
     52  * assumes the arenas' directories are up to date.
     53  * reads each, converts the entries to index entries,
     54  * and sorts them.
     55  */
     56 u64int
     57 sortrawientries(Index *ix, Part *tmp, u64int *base, Bloom *bloom)
     58 {
     59 	IEBucks *ib;
     60 	u64int clumps, sorted;
     61 	u32int n;
     62 	int i, ok;
     63 
     64 /* ZZZ should allow configuration of bits, bucket size */
     65 	ib = initiebucks(tmp, 8, 64*1024);
     66 	if(ib == nil){
     67 		seterr(EOk, "can't create sorting buckets: %r");
     68 		return TWID64;
     69 	}
     70 	ok = 0;
     71 	clumps = 0;
     72 	fprint(2, "constructing entry list\n");
     73 	for(i = 0; i < ix->narenas; i++){
     74 		n = readarenainfo(ib, ix->arenas[i], ix->amap[i].start, bloom);
     75 		if(n == TWID32){
     76 			ok = -1;
     77 			break;
     78 		}
     79 		clumps += n;
     80 	}
     81 	fprint(2, "sorting %lld entries\n", clumps);
     82 	if(ok == 0){
     83 		sorted = sortiebucks(ib);
     84 		*base = (u64int)ib->chunks * ib->size;
     85 		if(sorted != clumps){
     86 			fprint(2, "sorting messed up: clumps=%lld sorted=%lld\n", clumps, sorted);
     87 			ok = -1;
     88 		}
     89 	}
     90 	freeiebucks(ib);
     91 	if(ok < 0)
     92 		return TWID64;
     93 	return clumps;
     94 }
     95 
/*
 * debugging aid: crash via xabort() unless the ulong found four
 * ulongs before cis holds 0xA110C09.
 * NOTE(review): presumably an allocator header magic number -- confirm.
 * expands to a bare if statement, so beware of a following else.
 */
#define CHECK(cis)	if(((ulong*)cis)[-4] != 0xA110C09) xabort();
     97 
     98 void
     99 xabort(void)
    100 {
    101 	int *x;
    102 
    103 	x = 0;
    104 	*x = 0;
    105 }
    106 
    107 /*
    108  * read in all of the arena's clump directory,
    109  * convert to IEntry format, and bucket sort based
    110  * on the first few bits.
    111  */
    112 static u32int
    113 readarenainfo(IEBucks *ib, Arena *arena, u64int a, Bloom *b)
    114 {
    115 	IEntry ie;
    116 	ClumpInfo *ci, *cis;
    117 	u32int clump;
    118 	int i, n, ok, nskip;
    119 
    120 	if(arena->memstats.clumps)
    121 		fprint(2, "\tarena %s: %d entries\n", arena->name, arena->memstats.clumps);
    122 	else
    123 		fprint(2, "[%s] ", arena->name);
    124 
    125 	cis = MKN(ClumpInfo, ClumpChunks);
    126 	ok = 0;
    127 	nskip = 0;
    128 	memset(&ie, 0, sizeof(IEntry));
    129 	for(clump = 0; clump < arena->memstats.clumps; clump += n){
    130 		n = ClumpChunks;
    131 		if(n > arena->memstats.clumps - clump)
    132 			n = arena->memstats.clumps - clump;
    133 		if(readclumpinfos(arena, clump, cis, n) != n){
    134 			seterr(EOk, "arena directory read failed: %r");
    135 			ok = -1;
    136 			break;
    137 		}
    138 
    139 		for(i = 0; i < n; i++){
    140 			ci = &cis[i];
    141 			ie.ia.type = ci->type;
    142 			ie.ia.size = ci->uncsize;
    143 			ie.ia.addr = a;
    144 			a += ci->size + ClumpSize;
    145 			ie.ia.blocks = (ci->size + ClumpSize + (1 << ABlockLog) - 1) >> ABlockLog;
    146 			scorecp(ie.score, ci->score);
    147 			if(ci->type == VtCorruptType){
    148 				if(0) print("! %V %22lld %3d %5d %3d\n",
    149 					ie.score, ie.ia.addr, ie.ia.type, ie.ia.size, ie.ia.blocks);
    150 				nskip++;
    151 			}else
    152 				sprayientry(ib, &ie);
    153 			markbloomfilter(b, ie.score);
    154 		}
    155 	}
    156 	free(cis);
    157 	if(ok < 0)
    158 		return TWID32;
    159 	return clump - nskip;
    160 }
    161 
    162 /*
    163  * initialize the external bucket sorting data structures
    164  */
    165 static IEBucks*
    166 initiebucks(Part *part, int bits, u32int size)
    167 {
    168 	IEBucks *ib;
    169 	int i;
    170 
    171 	ib = MKZ(IEBucks);
    172 	if(ib == nil){
    173 		seterr(EOk, "out of memory");
    174 		return nil;
    175 	}
    176 	ib->bits = bits;
    177 	ib->nbucks = 1 << bits;
    178 	ib->size = size;
    179 	ib->usable = (size - U32Size) / IEntrySize * IEntrySize;
    180 	ib->bucks = MKNZ(IEBuck, ib->nbucks);
    181 	if(ib->bucks == nil){
    182 		seterr(EOk, "out of memory allocation sorting buckets");
    183 		freeiebucks(ib);
    184 		return nil;
    185 	}
    186 	ib->xbuf = MKN(u8int, size * ((1 << bits)+1));
    187 	ib->buf = (u8int*)(((uintptr)ib->xbuf+size-1)&~(uintptr)(size-1));
    188 	if(ib->buf == nil){
    189 		seterr(EOk, "out of memory allocating sorting buckets' buffers");
    190 		freeiebucks(ib);
    191 		return nil;
    192 	}
    193 	for(i = 0; i < ib->nbucks; i++){
    194 		ib->bucks[i].head = TWID32;
    195 		ib->bucks[i].buf = &ib->buf[i * size];
    196 	}
    197 	ib->part = part;
    198 	return ib;
    199 }
    200 
    201 static void
    202 freeiebucks(IEBucks *ib)
    203 {
    204 	if(ib == nil)
    205 		return;
    206 	free(ib->bucks);
    207 	free(ib->buf);
    208 	free(ib);
    209 }
    210 
    211 /*
    212  * initial sort: put the entry into the correct bucket
    213  */
    214 static int
    215 sprayientry(IEBucks *ib, IEntry *ie)
    216 {
    217 	u32int n;
    218 	int b;
    219 
    220 	b = hashbits(ie->score, ib->bits);
    221 	n = ib->bucks[b].used;
    222 	if(n + IEntrySize > ib->usable){
    223 		/* should be flushed below, but if flush fails, this can happen */
    224 		seterr(EOk, "out of space in bucket");
    225 		return -1;
    226 	}
    227 	packientry(ie, &ib->bucks[b].buf[n]);
    228 	n += IEntrySize;
    229 	ib->bucks[b].used = n;
    230 	if(n + IEntrySize <= ib->usable)
    231 		return 0;
    232 	return flushiebuck(ib, b, 1);
    233 }
    234 
    235 /*
    236  * finish sorting:
    237  * for each bucket, read it in and sort it
    238  * write out the the final file
    239  */
    240 static u64int
    241 sortiebucks(IEBucks *ib)
    242 {
    243 	u64int tot;
    244 	u32int n;
    245 	int i;
    246 
    247 	if(flushiebucks(ib) < 0)
    248 		return TWID64;
    249 	for(i = 0; i < ib->nbucks; i++)
    250 		ib->bucks[i].buf = nil;
    251 	ib->off = (u64int)ib->chunks * ib->size;
    252 	free(ib->xbuf);
    253 
    254 	ib->buf = MKN(u8int, ib->max + U32Size);
    255 	if(ib->buf == nil){
    256 		seterr(EOk, "out of memory allocating final sorting buffer; try more buckets");
    257 		return TWID64;
    258 	}
    259 	tot = 0;
    260 	for(i = 0; i < ib->nbucks; i++){
    261 		n = sortiebuck(ib, i);
    262 		if(n == TWID32)
    263 			return TWID64;
    264 		if(n != ib->bucks[i].total/IEntrySize)
    265 			fprint(2, "bucket %d changed count %d => %d\n",
    266 				i, (int)(ib->bucks[i].total/IEntrySize), n);
    267 		tot += n;
    268 	}
    269 	return tot;
    270 }
    271 
    272 /*
    273  * sort from bucket b of ib into the output file to
    274  */
    275 static u32int
    276 sortiebuck(IEBucks *ib, int b)
    277 {
    278 	u32int n;
    279 
    280 	n = readiebuck(ib, b);
    281 	if(n == TWID32)
    282 		return TWID32;
    283 	qsort(ib->buf, n, IEntrySize, ientrycmp);
    284 	if(writepart(ib->part, ib->off, ib->buf, n*IEntrySize) < 0){
    285 		seterr(EOk, "can't write sorted bucket: %r");
    286 		return TWID32;
    287 	}
    288 	ib->off += n * IEntrySize;
    289 	return n;
    290 }
    291 
    292 /*
    293  * write out a single bucket
    294  */
    295 static int
    296 flushiebuck(IEBucks *ib, int b, int reset)
    297 {
    298 	u32int n;
    299 
    300 	if(ib->bucks[b].used == 0)
    301 		return 0;
    302 	n = ib->bucks[b].used;
    303 	U32PUT(&ib->bucks[b].buf[n], ib->bucks[b].head);
    304 	n += U32Size;
    305 	USED(n);
    306 	if(writepart(ib->part, (u64int)ib->chunks * ib->size, ib->bucks[b].buf, ib->size) < 0){
    307 		seterr(EOk, "can't write sorting bucket to file: %r");
    308 xabort();
    309 		return -1;
    310 	}
    311 	ib->bucks[b].head = ib->chunks++;
    312 	ib->bucks[b].total += ib->bucks[b].used;
    313 	if(reset)
    314 		ib->bucks[b].used = 0;
    315 	return 0;
    316 }
    317 
    318 /*
    319  * write out all of the buckets, and compute
    320  * the maximum size of any bucket
    321  */
    322 static int
    323 flushiebucks(IEBucks *ib)
    324 {
    325 	int i;
    326 
    327 	for(i = 0; i < ib->nbucks; i++){
    328 		if(flushiebuck(ib, i, 0) < 0)
    329 			return -1;
    330 		if(ib->bucks[i].total > ib->max)
    331 			ib->max = ib->bucks[i].total;
    332 	}
    333 	return 0;
    334 }
    335 
    336 /*
    337  * read in the chained buffers for bucket b,
    338  * and return it's total number of IEntries
    339  */
    340 static u32int
    341 readiebuck(IEBucks *ib, int b)
    342 {
    343 	u32int head, m, n;
    344 
    345 	head = ib->bucks[b].head;
    346 	n = 0;
    347 	m = ib->bucks[b].used;
    348 	if(m == 0)
    349 		m = ib->usable;
    350 	if(0) if(ib->bucks[b].total)
    351 		fprint(2, "\tbucket %d: %lld entries\n", b, ib->bucks[b].total/IEntrySize);
    352 	while(head != TWID32){
    353 		if(readpart(ib->part, (u64int)head * ib->size, &ib->buf[n], m+U32Size) < 0){
    354 			seterr(EOk, "can't read index sort bucket: %r");
    355 			return TWID32;
    356 		}
    357 		n += m;
    358 		head = U32GET(&ib->buf[n]);
    359 		m = ib->usable;
    360 	}
    361 	if(n != ib->bucks[b].total)
    362 		fprint(2, "\tbucket %d: expected %d entries, got %d\n",
    363 			b, (int)ib->bucks[b].total/IEntrySize, n/IEntrySize);
    364 	return n / IEntrySize;
    365 }