@MattPitlyk
Last active March 13, 2017 04:44
How to partially consume a generator during multiprocessing.
'''
This file demonstrates how to prevent a generator from being fully consumed by
multiprocessing. It avoids multiprocessing.Pool.map() in favor of a single
queue with a max size from which processes pull elements to work on. As elements
are being processed, a loop fills the queue with elements from the generator
until it reaches its max size, at which point it blocks until elements are
retrieved from the queue by the processes.
'''
from multiprocessing import Process, JoinableQueue
import os
import time
print('Main {}'.format(os.getpid()))
def worker(q):
    """The process will continually pull elements from the shared queue
    to process until reaching a None sentinel.
    """
    pid = os.getpid()
    print(pid)
    while True:
        print("{} getting next element".format(pid))
        # get() raises Queue.Empty if nothing arrives within 5 seconds,
        # so a stuck worker fails loudly instead of hanging forever.
        current_elt = q.get(timeout=5)
        if current_elt is None:
            q.task_done()
            break
        print("{} - Starting to process elt: {}".format(pid, current_elt))
        time.sleep(2)  # Simulate a unit of work.
        print("{} - Finished processing elt: {}".format(pid, current_elt))
        q.task_done()
# Create a Queue or JoinableQueue from multiprocessing.
# Unlike the thread-safe Queue.Queue, a plain multiprocessing.Queue has no
# task_done()/join() methods. If you want to join the queue so that it blocks
# until all tasks are done, use multiprocessing.JoinableQueue.
# If you don't need that blocking behavior, use multiprocessing.Queue.
# https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Queue
# https://docs.python.org/2/library/multiprocessing.html#multiprocessing.JoinableQueue
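# Note: if you used a plain multiprocessing.Queue here instead, you would wait
# for completion by joining the worker processes themselves, e.g.:
#     for p in processes:
#         p.join()
# JoinableQueue lets us block on the queue itself via q.join() below.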
queue_max_size = 8
q = JoinableQueue(maxsize=queue_max_size)
# Create processes
processes = []
num_of_processes = 5
for i in xrange(num_of_processes):
    p = Process(target=worker, args=(q,))
    p.start()
    processes.append(p)
# Get some generator
g = xrange(50) # Yes, I still use Python 2.
for elt in g:
    print("Adding {}".format(elt))
    # Because there is a max size and block=True, this put will block until a
    # slot opens up in the queue. This prevents the generator from being exhausted
    # and only keeps ``queue_max_size`` elements from the generator in memory at once
    # (plus the elements currently being processed).
    q.put(elt, block=True)
# Add sentinels to shut down processes.
print('Adding sentinels...')
for _ in xrange(num_of_processes):
    q.put(None)
q.join() # Block until all elements have been processed.
print("All done!")