lumpqueue.c (2721B)
1 #include "stdinc.h" 2 #include "dat.h" 3 #include "fns.h" 4 5 typedef struct LumpQueue LumpQueue; 6 typedef struct WLump WLump; 7 8 enum 9 { 10 MaxLumpQ = 1 << 3 /* max. lumps on a single write queue, must be pow 2 */ 11 }; 12 13 struct WLump 14 { 15 Lump *u; 16 Packet *p; 17 int creator; 18 int gen; 19 uint ms; 20 }; 21 22 struct LumpQueue 23 { 24 QLock lock; 25 Rendez flush; 26 Rendez full; 27 Rendez empty; 28 WLump q[MaxLumpQ]; 29 int w; 30 int r; 31 }; 32 33 static LumpQueue *lumpqs; 34 static int nqs; 35 36 static QLock glk; 37 static int gen; 38 39 static void queueproc(void *vq); 40 41 int 42 initlumpqueues(int nq) 43 { 44 LumpQueue *q; 45 46 int i; 47 nqs = nq; 48 49 lumpqs = MKNZ(LumpQueue, nq); 50 51 for(i = 0; i < nq; i++){ 52 q = &lumpqs[i]; 53 q->full.l = &q->lock; 54 q->empty.l = &q->lock; 55 q->flush.l = &q->lock; 56 57 if(vtproc(queueproc, q) < 0){ 58 seterr(EOk, "can't start write queue slave: %r"); 59 return -1; 60 } 61 } 62 63 return 0; 64 } 65 66 /* 67 * queue a lump & it's packet data for writing 68 */ 69 int 70 queuewrite(Lump *u, Packet *p, int creator, uint ms) 71 { 72 LumpQueue *q; 73 int i; 74 75 trace(TraceProc, "queuewrite"); 76 i = indexsect(mainindex, u->score); 77 if(i < 0 || i >= nqs){ 78 seterr(EBug, "internal error: illegal index section in queuewrite"); 79 return -1; 80 } 81 82 q = &lumpqs[i]; 83 84 qlock(&q->lock); 85 while(q->r == ((q->w + 1) & (MaxLumpQ - 1))){ 86 trace(TraceProc, "queuewrite sleep"); 87 rsleep(&q->full); 88 } 89 90 q->q[q->w].u = u; 91 q->q[q->w].p = p; 92 q->q[q->w].creator = creator; 93 q->q[q->w].ms = ms; 94 q->q[q->w].gen = gen; 95 q->w = (q->w + 1) & (MaxLumpQ - 1); 96 97 trace(TraceProc, "queuewrite wakeup"); 98 rwakeup(&q->empty); 99 100 qunlock(&q->lock); 101 102 return 0; 103 } 104 105 void 106 flushqueue(void) 107 { 108 int i; 109 LumpQueue *q; 110 111 if(!lumpqs) 112 return; 113 114 trace(TraceProc, "flushqueue"); 115 116 qlock(&glk); 117 gen++; 118 qunlock(&glk); 119 120 for(i=0; i<mainindex->nsects; i++){ 121 q = &lumpqs[i]; 122 qlock(&q->lock); 123 while(q->w != q->r && gen - q->q[q->r].gen > 0){ 124 trace(TraceProc, "flushqueue sleep q%d", i); 125 rsleep(&q->flush); 126 } 127 qunlock(&q->lock); 128 } 129 } 130 131 static void 132 queueproc(void *vq) 133 { 134 LumpQueue *q; 135 Lump *u; 136 Packet *p; 137 int creator; 138 uint ms; 139 140 threadsetname("queueproc"); 141 142 q = vq; 143 for(;;){ 144 qlock(&q->lock); 145 while(q->w == q->r){ 146 trace(TraceProc, "queueproc sleep empty"); 147 rsleep(&q->empty); 148 } 149 150 u = q->q[q->r].u; 151 p = q->q[q->r].p; 152 creator = q->q[q->r].creator; 153 ms = q->q[q->r].ms; 154 155 q->r = (q->r + 1) & (MaxLumpQ - 1); 156 trace(TraceProc, "queueproc wakeup flush"); 157 rwakeupall(&q->flush); 158 159 trace(TraceProc, "queueproc wakeup full"); 160 rwakeup(&q->full); 161 162 qunlock(&q->lock); 163 164 trace(TraceProc, "queueproc writelump %V", u->score); 165 if(writeqlump(u, p, creator, ms) < 0) 166 fprint(2, "failed to write lump for %V: %r", u->score); 167 trace(TraceProc, "queueproc wrotelump %V", u->score); 168 169 putlump(u); 170 } 171 }