plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

venti.c (5750B)


      1 #ifdef PLAN9PORT
      2 #include <u.h>
      3 #include <signal.h>
      4 #endif
      5 #include "stdinc.h"
      6 #include "dat.h"
      7 #include "fns.h"
      8 
      9 #include "whack.h"
     10 
     11 int debug;
     12 int nofork;
     13 int mainstacksize = 256*1024;
     14 VtSrv *ventisrv;
     15 
     16 static void	ventiserver(void*);
     17 
     18 void
     19 usage(void)
     20 {
     21 	fprint(2, "usage: venti [-Ldrs] [-a address] [-B blockcachesize] [-c config] "
     22 "[-C lumpcachesize] [-h httpaddress] [-I indexcachesize] [-W webroot]\n");
     23 	threadexitsall("usage");
     24 }
     25 
     26 int
     27 threadmaybackground(void)
     28 {
     29 	return 1;
     30 }
     31 
     32 void
     33 threadmain(int argc, char *argv[])
     34 {
     35 	char *configfile, *haddr, *vaddr, *webroot;
     36 	u32int mem, icmem, bcmem, minbcmem;
     37 	Config config;
     38 
     39 	traceinit();
     40 	threadsetname("main");
     41 	vaddr = nil;
     42 	haddr = nil;
     43 	configfile = nil;
     44 	webroot = nil;
     45 	mem = 0;
     46 	icmem = 0;
     47 	bcmem = 0;
     48 	ARGBEGIN{
     49 	case 'a':
     50 		vaddr = EARGF(usage());
     51 		break;
     52 	case 'B':
     53 		bcmem = unittoull(EARGF(usage()));
     54 		break;
     55 	case 'c':
     56 		configfile = EARGF(usage());
     57 		break;
     58 	case 'C':
     59 		mem = unittoull(EARGF(usage()));
     60 		break;
     61 	case 'D':
     62 		settrace(EARGF(usage()));
     63 		break;
     64 	case 'd':
     65 		debug = 1;
     66 		nofork = 1;
     67 		break;
     68 	case 'h':
     69 		haddr = EARGF(usage());
     70 		break;
     71 	case 'I':
     72 		icmem = unittoull(EARGF(usage()));
     73 		break;
     74 	case 'L':
     75 		ventilogging = 1;
     76 		break;
     77 	case 'r':
     78 		readonly = 1;
     79 		break;
     80 	case 's':
     81 		nofork = 1;
     82 		break;
     83 	case 'w':			/* compatibility with old venti */
     84 		queuewrites = 1;
     85 		break;
     86 	case 'W':
     87 		webroot = EARGF(usage());
     88 		break;
     89 	default:
     90 		usage();
     91 	}ARGEND
     92 
     93 	if(argc)
     94 		usage();
     95 
     96 	if(!nofork)
     97 		rfork(RFNOTEG);
     98 
     99 #ifdef PLAN9PORT
    100 	{
    101 		/* sigh - needed to avoid signals when writing to hungup networks */
    102 		struct sigaction sa;
    103 		memset(&sa, 0, sizeof sa);
    104 		sa.sa_handler = SIG_IGN;
    105 		sigaction(SIGPIPE, &sa, nil);
    106 	}
    107 #endif
    108 
    109 	ventifmtinstall();
    110 	trace(TraceQuiet, "venti started");
    111 	fprint(2, "%T venti: ");
    112 
    113 	if(configfile == nil)
    114 		configfile = "venti.conf";
    115 
    116 	fprint(2, "conf...");
    117 	if(initventi(configfile, &config) < 0)
    118 		sysfatal("can't init server: %r");
    119 	/*
    120 	 * load bloom filter
    121 	 */
    122 	if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
    123 		sysfatal("can't load bloom filter: %r");
    124 
    125 	if(mem == 0)
    126 		mem = config.mem;
    127 	if(bcmem == 0)
    128 		bcmem = config.bcmem;
    129 	if(icmem == 0)
    130 		icmem = config.icmem;
    131 	if(haddr == nil)
    132 		haddr = config.haddr;
    133 	if(vaddr == nil)
    134 		vaddr = config.vaddr;
    135 	if(vaddr == nil)
    136 		vaddr = "tcp!*!venti";
    137 	if(webroot == nil)
    138 		webroot = config.webroot;
    139 	if(queuewrites == 0)
    140 		queuewrites = config.queuewrites;
    141 
    142 	if(haddr){
    143 		fprint(2, "httpd %s...", haddr);
    144 		if(httpdinit(haddr, webroot) < 0)
    145 			fprint(2, "warning: can't start http server: %r");
    146 	}
    147 	fprint(2, "init...");
    148 
    149 	if(mem == 0xffffffffUL)
    150 		mem = 1 * 1024 * 1024;
    151 
    152 	/*
    153 	 * lump cache
    154 	 */
    155 	if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
    156 		mem, mem / (8 * 1024));
    157 	initlumpcache(mem, mem / (8 * 1024));
    158 
    159 	/*
    160 	 * index cache
    161 	 */
    162 	initicache(icmem);
    163 	initicachewrite();
    164 
    165 	/*
    166 	 * block cache: need a block for every arena and every process
    167 	 */
    168 	minbcmem = maxblocksize *
    169 		(mainindex->narenas + mainindex->nsects*4 + 16);
    170 	if(bcmem < minbcmem)
    171 		bcmem = minbcmem;
    172 	if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
    173 	initdcache(bcmem);
    174 
    175 	if(mainindex->bloom)
    176 		startbloomproc(mainindex->bloom);
    177 
    178 	fprint(2, "sync...");
    179 	if(!readonly && syncindex(mainindex) < 0)
    180 		sysfatal("can't sync server: %r");
    181 
    182 	if(!readonly && queuewrites){
    183 		fprint(2, "queue...");
    184 		if(initlumpqueues(mainindex->nsects) < 0){
    185 			fprint(2, "can't initialize lump queues,"
    186 				" disabling write queueing: %r");
    187 			queuewrites = 0;
    188 		}
    189 	}
    190 
    191 	if(initarenasum() < 0)
    192 		fprint(2, "warning: can't initialize arena summing process: %r");
    193 
    194 	fprint(2, "announce %s...", vaddr);
    195 	ventisrv = vtlisten(vaddr);
    196 	if(ventisrv == nil)
    197 		sysfatal("can't announce %s: %r", vaddr);
    198 
    199 	fprint(2, "serving.\n");
    200 	if(nofork)
    201 		ventiserver(nil);
    202 	else
    203 		vtproc(ventiserver, nil);
    204 
    205 	threadexits(nil);
    206 }
    207 
    208 static void
    209 vtrerror(VtReq *r, char *error)
    210 {
    211 	r->rx.msgtype = VtRerror;
    212 	r->rx.error = estrdup(error);
    213 }
    214 
    215 static void
    216 ventiserver(void *v)
    217 {
    218 	Packet *p;
    219 	VtReq *r;
    220 	char err[ERRMAX];
    221 	uint ms;
    222 	int cached, ok;
    223 
    224 	USED(v);
    225 	threadsetname("ventiserver");
    226 	trace(TraceWork, "start");
    227 	while((r = vtgetreq(ventisrv)) != nil){
    228 		trace(TraceWork, "finish");
    229 		trace(TraceWork, "start request %F", &r->tx);
    230 		trace(TraceRpc, "<- %F", &r->tx);
    231 		r->rx.msgtype = r->tx.msgtype+1;
    232 		addstat(StatRpcTotal, 1);
    233 		if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
    234 			mainindex->arenas[0], mainindex->sects[0], &r->tx);
    235 		switch(r->tx.msgtype){
    236 		default:
    237 			vtrerror(r, "unknown request");
    238 			break;
    239 		case VtTread:
    240 			ms = msec();
    241 			r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
    242 			ms = msec() - ms;
    243 			addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
    244 			if(r->rx.data == nil){
    245 				addstat(StatRpcReadFail, 1);
    246 				rerrstr(err, sizeof err);
    247 				vtrerror(r, err);
    248 			}else{
    249 				addstat(StatRpcReadBytes, packetsize(r->rx.data));
    250 				addstat(StatRpcReadOk, 1);
    251 				if(cached)
    252 					addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
    253 				else
    254 					addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
    255 			}
    256 			break;
    257 		case VtTwrite:
    258 			if(readonly){
    259 				vtrerror(r, "read only");
    260 				break;
    261 			}
    262 			p = r->tx.data;
    263 			r->tx.data = nil;
    264 			addstat(StatRpcWriteBytes, packetsize(p));
    265 			ms = msec();
    266 			ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
    267 			ms = msec() - ms;
    268 			addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
    269 
    270 			if(ok < 0){
    271 				addstat(StatRpcWriteFail, 1);
    272 				rerrstr(err, sizeof err);
    273 				vtrerror(r, err);
    274 			}
    275 			break;
    276 		case VtTsync:
    277 			flushqueue();
    278 			flushdcache();
    279 			break;
    280 		}
    281 		trace(TraceRpc, "-> %F", &r->rx);
    282 		vtrespond(r);
    283 		trace(TraceWork, "start");
    284 	}
    285 	flushdcache();
    286 	flushicache();
    287 	threadexitsall(0);
    288 }