plan9port

fork of plan9port with libvec, libstr and libsdb
Log | Files | Refs | README | LICENSE

ioproc.c (2201B)


      1 #include <u.h>
      2 #include <libc.h>
      3 #include <thread.h>
      4 #include "ioproc.h"
      5 
      6 enum
      7 {
      8 	STACK = 32768
      9 };
     10 
     11 void
     12 iointerrupt(Ioproc *io)
     13 {
     14 	if(!io->inuse)
     15 		return;
     16 	fprint(2, "bug: cannot iointerrupt %p yet\n", io);
     17 }
     18 
     19 static void
     20 xioproc(void *a)
     21 {
     22 	Ioproc *io, *x;
     23 
     24 	threadsetname("ioproc");
     25 	io = a;
     26 	/*
     27 	 * first recvp acquires the ioproc.
     28 	 * second tells us that the data is ready.
     29 	 */
     30 	for(;;){
     31 		while(recv(io->c, &x) == -1)
     32 			;
     33 		if(x == 0)	/* our cue to leave */
     34 			break;
     35 		assert(x == io);
     36 
     37 		/* caller is now committed -- even if interrupted he'll return */
     38 		while(recv(io->creply, &x) == -1)
     39 			;
     40 		if(x == 0)	/* caller backed out */
     41 			continue;
     42 		assert(x == io);
     43 
     44 		io->ret = io->op(&io->arg);
     45 		if(io->ret < 0)
     46 			rerrstr(io->err, sizeof io->err);
     47 		while(send(io->creply, &io) == -1)
     48 			;
     49 		while(recv(io->creply, &x) == -1)
     50 			;
     51 	}
     52 }
     53 
     54 Ioproc*
     55 ioproc(void)
     56 {
     57 	Ioproc *io;
     58 
     59 	io = mallocz(sizeof(*io), 1);
     60 	if(io == nil)
     61 		sysfatal("ioproc malloc: %r");
     62 	io->c = chancreate(sizeof(void*), 0);
     63 	chansetname(io->c, "ioc%p", io->c);
     64 	io->creply = chancreate(sizeof(void*), 0);
     65 	chansetname(io->creply, "ior%p", io->c);
     66 	io->tid = proccreate(xioproc, io, STACK);
     67 	return io;
     68 }
     69 
     70 void
     71 closeioproc(Ioproc *io)
     72 {
     73 	if(io == nil)
     74 		return;
     75 	iointerrupt(io);
     76 	while(send(io->c, 0) == -1)
     77 		;
     78 	chanfree(io->c);
     79 	chanfree(io->creply);
     80 	free(io);
     81 }
     82 
     83 long
     84 iocall(Ioproc *io, long (*op)(va_list*), ...)
     85 {
     86 	char e[ERRMAX];
     87 	int ret, inted;
     88 	Ioproc *msg;
     89 
     90 	if(send(io->c, &io) == -1){
     91 		werrstr("interrupted");
     92 		return -1;
     93 	}
     94 	assert(!io->inuse);
     95 	io->inuse = 1;
     96 	io->op = op;
     97 	va_start(io->arg, op);
     98 	msg = io;
     99 	inted = 0;
    100 	while(send(io->creply, &msg) == -1){
    101 		msg = nil;
    102 		inted = 1;
    103 	}
    104 	if(inted){
    105 		werrstr("interrupted");
    106 		return -1;
    107 	}
    108 
    109 	/*
    110 	 * If we get interrupted, we have stick around so that
    111 	 * the IO proc has someone to talk to.  Send it an interrupt
    112 	 * and try again.
    113 	 */
    114 	inted = 0;
    115 	while(recv(io->creply, nil) == -1){
    116 		inted = 1;
    117 		iointerrupt(io);
    118 	}
    119 	USED(inted);
    120 	va_end(io->arg);
    121 	ret = io->ret;
    122 	if(ret < 0)
    123 		strecpy(e, e+sizeof e, io->err);
    124 	io->inuse = 0;
    125 
    126 	/* release resources */
    127 	while(send(io->creply, &io) == -1)
    128 		;
    129 	if(ret < 0)
    130 		errstr(e, sizeof e);
    131 	return ret;
    132 }