import threading
import Queue
import shelve
from multiprocessing.pool import ThreadPool
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s:[%(threadName)s] %('
                                                'levelname)s - %(message)s')

logger = logging.getLogger(__name__)

class Pipeline(object):
  state_lock = threading.Lock()

  def __init__(self, tasks, processes=1, maxsize=1, state="o.state"):
    n = len(tasks)
    self.tasks = tasks
    self.pools = [ThreadPool(processes) for _ in range(n)]
    self.out = Queue.Queue()
    if state:
      self.state = shelve.open(state)

  def resume(self):
    with self.state_lock:
      for key, items in self.state.items():
        for item in items:
          self.execute(self.tasks[int(key)], item)

  def state_log(self, i, item):
    key = str(i)
    with self.state_lock:
      items = self.state.get(key, [])
      self.state[key] = items + [item]

  def state_done(self, i, item):
    key = str(i)
    with self.state_lock:
      items = self.state.get(key, [])
      items.remove(item)
      self.state[key] = items

  def run(self, items, wait=True):
    map(lambda item: self.execute(0, item), items)
    if wait:
      self.wait()
    return self.out

  def wait(self):
      for i, pool in enumerate(self.pools):
        print "waiting for pool", i, pool
        pool.close()
        pool.join()

  def execute(self, i, item):
    task = self.tasks[i]
    if i == len(self.tasks) - 1:
      # last, put result to out
      callback = lambda item_: (self.state_done(i, item), self.out.put(item_))
    else:
      callback = lambda item_: (self.state_done(i, item), self.execute(i + 1,
                                                                     item_))
    pool = self.pools[i]
    self.state_log(i, item)
    pool.apply_async(task, (item,), callback=callback)

import time

def sleep(item):
  logger.debug("processing %s", item)
  time.sleep(1)
  return item + "," + str(time.time())

def test():
  items = "abcdefg"
  pipe = Pipeline([sleep, sleep, sleep, sleep])
  out = pipe.run(items)
  print out.queue

test()