venti.c (5750B)
1 #ifdef PLAN9PORT 2 #include <u.h> 3 #include <signal.h> 4 #endif 5 #include "stdinc.h" 6 #include "dat.h" 7 #include "fns.h" 8 9 #include "whack.h" 10 11 int debug; 12 int nofork; 13 int mainstacksize = 256*1024; 14 VtSrv *ventisrv; 15 16 static void ventiserver(void*); 17 18 void 19 usage(void) 20 { 21 fprint(2, "usage: venti [-Ldrs] [-a address] [-B blockcachesize] [-c config] " 22 "[-C lumpcachesize] [-h httpaddress] [-I indexcachesize] [-W webroot]\n"); 23 threadexitsall("usage"); 24 } 25 26 int 27 threadmaybackground(void) 28 { 29 return 1; 30 } 31 32 void 33 threadmain(int argc, char *argv[]) 34 { 35 char *configfile, *haddr, *vaddr, *webroot; 36 u32int mem, icmem, bcmem, minbcmem; 37 Config config; 38 39 traceinit(); 40 threadsetname("main"); 41 vaddr = nil; 42 haddr = nil; 43 configfile = nil; 44 webroot = nil; 45 mem = 0; 46 icmem = 0; 47 bcmem = 0; 48 ARGBEGIN{ 49 case 'a': 50 vaddr = EARGF(usage()); 51 break; 52 case 'B': 53 bcmem = unittoull(EARGF(usage())); 54 break; 55 case 'c': 56 configfile = EARGF(usage()); 57 break; 58 case 'C': 59 mem = unittoull(EARGF(usage())); 60 break; 61 case 'D': 62 settrace(EARGF(usage())); 63 break; 64 case 'd': 65 debug = 1; 66 nofork = 1; 67 break; 68 case 'h': 69 haddr = EARGF(usage()); 70 break; 71 case 'I': 72 icmem = unittoull(EARGF(usage())); 73 break; 74 case 'L': 75 ventilogging = 1; 76 break; 77 case 'r': 78 readonly = 1; 79 break; 80 case 's': 81 nofork = 1; 82 break; 83 case 'w': /* compatibility with old venti */ 84 queuewrites = 1; 85 break; 86 case 'W': 87 webroot = EARGF(usage()); 88 break; 89 default: 90 usage(); 91 }ARGEND 92 93 if(argc) 94 usage(); 95 96 if(!nofork) 97 rfork(RFNOTEG); 98 99 #ifdef PLAN9PORT 100 { 101 /* sigh - needed to avoid signals when writing to hungup networks */ 102 struct sigaction sa; 103 memset(&sa, 0, sizeof sa); 104 sa.sa_handler = SIG_IGN; 105 sigaction(SIGPIPE, &sa, nil); 106 } 107 #endif 108 109 ventifmtinstall(); 110 trace(TraceQuiet, "venti started"); 111 fprint(2, "%T venti: "); 112 113 if(configfile == nil) 114 configfile = "venti.conf"; 115 116 fprint(2, "conf..."); 117 if(initventi(configfile, &config) < 0) 118 sysfatal("can't init server: %r"); 119 /* 120 * load bloom filter 121 */ 122 if(mainindex->bloom && loadbloom(mainindex->bloom) < 0) 123 sysfatal("can't load bloom filter: %r"); 124 125 if(mem == 0) 126 mem = config.mem; 127 if(bcmem == 0) 128 bcmem = config.bcmem; 129 if(icmem == 0) 130 icmem = config.icmem; 131 if(haddr == nil) 132 haddr = config.haddr; 133 if(vaddr == nil) 134 vaddr = config.vaddr; 135 if(vaddr == nil) 136 vaddr = "tcp!*!venti"; 137 if(webroot == nil) 138 webroot = config.webroot; 139 if(queuewrites == 0) 140 queuewrites = config.queuewrites; 141 142 if(haddr){ 143 fprint(2, "httpd %s...", haddr); 144 if(httpdinit(haddr, webroot) < 0) 145 fprint(2, "warning: can't start http server: %r"); 146 } 147 fprint(2, "init..."); 148 149 if(mem == 0xffffffffUL) 150 mem = 1 * 1024 * 1024; 151 152 /* 153 * lump cache 154 */ 155 if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n", 156 mem, mem / (8 * 1024)); 157 initlumpcache(mem, mem / (8 * 1024)); 158 159 /* 160 * index cache 161 */ 162 initicache(icmem); 163 initicachewrite(); 164 165 /* 166 * block cache: need a block for every arena and every process 167 */ 168 minbcmem = maxblocksize * 169 (mainindex->narenas + mainindex->nsects*4 + 16); 170 if(bcmem < minbcmem) 171 bcmem = minbcmem; 172 if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem); 173 initdcache(bcmem); 174 175 if(mainindex->bloom) 176 startbloomproc(mainindex->bloom); 177 178 fprint(2, "sync..."); 179 if(!readonly && syncindex(mainindex) < 0) 180 sysfatal("can't sync server: %r"); 181 182 if(!readonly && queuewrites){ 183 fprint(2, "queue..."); 184 if(initlumpqueues(mainindex->nsects) < 0){ 185 fprint(2, "can't initialize lump queues," 186 " disabling write queueing: %r"); 187 queuewrites = 0; 188 } 189 } 190 191 if(initarenasum() < 0) 192 fprint(2, "warning: can't initialize arena summing process: %r"); 193 194 fprint(2, "announce %s...", vaddr); 195 ventisrv = vtlisten(vaddr); 196 if(ventisrv == nil) 197 sysfatal("can't announce %s: %r", vaddr); 198 199 fprint(2, "serving.\n"); 200 if(nofork) 201 ventiserver(nil); 202 else 203 vtproc(ventiserver, nil); 204 205 threadexits(nil); 206 } 207 208 static void 209 vtrerror(VtReq *r, char *error) 210 { 211 r->rx.msgtype = VtRerror; 212 r->rx.error = estrdup(error); 213 } 214 215 static void 216 ventiserver(void *v) 217 { 218 Packet *p; 219 VtReq *r; 220 char err[ERRMAX]; 221 uint ms; 222 int cached, ok; 223 224 USED(v); 225 threadsetname("ventiserver"); 226 trace(TraceWork, "start"); 227 while((r = vtgetreq(ventisrv)) != nil){ 228 trace(TraceWork, "finish"); 229 trace(TraceWork, "start request %F", &r->tx); 230 trace(TraceRpc, "<- %F", &r->tx); 231 r->rx.msgtype = r->tx.msgtype+1; 232 addstat(StatRpcTotal, 1); 233 if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n", 234 mainindex->arenas[0], mainindex->sects[0], &r->tx); 235 switch(r->tx.msgtype){ 236 default: 237 vtrerror(r, "unknown request"); 238 break; 239 case VtTread: 240 ms = msec(); 241 r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached); 242 ms = msec() - ms; 243 addstat2(StatRpcRead, 1, StatRpcReadTime, ms); 244 if(r->rx.data == nil){ 245 addstat(StatRpcReadFail, 1); 246 rerrstr(err, sizeof err); 247 vtrerror(r, err); 248 }else{ 249 addstat(StatRpcReadBytes, packetsize(r->rx.data)); 250 addstat(StatRpcReadOk, 1); 251 if(cached) 252 addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms); 253 else 254 addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms); 255 } 256 break; 257 case VtTwrite: 258 if(readonly){ 259 vtrerror(r, "read only"); 260 break; 261 } 262 p = r->tx.data; 263 r->tx.data = nil; 264 addstat(StatRpcWriteBytes, packetsize(p)); 265 ms = msec(); 266 ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms); 267 ms = msec() - ms; 268 addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms); 269 270 if(ok < 0){ 271 addstat(StatRpcWriteFail, 1); 272 rerrstr(err, sizeof err); 273 vtrerror(r, err); 274 } 275 break; 276 case VtTsync: 277 flushqueue(); 278 flushdcache(); 279 break; 280 } 281 trace(TraceRpc, "-> %F", &r->rx); 282 vtrespond(r); 283 trace(TraceWork, "start"); 284 } 285 flushdcache(); 286 flushicache(); 287 threadexitsall(0); 288 }