Skip to content

Instantly share code, notes, and snippets.

@luoyetx
Created November 7, 2016 11:52
Show Gist options
  • Save luoyetx/7bb471afb02b706a6bf48bdf3d2235f5 to your computer and use it in GitHub Desktop.
Save luoyetx/7bb471afb02b706a6bf48bdf3d2235f5 to your computer and use it in GitHub Desktop.
A simple map-reduce for data processing pipelines.
import multiprocessing as mp
class Job(object):
    '''A data processing pipeline made of parallel mappers and one reducer.

    ``start`` spreads the inputs round-robin over one input queue per mapper,
    then launches ``worker_n`` mapper processes plus a single reducer process
    that all share one output queue. ``join`` waits for the mappers, puts an
    end-of-data marker on the output queue and waits for the reducer.
    '''

    def __init__(self, name, mapper, reducer, worker_n):
        '''Initialize the job with the given mapper and reducer.

        Parameters
        ----------
        name: job name
        mapper: mapper function, run in a process as ``mapper(q_in, q_out, mkws)``
        reducer: reducer function, run in a process as ``reducer(q_out, rkws)``
        worker_n: number of mapper processes (one more process runs the
            reducer); with 0 and a non-empty input list ``start`` fails with
            ZeroDivisionError
        '''
        self.name = name
        self.mapper_func = mapper
        self.reducer_func = reducer
        self.worker_n = worker_n
        # Runtime state is filled in by start(). Defining it here keeps the
        # object well-formed so join() can detect a job that never started
        # instead of failing on a missing attribute.
        self.q_in = []
        self.q_out = None
        self.mappers = []
        self.reducer = None

    def start(self, args, mkws=None, rkws=None):
        '''Start the mapper and reducer processes of this job.

        Parameters
        ----------
        args: iterable of inputs; item ``i`` goes to the input queue of
            mapper ``i % worker_n``
        mkws: extra argument handed unchanged to every mapper
        rkws: extra argument handed unchanged to the reducer
        '''
        worker_n = self.worker_n
        q_in = [mp.Queue() for _ in range(worker_n)]
        # All inputs are queued before any mapper starts; the input queues
        # are unbounded, so these puts do not block.
        for idx, arg in enumerate(args):
            q_in[idx % worker_n].put(arg)
        # Shared by every mapper and the reducer; capped at 10240 items.
        q_out = mp.Queue(10240)
        mappers = [
            mp.Process(target=self.mapper_func, args=(queue, q_out, mkws))
            for queue in q_in
        ]
        reducer = mp.Process(target=self.reducer_func, args=(q_out, rkws))
        for mapper in mappers:
            mapper.start()
        reducer.start()
        self.q_in = q_in
        self.q_out = q_out
        self.mappers = mappers
        self.reducer = reducer

    def join(self, flag='finish'):
        '''Wait for the mappers, signal end of data, then wait for the reducer.

        Parameters
        ----------
        flag: marker put on q_out as ``[flag, []]`` once every mapper has
            exited (presumably the reducer stops when it reads it; the
            reducer's handling is not visible here)

        Raises
        ------
        RuntimeError: if the job has not been started
        '''
        if self.reducer is None:
            raise RuntimeError('cannot join a job before it is started')
        for mapper in self.mappers:
            mapper.join()
        # Sent only after all mappers are done, so the marker is the last
        # item placed on the output queue.
        self.q_out.put([flag, []])
        self.reducer.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment