Skip to content

Instantly share code, notes, and snippets.

@domenukk
Created January 3, 2019 20:19

Revisions

  1. domenukk created this gist Jan 3, 2019.
    74 changes: 74 additions & 0 deletions py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,74 @@
    #!/bin/env python
    """
    A simple example calling any functon in the foreground from background threads, including a response, using Queues.
    """
    from multiprocessing import Queue, Process, cpu_count
    from collections import namedtuple
    import traceback

    Event = namedtuple("Event", "thread_id function args kwargs")



    def done(thread_id, request_queue, response_queue):
    request_queue.put(Event(thread_id, "done", [], {}))


    def call_from_bg(thread_id, request_queue, response_queue, func, *args, **kwargs):
    request_queue.put(Event(thread_id, func, args, kwargs))
    return response_queue.get()


    def background_thread(thread_id, *queues):
    try:
    def call(func, *args, **kwargs):
    return call_from_bg(thread_id, *queues, func, *args, **kwargs)

    for i in range(1000):
    print("{}: calling on main".format(thread_id))
    resp = call("on_main", "{}: Hello from the foreground :) ({})".format(thread_id, i))
    print("{}: Foreground responded: {}".format(thread_id, resp))
    except Exception as ex:
    print("An exception occurred: {}".format(ex))
    traceback.print_exc()
    finally:
    done(thread_id, *queues)


    # Example foo
    counter = 0


    def on_main(*args):
    global counter
    counter += 1
    print(*args)
    return "Already counted to: {}".format(counter)


    def main_thread(n_threads):
    request_queue = Queue()
    response_queues = []

    threads = []
    done = []
    for i in range(n_threads):
    response_queue = Queue()
    response_queues.append(response_queue)
    threads.append(Process(target=background_thread, args=[i, request_queue, response_queue]))
    threads[i].start()
    while not len(done) == n_threads:
    event = request_queue.get()
    if event.function == "done":
    done.append(threads[event.thread_id])
    else:
    try:
    response_queues[event.thread_id].put(globals()[event.function](*event.args, **event.kwargs))
    except Exception as ex:
    response_queues[event.thread_id].put(ex)
    for p in done:
    p.join()


    if __name__ == '__main__':
    main_thread(cpu_count() * 2)