Last active
November 18, 2016 00:09
-
-
Save tgarc/7120c00eb3eb5f736be93c70ade1d68a to your computer and use it in GitHub Desktop.
Multiprocessing example: feeding a low rate I/O stream
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Example of keeping a simulated IO callback thread fed using separate master and daemon (I/O) processes. | |
""" | |
import multiprocessing, threading, Queue | |
import time | |
import sys | |
import urllib2 | |
# Tunables shared by the master (host) loop and the consumer process.
IOBLOCKSZ = 4096              # made-up I/O block size: bytes the consumer pulls
                              # out of the receive buffer per simulated callback
IPCBLOCKSZ = 8192             # bytes sent per pipe message; chosen to be larger
                              # than IOBLOCKSZ
IPCBUFFSZ = 10 * IPCBLOCKSZ   # wiggle room for the inter-process receive buffer;
                              # kept a whole multiple of IPCBLOCKSZ because the
                              # write index only wraps on an exact end-of-buffer
QSIZE = 1024                  # capacity of the consumer's output queue; needs to
                              # vary depending on the ratio of IPC block size to
                              # I/O block size
DEBUG = 0                     # non-zero: print per-callback statistics, re-raise
                              # interrupts, and suppress the progress line
class Daemon(multiprocessing.Process):
    """Consumer process that drains a byte stream at a simulated I/O rate.

    The parent sends blocks of bytes over a pipe.  Inside the child,
    ``consume`` stores them in a ring buffer (a ``bytearray`` of
    ``IPCBUFFSZ`` bytes) while a self-rescheduling ``threading.Timer``
    callback pulls ``IOBLOCKSZ``-sized chunks back out and queues them as
    ``(0, chunk)`` tuples.  The main loop of ``consume`` forwards the queued
    tuples to the parent over the same pipe.

    Attributes shared with the parent (``multiprocessing.Event`` objects):
      * ``buffered`` -- cleared by the child right before it waits for the
        next block; the parent sends a block when it sees it cleared.
      * ``_exit``    -- set through ``exit()`` to ask the main loop to stop.
      * ``_exited``  -- set by the child when ``consume`` has finished its
        cleanup; exposed through the ``exited`` property.
    """

    def __init__(self, *args, **kwargs):
        """Create the process; positional ``args`` are passed to ``consume``."""
        super(Daemon, self).__init__(target=self.consume, args=args, **kwargs)
        self._exit = multiprocessing.Event()
        self._exited = multiprocessing.Event()
        self.buffered = multiprocessing.Event()
        self.daemon = True

    def exit(self):
        """Request that the ``consume`` main loop stop at its next iteration."""
        self._exit.set()

    @property
    def exited(self):
        """True once ``consume`` has run its ``finally`` cleanup."""
        return self._exited.is_set()

    def _read(self, inpbuff, blocksize):
        """Copy up to ``blocksize`` bytes out of the ring buffer.

        Reading starts at ``self.rdidx`` and stops early at ``self.wridx``
        when the write index lies inside the requested span.  ``self.rdidx``
        is advanced to the end of what was read.  Returns the bytes read as
        a new slice of ``inpbuff``; a result shorter than ``blocksize`` means
        the reader caught up with the writer.
        """
        with self.lock:
            # logic is a bit more complex here since we don't necessarily know
            # the required chunk size beforehand, so maybe required to read
            # around the edges of the receive buffer
            nextidx = self.rdidx + blocksize
            if nextidx >= len(inpbuff):  # corner case: wrap around read
                nextidx %= len(inpbuff)
                if self.wridx <= nextidx:
                    nextidx = self.wridx
                indata = inpbuff[self.rdidx:]
                # worst case scenario: we allocate and copy
                if nextidx != 0:
                    indata += inpbuff[:nextidx]
            else:
                # we assume that once the read index catches up to the write
                # index the stream is finished (however it could also indicate
                # overflow)
                if self.rdidx < self.wridx <= nextidx:
                    nextidx = self.wridx
                indata = inpbuff[self.rdidx:nextidx]
            self.rdidx = nextidx
        return indata

    def _receive(self, pipe, inpbuff):
        """Receive one pipe message into the ring buffer at ``self.wridx``.

        Returns ``-1`` without receiving when the read index sits within the
        next ``IPCBLOCKSZ`` bytes after the write index (no room yet).
        Otherwise clears ``buffered`` (signalling the parent to send), waits
        for one message, advances ``self.wridx`` (wrapping to 0 when it lands
        exactly on the end of the buffer) and returns the byte count.
        """
        # Logic is a bit simplified here since we can guarantee buffer
        # size is a multiple of the transmission size (except for the
        # last chunk which will be smaller)
        with self.lock:
            # no more rx buffer space; wait for processing
            if self.wridx < self.rdidx <= self.wridx + IPCBLOCKSZ:
                return -1
            self.buffered.clear()
            # NOTE: the lock stays held while waiting for the message, so the
            # reader callback cannot touch the indices in the meantime.
            nbytes = pipe.recv_bytes_into(inpbuff, self.wridx)
            self.wridx += nbytes
            if self.wridx == len(inpbuff):
                self.wridx = 0
        return nbytes

    def consume(self, pipe):
        """Process entry point: receive blocks, pace them out, echo them back.

        ``pipe`` is the child's end of the duplex connection.  Runs until the
        stream ends (a block shorter than ``IPCBLOCKSZ`` arrives or the
        reader queues its ``None`` end marker) or ``exit()`` was called.
        The pipe is always closed and ``_exited`` set on the way out.

        Raises:
            Exception: "Buffer overflow" when a message does not fit into the
                remaining receive buffer (``multiprocessing.BufferTooShort``).
        """
        q = Queue.Queue(QSIZE)
        self.lock = threading.Lock()
        inpbuff = bytearray(IPCBUFFSZ)
        self.count = self.wridx = self.rdidx = 0
        self.lt = time.time()

        def process(self, q, inpbuff):
            # Simulated I/O callback: take one I/O block out of the ring
            # buffer, queue it for the main loop, then re-arm the timer.
            indata = self._read(inpbuff, IOBLOCKSZ)
            self.count += len(indata)
            try:
                q.put((0, indata), block=False)
            except Queue.Full:
                pipe.close()
                raise
            if DEBUG:
                # Single pre-formatted string: same text as the former print
                # statement, and valid syntax on both Python 2 and 3.
                print("%s %s %s %s %s %s" % (
                    self.count, time.time() - self.lt, self.rdidx,
                    self.wridx, abs(self.wridx - self.rdidx), q.qsize()))
            self.lt = time.time()
            if len(indata) < IOBLOCKSZ:
                # short read: reader caught up with the writer -> end marker
                q.put(None)
            else:
                threading.Timer(0.001, process, (self, q, inpbuff,)).start()

        try:
            # Prime the buffer with one block before starting the callbacks.
            self._receive(pipe, inpbuff)
            threading.Timer(0.001, process, (self, q, inpbuff,)).start()
            while not self._exit.is_set():
                # q.get() blocks until an item is available, so it can never
                # raise Queue.Empty; no handler is needed here.
                item = q.get()
                if item is None:
                    break
                pipe.send(item)
                nbytes = self._receive(pipe, inpbuff)
                if nbytes == -1:  # buffer's full right now
                    continue
                elif nbytes < IPCBLOCKSZ:  # we must be done
                    # flush everything the reader still produces, up to its
                    # end marker
                    item = q.get()
                    while item is not None:
                        pipe.send(item)
                        item = q.get()
                    break
        except (KeyboardInterrupt, EOFError):
            if DEBUG:
                raise
        except multiprocessing.BufferTooShort:
            raise Exception("Buffer overflow")
        finally:
            pipe.close()
            self._exited.set()
def main(argv=None):
    """Command-line entry point: ``INPUT OUTPUT [--loop]``.

    Args:
        argv: argument list without the program name; defaults to
            ``sys.argv[1:]``.
            * ``INPUT``  -- a path, or a URL when it starts with ``http``.
            * ``OUTPUT`` -- a path, ``-`` for stdout, or ``null`` to discard.
            * ``--loop`` -- optional third argument; passed to ``stream`` as
              ``loop=True``.

    Returns:
        0 after streaming finished, 2 when fewer than two arguments were
        given (a usage line is written to stderr).
    """
    if argv is None:
        argv = sys.argv[1:]
    if len(argv) < 2:
        sys.stderr.write("usage: INPUT OUTPUT [--loop]\n")
        return 2
    inpf, outf = argv[:2]
    # Always bind `loop`; previously a third argument other than '--loop'
    # left it undefined and crashed at the stream() call.
    loop = len(argv) > 2 and argv[2] == '--loop'

    if inpf.startswith('http'):
        inp_fh = urllib2.urlopen(inpf)
    else:
        inp_fh = open(inpf, 'rb')
    try:
        if outf == '-':
            out_fh = sys.stdout
        elif outf == 'null':
            out_fh = None
        else:
            out_fh = open(outf, 'wb+')
        stream(inp_fh, out_fh, loop)
    finally:
        # stream() takes care of the output handle; the input is ours.
        inp_fh.close()
    return 0
def stream(inp_fh, out_fh=None, loop=False):
    """Feed ``inp_fh`` to a ``Daemon`` consumer and collect what it echoes back.

    Args:
        inp_fh: readable binary file-like object; must support ``seek(0)``
            when ``loop`` is true.
        out_fh: writable file-like object receiving the echoed data, or
            ``None`` to discard it.  It is closed on return, except for
            ``sys.stdout`` which is only flushed.
        loop: when true, rewind the input on EOF so every block sent is a
            full ``IPCBLOCKSZ`` bytes (as long as the input is non-empty).

    Raises:
        multiprocessing.ProcessError: the consumer reported a non-zero status
            or its pipe closed without it having exited cleanly.
    """
    hostpipe, clientpipe = multiprocessing.Pipe()
    w1 = Daemon(clientpipe, name='consumer')
    w1.start()
    clientpipe.close()  # the child holds its own copy of this end

    try:
        i = 0
        t1 = time.time()
        txcount = rxcount = 0
        done = False
        while w1.is_alive():
            if not w1.buffered.is_set():
                # Consumer is waiting for data: send the next block.
                outdata = inp_fh.read(IPCBLOCKSZ)
                if len(outdata) < IPCBLOCKSZ:
                    if loop:
                        # Rewind as often as needed to fill the block; the
                        # consumer treats any short block as end-of-stream.
                        while len(outdata) < IPCBLOCKSZ:
                            inp_fh.seek(0)
                            more = inp_fh.read(IPCBLOCKSZ - len(outdata))
                            if not more:  # empty input: nothing to repeat
                                break
                            outdata += more
                        hostpipe.send_bytes(outdata)
                    elif not done:
                        # final (short) block is sent exactly once
                        hostpipe.send_bytes(outdata)
                        done = True
                else:
                    hostpipe.send_bytes(outdata)
                txcount += len(outdata)
                w1.buffered.set()
            elif hostpipe.poll():
                # Only the receive itself may signal "pipe closed"; errors
                # from writing the output must not be mistaken for that.
                try:
                    stat, indata = hostpipe.recv()
                except (IOError, EOFError):  # Processing is done, or, they crashed
                    w1.join()
                    if not w1.exited:
                        raise multiprocessing.ProcessError("Subprocess crashed")
                    break
                if out_fh is not None:
                    out_fh.write(indata)
                rxcount += len(indata)
                if stat != 0:
                    raise multiprocessing.ProcessError("Subprocess error")

            if out_fh is not sys.stdout and not DEBUG:
                sys.stdout.write("{{{:^6d}}} {:6.3f}s {:10d} bytes sent, {:10d} bytes received\r".format(i, time.time()-t1, txcount, rxcount))
                sys.stdout.flush()
            i += 1
    except KeyboardInterrupt:
        w1.exit()
        if DEBUG:
            raise
    except BaseException:
        # Any other failure: stop the consumer right away, then propagate.
        w1.terminate()
        raise
    finally:
        hostpipe.close()
        if out_fh is not None:
            if out_fh is sys.stdout:
                # stdout belongs to the interpreter; flush but leave it open
                out_fh.flush()
            else:
                out_fh.close()
        w1.join()
if __name__ == '__main__':
    # Script entry: run the CLI and use its return value as the exit status.
    exit_status = main()
    sys.exit(exit_status)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment