Last active
November 28, 2016 02:14
-
-
Save tgarc/66a2e1c32ec641006ab8c0f1b896261b to your computer and use it in GitHub Desktop.
Extension of mpex.py: keep the I/O process alive so that multiple files may be sent
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example of keeping a simulated IO callback thread fed using separate master and daemon (I/O) processes. | |
Builds off of mpex.py: https://gist.github.com/tgarc/7120c00eb3eb5f736be93c70ade1d68a | |
""" | |
import multiprocessing, threading, Queue | |
import time | |
import sys | |
import urllib2 | |
IOBLOCKSZ = 4096             # made-up I/O block size: bytes consumed per
                             # simulated I/O callback
IPCBLOCKSZ = 4096            # host -> daemon chunk size; should be at least
                             # IOBLOCKSZ
IPCBUFFSZ = 10*IPCBLOCKSZ    # daemon receive ring (wiggle room for the
                             # inter-process buffer); must stay a multiple of
                             # IPCBLOCKSZ so chunk writes never straddle the end
QSIZE = 8                    # depth of the daemon's outgoing queue; needs to
                             # vary with the ratio of IPC block size to I/O
                             # block size
IOPERIOD = 0.01              # intended period of the simulated I/O callback,
                             # in seconds
DEBUG = 1                    # nonzero: log per-callback stats to ioproc.log
                             # and re-raise KeyboardInterrupt in main()
class Daemon(multiprocessing.Process):
    """I/O worker process that simulates a periodic I/O callback.

    The host streams data to this process over a pipe in chunks of at most
    IPCBLOCKSZ bytes, which are stored in a circular receive buffer of
    IPCBUFFSZ bytes.  A chain of threading.Timer callbacks drains that buffer
    IOBLOCKSZ bytes at a time (one callback every IOPERIOD seconds) into a
    queue, and the process's main thread forwards each queued item back to
    the host as a ``(status, data)`` tuple, followed by ``None`` once the
    stream is finished.

    The process stays alive between streams so several inputs can be sent in
    turn.  Events shared with the host:

      ready -- set by the host to start a new stream.
      full  -- set by the host after each send; cleared by the daemon when it
               is about to receive the next chunk, and at the end of a stream
               so the host may send the next stream's first chunk.
    """

    def __init__(self, *args, **kwargs):
        # Positional args are handed to main(); the host passes its pipe end.
        super(Daemon, self).__init__(target=self.main, args=args, **kwargs)
        self._exit = multiprocessing.Event()    # host -> daemon: shut down
        self._exited = multiprocessing.Event()  # set when main() cleans up
        self.full = multiprocessing.Event()     # flow control (see class doc)
        self.ready = multiprocessing.Event()    # host starts a new stream
        self.daemon = True
        # Per-stream state; rewound by _reset() at the start of every stream.
        self.count = 0   # bytes handed to the output queue (debug log only)
        self.wridx = 0   # next write position in the receive buffer
        self.rdidx = 0   # next read position in the receive buffer

    def exit(self):
        """Request that the daemon's main loop stop."""
        self._exit.set()

    @property
    def exited(self):
        """True once main() has run its cleanup and returned normally."""
        return self._exited.is_set()

    def _reset(self):
        """Rewind the ring-buffer indices and the debug byte count."""
        self.count = self.wridx = self.rdidx = 0

    def _wait_ready(self):
        """Block until the host sets ``ready``; return False if exit() wins."""
        while not self.ready.is_set():
            if self._exit.is_set():
                return False
            time.sleep(0.050)
        return True

    def _read(self, inpbuff, blocksize):
        """Return up to *blocksize* bytes from the circular buffer *inpbuff*.

        Reading stops at the write index, so a result shorter than
        *blocksize* means the reader has caught up with the writer; the
        caller treats that as the end of the stream (overflow is assumed
        never to happen).  Advances ``self.rdidx``.
        """
        buflen = len(inpbuff)
        # Snapshot the write index once: this runs in a timer thread while
        # the main thread may advance self.wridx in _receive().
        wridx = self.wridx
        rdidx = self.rdidx
        # The chunk size is not known in advance, so a read may have to wrap
        # around the end of the buffer.
        nextidx = rdidx + blocksize
        if rdidx <= wridx < nextidx:
            nextidx = wridx          # stop at the write index
        if nextidx >= buflen:        # wrap-around read
            nextidx %= buflen
            if wridx < nextidx:
                nextidx = wridx      # reached the write index after wrapping
            indata = inpbuff[rdidx:]
            if nextidx > 0:
                indata += inpbuff[:nextidx]
        else:
            indata = inpbuff[rdidx:nextidx]
        self.rdidx = nextidx
        return indata

    def _receive(self, pipe, inpbuff):
        """Receive the host's next chunk into *inpbuff* at the write index.

        Returns the number of bytes received, or -1 without receiving when
        the read index lies within IPCBLOCKSZ bytes at or ahead of the write
        index (no room for a full chunk).

        Every chunk except the last is exactly IPCBLOCKSZ bytes and writes
        start at index 0 each stream, so with IPCBUFFSZ a multiple of
        IPCBLOCKSZ a write never straddles the end of the buffer.
        """
        buflen = len(inpbuff)
        # Distance from the write index forward to the read index, measured
        # around the ring.  Writes never get within IPCBLOCKSZ of the read
        # index: filling the ring completely would make wridx == rdidx, which
        # _read() cannot tell apart from an empty buffer.  (IPCBLOCKSZ bytes
        # of the ring are deliberately left unused for this.)
        if (self.rdidx - self.wridx) % buflen <= IPCBLOCKSZ:
            return -1
        self.full.clear()    # tell the host there is room for one chunk
        nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
        self.wridx = (self.wridx + nbytes) % buflen
        return nbytes

    def main(self, pipe):
        """Process entry point: serve streams arriving on *pipe* until exit()."""
        q = Queue.Queue(QSIZE)
        inpbuff = bytearray(IPCBUFFSZ)
        while self._wait_ready() and not self._exit.is_set():
            self.ready.clear()
            # Start every stream at the beginning of the ring; indices left
            # over from the previous stream would misalign the chunk writes.
            self._reset()
            try:
                # The buffer must hold data before the reader starts, or the
                # reader sees rdidx == wridx and ends the stream at once.
                nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
                self.wridx = nbytes % len(inpbuff)
                # A short first chunk is also the host's last one.
                self.consume(pipe, q, inpbuff, rxdone=nbytes < IPCBLOCKSZ)
            except KeyboardInterrupt:
                break
        # cleanup
        q.queue.clear()
        self._reset()
        self.full.clear()
        pipe.close()
        self._exited.set()

    def consume(self, pipe, q, inpbuff, rxdone=False):
        """Forward one stream to the host at the simulated I/O rate.

        Timer callbacks (one every IOPERIOD seconds) move IOBLOCKSZ-byte
        reads from *inpbuff* into *q*.  This thread sends each queued item
        over *pipe* and, after each one, receives the host's next chunk if
        there is room.  A read shorter than IOBLOCKSZ ends the stream: the
        reader queues ``None``, which is forwarded to the host and ends this
        call.

        Args:
          pipe:    connection to the host.
          q:       Queue.Queue shared with the reader callbacks.
          inpbuff: circular receive buffer (bytearray).
          rxdone:  True if the host's final (short) chunk was already
                   received, so no further receives are attempted.
        """
        self.lt = time.time()
        dbglog = open('ioproc.log', 'w') if DEBUG else None

        def process():
            if self._exit.is_set():
                return   # shutting down: stop rescheduling the reader
            indata = self._read(inpbuff, IOBLOCKSZ)
            try:
                q.put((0, indata), block=False)
            except Queue.Full:
                pipe.close()
                raise
            if dbglog is not None:
                now = time.time()
                fields = (self.count, 1e3*(now - self.lt), self.rdidx,
                          self.wridx, abs(self.wridx - self.rdidx), q.qsize())
                dbglog.write(' '.join(str(f) for f in fields) + '\n')
                self.lt = now
            self.count += len(indata)
            if len(indata) < IOBLOCKSZ:
                q.put(None)          # reader caught up: end of stream
            else:
                threading.Timer(IOPERIOD, process).start()

        threading.Timer(IOPERIOD, process).start()
        try:
            while not self._exit.is_set():
                try:
                    item = q.get(timeout=IOPERIOD)
                except Queue.Empty:
                    continue         # re-check the exit flag
                if item is None:
                    # Clear `full` before the host learns the stream is over,
                    # so it can send the next stream's first chunk.
                    self.full.clear()
                    pipe.send(None)
                    break
                pipe.send(item)
                if rxdone:
                    continue         # just drain the queue
                nbytes = self._receive(pipe, inpbuff)
                if nbytes == -1:
                    continue         # no room yet; let the reader advance
                if nbytes < IPCBLOCKSZ:
                    rxdone = True    # that was the host's final chunk
        finally:
            if dbglog is not None:
                dbglog.close()
def _open_input(inpf):
    """Open INPUT: '-' is stdin, an http URL is fetched, else a file path."""
    if inpf == '-':
        return sys.stdin
    if inpf.startswith('http'):
        return urllib2.urlopen(inpf)
    return open(inpf, 'rb')


def _open_output(outf):
    """Open OUTPUT: '-' is stdout, 'null' discards (None), else a file path."""
    if outf == '-':
        return sys.stdout
    if outf == 'null':
        return None
    return open(outf, 'wb+')


def main(argv=None):
    """Stream INPUT through the Daemon I/O process into OUTPUT.

    Usage: INPUT OUTPUT [--loop]

    INPUT is '-' (stdin), an http URL, or a file path.  OUTPUT is '-'
    (stdout), 'null' (discard), or a file path.  With --loop the input is
    rewound with seek(0) and streamed again for as long as the I/O process
    is alive.

    Returns 0 on success, or 2 (after printing a usage line to stderr) when
    fewer than two arguments are given.  Raises
    multiprocessing.ProcessError if the I/O process did not shut down
    cleanly.
    """
    if argv is None:
        argv = sys.argv[1:]
    if len(argv) < 2:
        sys.stderr.write("usage: INPUT OUTPUT [--loop]\n")
        return 2
    inpf, outf = argv[:2]
    loop = argv[2:3] == ['--loop']

    inp_fh = _open_input(inpf)
    out_fh = None
    try:
        out_fh = _open_output(outf)
        hostpipe, clientpipe = multiprocessing.Pipe()
        w1 = Daemon(clientpipe, name='consumer')
        w1.start()
        clientpipe.close()   # the child process holds its own copy
        try:
            while w1.is_alive():
                stream(w1, hostpipe, inp_fh, out_fh)
                if not loop:
                    break
                inp_fh.seek(0)
        except KeyboardInterrupt:
            if DEBUG:
                raise
        except BaseException:
            # don't leave the daemon running when the host failed
            w1.terminate()
            raise
        finally:
            w1.exit()
            w1.join()
            hostpipe.close()
    finally:
        # close only what we opened, never the process's own stdin/stdout
        if out_fh is not None and out_fh is not sys.stdout:
            out_fh.close()
        if inp_fh is not sys.stdin:
            inp_fh.close()

    if not w1.exited:
        raise multiprocessing.ProcessError("Subprocess crashed")
    return 0
def stream(w1, hostpipe, inp_fh, out_fh=None):
    """Send one input stream to the I/O process *w1* and collect its output.

    *inp_fh* is read in IPCBLOCKSZ chunks.  A chunk is sent over *hostpipe*
    whenever the daemon has cleared ``w1.full``.  The first short (possibly
    empty) chunk marks the end of the input.  Items ``(status, data)`` coming
    back are written to *out_fh* (discarded when it is None) until the daemon
    sends ``None`` or dies.  Unless *out_fh* is stdout, a '\\r'-terminated
    progress line goes to stdout every 1000 items and once at the end.

    Raises multiprocessing.ProcessError if the daemon reports a nonzero
    status.
    """
    i = 0
    t1 = time.time()
    statline = "{{{:^9d}}} {:6.3f}s {:10d} bytes sent, {:10d} bytes received\r"
    show_status = out_fh is not sys.stdout
    txcount = rxcount = 0
    done = False
    # The previous stream (if any) ended with `full` still set by our final
    # send; clear it so this stream's first chunk can go out.
    w1.full.clear()
    w1.ready.set()
    try:
        while w1.is_alive():
            if not w1.full.is_set():
                outdata = inp_fh.read(IPCBLOCKSZ)
                if len(outdata) == IPCBLOCKSZ:
                    hostpipe.send_bytes(outdata)
                elif not done:
                    # short/empty chunk: tells the daemon the input is over
                    hostpipe.send_bytes(outdata)
                    done = True
                txcount += len(outdata)
                w1.full.set()
            # Short timeout instead of a non-blocking poll so the loop does
            # not spin at 100% CPU while waiting on the daemon.
            elif hostpipe.poll(0.001):
                item = hostpipe.recv()
                if item is None:
                    break
                stat, indata = item
                if stat != 0:
                    raise multiprocessing.ProcessError("Subprocess error")
                if out_fh is not None:
                    out_fh.write(indata)
                rxcount += len(indata)
                if show_status and i % 1000 == 0:
                    sys.stdout.write(statline.format(i, time.time()-t1, txcount, rxcount))
                i += 1
        if show_status:
            sys.stdout.write(statline.format(i, time.time()-t1, txcount, rxcount))
    finally:
        if out_fh is not None:
            out_fh.flush()
        if show_status:
            # keep the last '\r'-terminated status line from being overwritten
            sys.stdout.write('\n')
if __name__ == '__main__':
    # Script entry point: main()'s return value becomes the exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment