Created
November 7, 2016 11:52
-
-
Save luoyetx/7bb471afb02b706a6bf48bdf3d2235f5 to your computer and use it in GitHub Desktop.
A simple map-reduce helper for data processing pipelines
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import multiprocessing as mp | |
class Job(object):
    '''A data processing pipeline built from a mapper and a reducer.

    ``worker_n`` mapper processes each read from their own input queue and
    write to one shared output queue; a single reducer process reads that
    shared output queue.
    '''

    def __init__(self, name, mapper, reducer, worker_n):
        '''initialize Job object with given mapper and reducer

        Parameters
        ----------
        name: job name
        mapper: mapper function, run in every mapper process as
            ``mapper(q_in, q_out, mkws)``
        reducer: reducer function, run in the reducer process as
            ``reducer(q_out, rkws)``
        worker_n: number of mapper processes (the reducer runs in one
            additional process), must be at least 1

        Raises
        ------
        ValueError: if worker_n is smaller than 1
        '''
        # Fail early: a non-positive count would otherwise only surface in
        # start() as a ZeroDivisionError / IndexError while spreading args.
        if worker_n < 1:
            raise ValueError('worker_n must be at least 1, got %r' % (worker_n,))
        self.name = name
        self.mapper_func = mapper
        self.reducer_func = reducer
        self.worker_n = worker_n
        # Handles are filled in by start(); None means "not started yet".
        self.q_in = None
        self.q_out = None
        self.mappers = None
        self.reducer = None

    def start(self, args, mkws=None, rkws=None):
        '''start this job

        All items of ``args`` are queued before any process is started, then
        the mapper processes and the reducer process are launched.

        Parameters
        ----------
        args: arg list for mappers, spread round-robin over the mapper
            input queues
        mkws: extra args for mapper, passed through unchanged as its third
            argument
        rkws: extra args for reducer, passed through unchanged as its second
            argument
        '''
        q_in = [mp.Queue() for _ in range(self.worker_n)]
        # Round-robin: item idx goes to mapper idx % worker_n.
        # NOTE: no end-of-input marker is put on these queues here, so the
        # mapper function has to decide on its own when its queue is done.
        for idx, arg in enumerate(args):
            q_in[idx % self.worker_n].put(arg)
        # Bounded queue shared by all mappers and read by the reducer.
        q_out = mp.Queue(10240)
        mappers = [
            mp.Process(target=self.mapper_func, args=(queue, q_out, mkws))
            for queue in q_in
        ]
        reducer = mp.Process(target=self.reducer_func, args=(q_out, rkws))
        for mapper in mappers:
            mapper.start()
        reducer.start()
        self.q_in = q_in
        self.q_out = q_out
        self.mappers = mappers
        self.reducer = reducer

    def join(self, flag='finish'):
        '''join this job

        Waits for every mapper process, then puts ``[flag, []]`` on the
        output queue and waits for the reducer process.

        Parameters
        ----------
        flag: data transpose flag for q_out; presumably the reducer treats
            an item starting with this flag as its signal to stop (depends
            on the reducer function supplied by the caller)

        Raises
        ------
        RuntimeError: if the job has not been started
        '''
        if self.mappers is None:
            raise RuntimeError('Job %r has not been started' % (self.name,))
        for mapper in self.mappers:
            mapper.join()
        # Only after all mappers are done can no more data follow the flag.
        self.q_out.put([flag, []])
        self.reducer.join()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment