plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

lumpqueue.c (2721B)


      1 #include "stdinc.h"
      2 #include "dat.h"
      3 #include "fns.h"
      4 
      5 typedef struct LumpQueue	LumpQueue;
      6 typedef struct WLump		WLump;
      7 
      8 enum
      9 {
     10 	MaxLumpQ	= 1 << 3	/* max. lumps on a single write queue, must be pow 2 */
     11 };
     12 
     13 struct WLump
     14 {
     15 	Lump	*u;
     16 	Packet	*p;
     17 	int	creator;
     18 	int	gen;
     19 	uint	ms;
     20 };
     21 
     22 struct LumpQueue
     23 {
     24 	QLock	lock;
     25 	Rendez 	flush;
     26 	Rendez	full;
     27 	Rendez	empty;
     28 	WLump	q[MaxLumpQ];
     29 	int	w;
     30 	int	r;
     31 };
     32 
     33 static LumpQueue	*lumpqs;
     34 static int		nqs;
     35 
     36 static QLock		glk;
     37 static int		gen;
     38 
     39 static void	queueproc(void *vq);
     40 
     41 int
     42 initlumpqueues(int nq)
     43 {
     44 	LumpQueue *q;
     45 
     46 	int i;
     47 	nqs = nq;
     48 
     49 	lumpqs = MKNZ(LumpQueue, nq);
     50 
     51 	for(i = 0; i < nq; i++){
     52 		q = &lumpqs[i];
     53 		q->full.l = &q->lock;
     54 		q->empty.l = &q->lock;
     55 		q->flush.l = &q->lock;
     56 
     57 		if(vtproc(queueproc, q) < 0){
     58 			seterr(EOk, "can't start write queue slave: %r");
     59 			return -1;
     60 		}
     61 	}
     62 
     63 	return 0;
     64 }
     65 
     66 /*
     67  * queue a lump & it's packet data for writing
     68  */
     69 int
     70 queuewrite(Lump *u, Packet *p, int creator, uint ms)
     71 {
     72 	LumpQueue *q;
     73 	int i;
     74 
     75 	trace(TraceProc, "queuewrite");
     76 	i = indexsect(mainindex, u->score);
     77 	if(i < 0 || i >= nqs){
     78 		seterr(EBug, "internal error: illegal index section in queuewrite");
     79 		return -1;
     80 	}
     81 
     82 	q = &lumpqs[i];
     83 
     84 	qlock(&q->lock);
     85 	while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){
     86 		trace(TraceProc, "queuewrite sleep");
     87 		rsleep(&q->full);
     88 	}
     89 
     90 	q->q[q->w].u = u;
     91 	q->q[q->w].p = p;
     92 	q->q[q->w].creator = creator;
     93 	q->q[q->w].ms = ms;
     94 	q->q[q->w].gen = gen;
     95 	q->w = (q->w + 1) & (MaxLumpQ - 1);
     96 
     97 	trace(TraceProc, "queuewrite wakeup");
     98 	rwakeup(&q->empty);
     99 
    100 	qunlock(&q->lock);
    101 
    102 	return 0;
    103 }
    104 
    105 void
    106 flushqueue(void)
    107 {
    108 	int i;
    109 	LumpQueue *q;
    110 
    111 	if(!lumpqs)
    112 		return;
    113 
    114 	trace(TraceProc, "flushqueue");
    115 
    116 	qlock(&glk);
    117 	gen++;
    118 	qunlock(&glk);
    119 
    120 	for(i=0; i<mainindex->nsects; i++){
    121 		q = &lumpqs[i];
    122 		qlock(&q->lock);
    123 		while(q->w != q->r && gen - q->q[q->r].gen > 0){
    124 			trace(TraceProc, "flushqueue sleep q%d", i);
    125 			rsleep(&q->flush);
    126 		}
    127 		qunlock(&q->lock);
    128 	}
    129 }
    130 
    131 static void
    132 queueproc(void *vq)
    133 {
    134 	LumpQueue *q;
    135 	Lump *u;
    136 	Packet *p;
    137 	int creator;
    138 	uint ms;
    139 
    140 	threadsetname("queueproc");
    141 
    142 	q = vq;
    143 	for(;;){
    144 		qlock(&q->lock);
    145 		while(q->w == q->r){
    146 			trace(TraceProc, "queueproc sleep empty");
    147 			rsleep(&q->empty);
    148 		}
    149 
    150 		u = q->q[q->r].u;
    151 		p = q->q[q->r].p;
    152 		creator = q->q[q->r].creator;
    153 		ms = q->q[q->r].ms;
    154 
    155 		q->r = (q->r + 1) & (MaxLumpQ - 1);
    156 		trace(TraceProc, "queueproc wakeup flush");
    157 		rwakeupall(&q->flush);
    158 
    159 		trace(TraceProc, "queueproc wakeup full");
    160 		rwakeup(&q->full);
    161 
    162 		qunlock(&q->lock);
    163 
    164 		trace(TraceProc, "queueproc writelump %V", u->score);
    165 		if(writeqlump(u, p, creator, ms) < 0)
    166 			fprint(2, "failed to write lump for %V: %r", u->score);
    167 		trace(TraceProc, "queueproc wrotelump %V", u->score);
    168 
    169 		putlump(u);
    170 	}
    171 }