import functools
import logging
import queue
import shelve
import threading
import time
from multiprocessing.pool import ThreadPool

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s:[%(threadName)s] %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class Pipeline(object):
    """Chain tasks into stages, each stage backed by its own thread pool.

    Items enter stage 0; every stage's result is handed to the next stage
    from the pool's async callback. In-flight items are journalled to a
    shelve file so an interrupted run can be resumed.
    """

    def __init__(self, tasks, processes=1, state="o.state"):
        self.tasks = tasks
        self.pools = [ThreadPool(processes) for _ in tasks]
        self.out = queue.Queue()
        self.state_lock = threading.Lock()
        self.state = shelve.open(state) if state else None

    def resume(self):
        """Re-submit every item that was journalled but never marked done."""
        # Snapshot and clear the journal before executing anything:
        # execute() -> state_log() re-acquires state_lock, which would
        # deadlock if we still held it here, and it re-journals each item.
        with self.state_lock:
            pending = {key: list(items) for key, items in self.state.items()}
            self.state.clear()
        for key, items in pending.items():
            for item in items:
                self.execute(int(key), item)

    def state_log(self, i, item):
        """Record that `item` is in flight at stage `i`."""
        if self.state is None:
            return
        key = str(i)
        with self.state_lock:
            self.state[key] = self.state.get(key, []) + [item]

    def state_done(self, i, item):
        """Remove `item` from stage `i`'s journal once it has completed."""
        if self.state is None:
            return
        key = str(i)
        with self.state_lock:
            items = self.state.get(key, [])
            items.remove(item)
            self.state[key] = items

    def run(self, items, wait=True):
        for item in items:  # a plain loop; map() is lazy in Python 3
            self.execute(0, item)
        if wait:
            self.wait()
        return self.out

    def wait(self):
        # Close and join the pools in stage order: joining stage i
        # guarantees its callbacks have fed everything into stage i + 1.
        for i, pool in enumerate(self.pools):
            logger.debug("waiting for pool %d: %s", i, pool)
            pool.close()
            pool.join()

    def _on_result(self, i, item, result):
        # Runs in pool i's result-handler thread.
        self.state_done(i, item)
        if i == len(self.tasks) - 1:
            self.out.put(result)  # last stage: emit the result
        else:
            self.execute(i + 1, result)  # feed the next stage

    def execute(self, i, item):
        self.state_log(i, item)
        self.pools[i].apply_async(
            self.tasks[i], (item,),
            callback=functools.partial(self._on_result, i, item))


def sleep(item):
    logger.debug("processing %s", item)
    time.sleep(1)
    return item + "," + str(time.time())


def test():
    items = "abcdefg"
    pipe = Pipeline([sleep, sleep, sleep, sleep])
    out = pipe.run(items)
    print(list(out.queue))


if __name__ == "__main__":
    test()
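
# --- Usage sketch: resuming after an interruption --------------------------
# A minimal sketch, assuming the "o.state" shelve file from an interrupted
# run is still on disk. The new Pipeline must be built with the same task
# list so the stage indices in the journal line up with self.tasks.
# `resume_after_crash` is a hypothetical helper, not part of the class.
def resume_after_crash():
    pipe = Pipeline([sleep, sleep, sleep, sleep], state="o.state")
    pipe.resume()                  # re-submits journalled, unfinished items
    pipe.wait()                    # drain every stage's pool
    print(list(pipe.out.queue))    # results of the resumed items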