plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

wrarena.c (4871B)


      1 #include "stdinc.h"
      2 #include "dat.h"
      3 #include "fns.h"
      4 
      5 QLock godot;	/* locked by threadmain at startup; sender procs block on it when done, until threadexitsall */
      6 char *host;	/* venti server named by -h; "/dev/null" means do not connect at all */
      7 int readonly = 1;	/* for part.c */
      8 int mainstacksize = 256*1024;	/* presumably read by libthread to size threadmain's stack -- confirm */
      9 Channel *c;	/* carries ZClump values from rdarena to the vtsendthread procs */
     10 VtConn *z;	/* venti connection; left nil when host is "/dev/null" */
     11 int fast;	/* set by -f: skip score/type verification; a bit unsafe, only for benchmarking */
     12 int haveaoffset;	/* set by -o: makes rdarena print the end offset when it finishes */
     13 int maxwrites = -1;	/* set by -M: clumps left to process; negative means no limit */
     14 int verbose;	/* set by -v: print the score of each clump written */
     15 
     16 typedef struct ZClump ZClump;
     17 struct ZClump
     18 {
     19 	ZBlock *lump;
     20 	Clump cl;
     21 	u64int aa;
     22 };
     23 
     24 void
     25 usage(void)
     26 {
     27 	fprint(2, "usage: wrarena [-o fileoffset] [-h host] arenafile [clumpoffset]\n");
     28 	threadexitsall("usage");
     29 }
     30 
     31 void
     32 vtsendthread(void *v)
     33 {
     34 	ZClump zcl;
     35 
     36 	USED(v);
     37 	while(recv(c, &zcl) == 1){
     38 		if(zcl.lump == nil)
     39 			break;
     40 		if(vtwrite(z, zcl.cl.info.score, zcl.cl.info.type, zcl.lump->data, zcl.cl.info.uncsize) < 0)
     41 			sysfatal("failed writing clump %llud: %r", zcl.aa);
     42 		if(verbose)
     43 			print("%V\n", zcl.cl.info.score);
     44 		freezblock(zcl.lump);
     45 	}
     46 	/*
     47 	 * All the send threads try to exit right when
     48 	 * threadmain is calling threadexitsall.
     49 	 * Either libthread or the Linux NPTL pthreads library
     50 	 * can't handle this condition (I suspect NPTL but have
     51 	 * not confirmed this) and we get a seg fault in exit.
     52 	 * I spent a day tracking this down with no success,
     53 	 * so we're going to work around it instead by just
     54 	 * sitting here and waiting for the threadexitsall to
     55 	 * take effect.
     56 	 */
     57 	qlock(&godot);
     58 }
     59 
     60 static void
     61 rdarena(Arena *arena, u64int offset)
     62 {
     63 	int i;
     64 	u64int a, aa, e;
     65 	uchar score[VtScoreSize];
     66 	Clump cl;
     67 	ClumpInfo ci;
     68 	ZBlock *lump;
     69 	ZClump zcl;
     70 
     71 	fprint(2, "wrarena: copying %s to venti\n", arena->name);
     72 	printarena(2, arena);
     73 
     74 	a = arena->base;
     75 	e = arena->base + arena->size;
     76 	if(offset != ~(u64int)0) {
     77 		if(offset >= e - a)
     78 			sysfatal("bad offset %#llx >= %#llx", offset, e - a);
     79 		aa = offset;
     80 	} else
     81 		aa = 0;
     82 
     83 	i = 0;
     84 	for(a = 0; maxwrites != 0 && i < arena->memstats.clumps;
     85 	    a += ClumpSize + ci.size){
     86 		if(readclumpinfo(arena, i++, &ci) < 0)
     87 			break;
     88 		if(a < aa || ci.type == VtCorruptType){
     89 			if(ci.type == VtCorruptType)
     90 				fprint(2, "%s: corrupt clump read at %#llx: +%d\n",
     91 					argv0, a, ClumpSize+ci.size);
     92 			continue;
     93 		}
     94 		lump = loadclump(arena, a, 0, &cl, score, 0);
     95 		if(lump == nil) {
     96 			fprint(2, "clump %#llx failed to read: %r\n", a);
     97 			continue;
     98 		}
     99 		if(!fast && cl.info.type != VtCorruptType) {
    100 			scoremem(score, lump->data, cl.info.uncsize);
    101 			if(scorecmp(cl.info.score, score) != 0) {
    102 				fprint(2, "clump %#llx has mismatched score\n",
    103 					a);
    104 				break;
    105 			}
    106 			if(vttypevalid(cl.info.type) < 0) {
    107 				fprint(2, "clump %#llx has bad type %d\n",
    108 					a, cl.info.type);
    109 				break;
    110 			}
    111 		}
    112 		if(z && cl.info.type != VtCorruptType){
    113 			zcl.cl = cl;
    114 			zcl.lump = lump;
    115 			zcl.aa = a;
    116 			send(c, &zcl);
    117 		}else
    118 			freezblock(lump);
    119 		if(maxwrites > 0)
    120 			--maxwrites;
    121 	}
    122 	if(a > aa)
    123 		aa = a;
    124 	if(haveaoffset)
    125 		print("end offset %#llx\n", aa);
    126 }
    127 
    128 void
    129 threadmain(int argc, char *argv[])
    130 {
    131 	int i;
    132 	char *file;
    133 	Arena *arena;
    134 	ArenaPart *ap;
    135 	u64int offset, aoffset;
    136 	Part *part;
    137 	uchar buf[8192];
    138 	ArenaHead head;
    139 	ZClump zerocl;
    140 
    141 	qlock(&godot);
    142 	aoffset = 0;
    143 	ARGBEGIN{
    144 	case 'f':
    145 		fast = 1;
    146 		ventidoublechecksha1 = 0;
    147 		break;
    148 	case 'h':
    149 		host = EARGF(usage());
    150 		break;
    151 	case 'o':
    152 		haveaoffset = 1;
    153 		aoffset = strtoull(EARGF(usage()), 0, 0);
    154 		break;
    155 	case 'M':
    156 		maxwrites = atoi(EARGF(usage()));
    157 		break;
    158 	case 'v':
    159 		verbose = 1;
    160 		break;
    161 	default:
    162 		usage();
    163 		break;
    164 	}ARGEND
    165 
    166 	offset = ~(u64int)0;
    167 	switch(argc) {
    168 	default:
    169 		usage();
    170 	case 2:
    171 		offset = strtoull(argv[1], 0, 0);
    172 		/* fall through */
    173 	case 1:
    174 		file = argv[0];
    175 	}
    176 
    177 	ventifmtinstall();
    178 
    179 	statsinit();
    180 
    181 	part = initpart(file, OREAD);
    182 	if(part == nil)
    183 		sysfatal("can't open file %s: %r", file);
    184 
    185 	// Try as arena partition.
    186 	arena = nil;
    187 	ap = initarenapart(part);
    188 	if(ap != nil)
    189 		goto loaded;
    190 
    191 	if(readpart(part, aoffset, buf, sizeof buf) < 0)
    192 		sysfatal("can't read file %s: %r", file);
    193 
    194 	if(unpackarenahead(&head, buf) < 0)
    195 		sysfatal("corrupted arena header: %r");
    196 
    197 	if(aoffset+head.size > part->size)
    198 		sysfatal("arena is truncated: want %llud bytes have %llud",
    199 			head.size, part->size);
    200 
    201 	partblocksize(part, head.blocksize);
    202 
    203 	arena = initarena(part, aoffset, head.size, head.blocksize);
    204 	if(arena == nil)
    205 		sysfatal("initarena: %r");
    206 
    207 loaded:
    208 	z = nil;
    209 	if(host==nil || strcmp(host, "/dev/null") != 0){
    210 		z = vtdial(host);
    211 		if(z == nil)
    212 			sysfatal("could not connect to server: %r");
    213 		if(vtconnect(z) < 0)
    214 			sysfatal("vtconnect: %r");
    215 	}
    216 
    217 	print("%T starting to send data\n");
    218 	c = chancreate(sizeof(ZClump), 0);
    219 	for(i=0; i<12; i++)
    220 		vtproc(vtsendthread, nil);
    221 
    222 	initdcache(8 * MaxDiskBlock);
    223 
    224 	if(ap != nil) {
    225 		for(i=0; i<ap->narenas; i++)
    226 			rdarena(ap->arenas[i], 0);
    227 	} else
    228 		rdarena(arena, offset);
    229 
    230 	memset(&zerocl, 0, sizeof zerocl);
    231 	for(i=0; i<12; i++)
    232 		send(c, &zerocl);
    233 	if(vtsync(z) < 0)
    234 		sysfatal("executing sync: %r");
    235 	if(z){
    236 		vthangup(z);
    237 	}
    238 	print("%T sent all data\n");
    239 
    240 	threadexitsall(0);
    241 }