wrarena.c (4871B)
1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 5 QLock godot; 6 char *host; 7 int readonly = 1; /* for part.c */ 8 int mainstacksize = 256*1024; 9 Channel *c; 10 VtConn *z; 11 int fast; /* and a bit unsafe; only for benchmarking */ 12 int haveaoffset; 13 int maxwrites = -1; 14 int verbose; 15 16 typedef struct ZClump ZClump; 17 struct ZClump 18 { 19 ZBlock *lump; 20 Clump cl; 21 u64int aa; 22 }; 23 24 void 25 usage(void) 26 { 27 fprint(2, "usage: wrarena [-o fileoffset] [-h host] arenafile [clumpoffset]\n"); 28 threadexitsall("usage"); 29 } 30 31 void 32 vtsendthread(void *v) 33 { 34 ZClump zcl; 35 36 USED(v); 37 while(recv(c, &zcl) == 1){ 38 if(zcl.lump == nil) 39 break; 40 if(vtwrite(z, zcl.cl.info.score, zcl.cl.info.type, zcl.lump->data, zcl.cl.info.uncsize) < 0) 41 sysfatal("failed writing clump %llud: %r", zcl.aa); 42 if(verbose) 43 print("%V\n", zcl.cl.info.score); 44 freezblock(zcl.lump); 45 } 46 /* 47 * All the send threads try to exit right when 48 * threadmain is calling threadexitsall. 49 * Either libthread or the Linux NPTL pthreads library 50 * can't handle this condition (I suspect NPTL but have 51 * not confirmed this) and we get a seg fault in exit. 52 * I spent a day tracking this down with no success, 53 * so we're going to work around it instead by just 54 * sitting here and waiting for the threadexitsall to 55 * take effect. 56 */ 57 qlock(&godot); 58 } 59 60 static void 61 rdarena(Arena *arena, u64int offset) 62 { 63 int i; 64 u64int a, aa, e; 65 uchar score[VtScoreSize]; 66 Clump cl; 67 ClumpInfo ci; 68 ZBlock *lump; 69 ZClump zcl; 70 71 fprint(2, "wrarena: copying %s to venti\n", arena->name); 72 printarena(2, arena); 73 74 a = arena->base; 75 e = arena->base + arena->size; 76 if(offset != ~(u64int)0) { 77 if(offset >= e - a) 78 sysfatal("bad offset %#llx >= %#llx", offset, e - a); 79 aa = offset; 80 } else 81 aa = 0; 82 83 i = 0; 84 for(a = 0; maxwrites != 0 && i < arena->memstats.clumps; 85 a += ClumpSize + ci.size){ 86 if(readclumpinfo(arena, i++, &ci) < 0) 87 break; 88 if(a < aa || ci.type == VtCorruptType){ 89 if(ci.type == VtCorruptType) 90 fprint(2, "%s: corrupt clump read at %#llx: +%d\n", 91 argv0, a, ClumpSize+ci.size); 92 continue; 93 } 94 lump = loadclump(arena, a, 0, &cl, score, 0); 95 if(lump == nil) { 96 fprint(2, "clump %#llx failed to read: %r\n", a); 97 continue; 98 } 99 if(!fast && cl.info.type != VtCorruptType) { 100 scoremem(score, lump->data, cl.info.uncsize); 101 if(scorecmp(cl.info.score, score) != 0) { 102 fprint(2, "clump %#llx has mismatched score\n", 103 a); 104 break; 105 } 106 if(vttypevalid(cl.info.type) < 0) { 107 fprint(2, "clump %#llx has bad type %d\n", 108 a, cl.info.type); 109 break; 110 } 111 } 112 if(z && cl.info.type != VtCorruptType){ 113 zcl.cl = cl; 114 zcl.lump = lump; 115 zcl.aa = a; 116 send(c, &zcl); 117 }else 118 freezblock(lump); 119 if(maxwrites > 0) 120 --maxwrites; 121 } 122 if(a > aa) 123 aa = a; 124 if(haveaoffset) 125 print("end offset %#llx\n", aa); 126 } 127 128 void 129 threadmain(int argc, char *argv[]) 130 { 131 int i; 132 char *file; 133 Arena *arena; 134 ArenaPart *ap; 135 u64int offset, aoffset; 136 Part *part; 137 uchar buf[8192]; 138 ArenaHead head; 139 ZClump zerocl; 140 141 qlock(&godot); 142 aoffset = 0; 143 ARGBEGIN{ 144 case 'f': 145 fast = 1; 146 ventidoublechecksha1 = 0; 147 break; 148 case 'h': 149 host = EARGF(usage()); 150 break; 151 case 'o': 152 haveaoffset = 1; 153 aoffset = strtoull(EARGF(usage()), 0, 0); 154 break; 155 case 'M': 156 maxwrites = atoi(EARGF(usage())); 157 break; 158 case 'v': 159 verbose = 1; 160 break; 161 default: 162 usage(); 163 break; 164 }ARGEND 165 166 offset = ~(u64int)0; 167 switch(argc) { 168 default: 169 usage(); 170 case 2: 171 offset = strtoull(argv[1], 0, 0); 172 /* fall through */ 173 case 1: 174 file = argv[0]; 175 } 176 177 ventifmtinstall(); 178 179 statsinit(); 180 181 part = initpart(file, OREAD); 182 if(part == nil) 183 sysfatal("can't open file %s: %r", file); 184 185 // Try as arena partition. 186 arena = nil; 187 ap = initarenapart(part); 188 if(ap != nil) 189 goto loaded; 190 191 if(readpart(part, aoffset, buf, sizeof buf) < 0) 192 sysfatal("can't read file %s: %r", file); 193 194 if(unpackarenahead(&head, buf) < 0) 195 sysfatal("corrupted arena header: %r"); 196 197 if(aoffset+head.size > part->size) 198 sysfatal("arena is truncated: want %llud bytes have %llud", 199 head.size, part->size); 200 201 partblocksize(part, head.blocksize); 202 203 arena = initarena(part, aoffset, head.size, head.blocksize); 204 if(arena == nil) 205 sysfatal("initarena: %r"); 206 207 loaded: 208 z = nil; 209 if(host==nil || strcmp(host, "/dev/null") != 0){ 210 z = vtdial(host); 211 if(z == nil) 212 sysfatal("could not connect to server: %r"); 213 if(vtconnect(z) < 0) 214 sysfatal("vtconnect: %r"); 215 } 216 217 print("%T starting to send data\n"); 218 c = chancreate(sizeof(ZClump), 0); 219 for(i=0; i<12; i++) 220 vtproc(vtsendthread, nil); 221 222 initdcache(8 * MaxDiskBlock); 223 224 if(ap != nil) { 225 for(i=0; i<ap->narenas; i++) 226 rdarena(ap->arenas[i], 0); 227 } else 228 rdarena(arena, offset); 229 230 memset(&zerocl, 0, sizeof zerocl); 231 for(i=0; i<12; i++) 232 send(c, &zerocl); 233 if(vtsync(z) < 0) 234 sysfatal("executing sync: %r"); 235 if(z){ 236 vthangup(z); 237 } 238 print("%T sent all data\n"); 239 240 threadexitsall(0); 241 }