Last active
June 26, 2021 13:53
-
-
Save santiagobasulto/10b689ba5fcadf307ffc5cd4f4ae00ec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is a proof of concept of a wait function that preserves
# the order of the futures.
#
# I'm working on a library on top of concurrent.futures
# and I needed a `wait` function that would preserve the order of the futures passed,
# so I wrote a simple `wait_in_order` function.
# Using this also makes `ThreadPoolExecutor.map`'s implementation simpler.
import concurrent.futures
from concurrent.futures._base import (
    _create_and_install_waiters, TimeoutError,
    _AcquireFutures, CANCELLED_AND_NOTIFIED, FINISHED,
    ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION)
def wait_in_order(fs, timeout=None, return_when=ALL_COMPLETED):
    """Block until the futures satisfy ``return_when``, preserving their order.

    Unlike ``concurrent.futures.wait``, which returns ``(done, not_done)``
    sets, this returns the futures themselves in the order they were given.
    It is up to the caller to inspect each future (``done()``, ``result()``)
    afterwards.

    NOTE: this relies on private helpers of ``concurrent.futures._base``
    (``_AcquireFutures``, ``_create_and_install_waiters``) and on private
    ``Future`` attributes (``_state``, ``_waiters``, ``_condition``).

    Args:
        fs: Iterable of ``Future`` objects. A one-shot iterator is
            materialised into a list; any other iterable is used as-is.
        timeout: Maximum number of seconds to wait, or ``None`` to wait
            without limit.
        return_when: One of ``FIRST_COMPLETED``, ``FIRST_EXCEPTION`` or
            ``ALL_COMPLETED``.

    Returns:
        The futures, in input order. This is the same object that was passed
        in, unless ``fs`` was a one-shot iterator, in which case it is a
        list. Some futures may still be pending if the timeout elapsed or if
        ``return_when`` was satisfied early.
    """
    # The futures are iterated several times below; a one-shot iterator
    # would be exhausted by the first pass and then look like "no futures".
    if iter(fs) is fs:
        fs = list(fs)

    done_states = {CANCELLED_AND_NOTIFIED, FINISHED}

    with _AcquireFutures(fs):
        # Fast paths: the requested condition may already hold.
        if return_when == FIRST_COMPLETED:
            if any(f._state in done_states for f in fs):
                return fs
        elif return_when == FIRST_EXCEPTION:
            # Only query exception() on futures that are done and not
            # cancelled, so the call can neither block nor raise.
            if any(f.exception() is not None
                   for f in fs
                   if not f.cancelled() and f.done()):
                return fs

        # Whatever the mode, there is nothing to wait for if all are done.
        if all(f._state in done_states for f in fs):
            return fs

        # Install the waiter while the futures are still locked so that no
        # completion can slip in between the checks above and registration.
        waiter = _create_and_install_waiters(fs, return_when)

    try:
        waiter.event.wait(timeout)
    finally:
        # Always detach the waiter, even if the wait was interrupted.
        for f in fs:
            with f._condition:
                f._waiters.remove(waiter)

    return fs
class NewThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
    """ThreadPoolExecutor whose ``map`` is implemented with ``wait_in_order``."""

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """Apply ``fn`` to the zipped ``iterables`` and yield results in order.

        All calls are submitted up front, then this method blocks until
        every one has completed or ``timeout`` elapses.

        Args:
            fn: Callable invoked once per tuple produced by
                ``zip(*iterables)``.
            *iterables: Argument iterables, consumed in lockstep.
            timeout: Maximum number of seconds to wait for all calls, or
                ``None`` to wait without limit.
            chunksize: Accepted for signature compatibility; not used by
                this implementation.

        Returns:
            A generator yielding each call's result in submission order.
            An exception raised by ``fn`` is re-raised when the
            corresponding result is retrieved from the generator.

        Raises:
            TimeoutError: If not every call finished within ``timeout``.
        """
        fs = wait_in_order(
            [self.submit(fn, *args) for args in zip(*iterables)],
            timeout=timeout, return_when=ALL_COMPLETED)
        if not all(f.done() for f in fs):
            # The caller gets no handle on the futures, so cancel the ones
            # that have not started rather than leaving them queued.
            for f in fs:
                f.cancel()
            raise TimeoutError()
        return (f.result() for f in fs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment