Created
April 9, 2012 01:48
-
-
Save johnfink8/2340771 to your computer and use it in GitHub Desktop.
Process a list of inputs with a given function, using multiple parallel processes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
ItemProcessor
Written by: John Fink

Takes a list of objects, a function, and some optional arguments to
that function. Runs that function (with those optional arguments) against
each element of the input list, in parallel. The results of the function
calls can be retrieved as a list of (input, result) tuples with .result(),
which by default also joins and blocks until the entire list is finished.
Because the workers run in parallel, the tuples may come back in any order.

Sample usage:

    tasks = [1,2,3,4,5,6]
    def test(num,val):
        return num > val
    print(itemprocessor.ItemProcessor(tasks).start(test,3).result())

    '[(1, False), (2, False), (3, False), (4, True), (5, True), (6, True)]'
'''
import multiprocessing
import traceback

try:
    from queue import Empty  # Python 3
except ImportError:
    from Queue import Empty  # Python 2
class ItemProcessor(object):
    """Apply a function to every item of a list using parallel processes.

    All tasks are placed on a manager-backed queue when the instance is
    created. ``start()`` launches the worker processes and ``result()``
    returns the collected ``(task, return_value)`` tuples. Workers append
    results as they finish, so the output order is not guaranteed to match
    the input order.
    """

    def __init__(self, task_list, process_count=10):
        """Queue every item of *task_list* for later processing.

        task_list     -- iterable of inputs to hand to the worker function.
        process_count -- number of worker processes ``start()`` launches.
        """
        self.m = multiprocessing.Manager()
        self.tasks = self.m.Queue()
        self.results = self.m.list()
        for task in task_list:
            self.tasks.put(task)
        self.process_count = process_count
        # Processes launched by start(); kept so join() can reap them.
        self._workers = []

    def join(self):
        """Block until every queued task is marked done, then reap workers.

        Returns whatever the task queue's ``join()`` returns.
        """
        outcome = self.tasks.join()
        # Once every task is done the workers see an empty queue and exit,
        # so joining them here only collects already-finishing processes.
        for worker in self._workers:
            worker.join()
        self._workers = []
        return outcome

    @staticmethod
    def _drain(tasks, results, func, args, kwargs):
        """Worker loop: process tasks until the queue is empty.

        Runs in a child process. It takes the queue and list proxies
        explicitly (rather than ``self``) so the process target does not
        require pickling the whole ItemProcessor and its Manager.

        A task whose call raises (or whose result cannot be stored) has its
        traceback printed and contributes no entry to *results*; it is still
        marked done so that ``join()`` cannot block forever.
        """
        while True:
            try:
                task = tasks.get_nowait()
            except Empty:
                # Nothing left to do: this worker is finished.
                return
            try:
                results.append((task, func(task, *args, **kwargs)))
            except Exception:
                traceback.print_exc()
            finally:
                tasks.task_done()

    def _worker_process(self, *args, **kwargs):
        """Run the worker loop in the current process.

        Kept for backward compatibility. The function to apply is passed as
        the ``func`` keyword argument (KeyError if absent); the remaining
        positional and keyword arguments are forwarded to it after the task.
        """
        func = kwargs.pop('func')
        self._drain(self.tasks, self.results, func, args, kwargs)

    def start(self, func, *args, **kwargs):
        """Launch ``process_count`` workers calling ``func(task, *args, **kwargs)``.

        Returns ``self`` so calls can be chained, e.g.
        ``ItemProcessor(tasks).start(f).result()``.
        """
        for _ in range(self.process_count):
            worker = multiprocessing.Process(
                target=self._drain,
                args=(self.tasks, self.results, func, args, kwargs),
            )
            worker.start()
            self._workers.append(worker)
        return self

    def result(self, join=True):
        """Return the collected ``(task, return_value)`` tuples as a list.

        join -- when true (the default), block until all tasks are finished
                before reading the results; otherwise return whatever has
                been collected so far.
        """
        if join:
            self.join()
        return list(self.results)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment