How to partially consume a generator during multiprocessing.
'''
This file demonstrates how to prevent a generator from being fully consumed by
multiprocessing. It avoids using multiprocessing.map() in favor of a single
queue with a max size from which processes pull elements to work on. As elements
are being processed, a loop fills the queue with elements from the generator
until it reaches its max size, at which point it blocks until elements are
retrieved from the queue by the processes.
'''
from multiprocessing import Process, JoinableQueue
import os
import time

print('Main {}'.format(os.getpid()))


def worker(q):
    """The process will continually pull elements from the shared queue
    to process until reaching a None sentinel.
    """
    pid = os.getpid()
    print(pid)
    while True:
        print("{} getting next element".format(pid))
        current_elt = q.get(timeout=5)
        if current_elt is None:
            q.task_done()
            break
        print("{} - Starting to process elt: {}".format(pid, current_elt))
        time.sleep(2)
        print("{} - Finished processing elt: {}".format(pid, current_elt))
        q.task_done()


# Create a Queue or JoinableQueue from multiprocessing.
# There is no direct multiprocessing analog for the thread-safe ``Queue.Queue``.
# If you want to join the queue so that it blocks until all tasks are done, use multiprocessing.JoinableQueue.
# If you don't want blocking behavior, use multiprocessing.Queue.
# https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue
# https://docs.python.org/2/library/multiprocessing.html#multiprocessing.JoinableQueue
queue_max_size = 8
q = JoinableQueue(maxsize=queue_max_size)

# Create processes.
processes = []
num_of_processes = 5
for i in xrange(num_of_processes):
    p = Process(target=worker, args=(q,))
    p.start()
    processes.append(p)

# Get some generator.
g = xrange(50)  # Yes, I still use Python 2.
for elt in g:
    print("Adding {}".format(elt))
    # Because there is a max size and block=True, this put will block until a
    # slot opens up in the queue. This prevents the generator from being exhausted
    # and only keeps ``queue_max_size`` elements from the generator in memory at once
    # (plus the elements currently being processed).
    q.put(elt, block=True)

# Add sentinels to shut down the processes.
print('Adding sentinels...')
for _ in xrange(num_of_processes):
    q.put(None)

q.join()  # Block until all elements have been processed.
print("All done!")