-
-
Save antonagestam/8476ada7d74cce93af0339cf32c62ae2 to your computer and use it in GitHub Desktop.
import asyncio | |
import random | |
from typing import TypeVar, AsyncGenerator | |
T = TypeVar("T") | |
async def read_into_queue( | |
task: AsyncGenerator[T, None], | |
queue: asyncio.Queue[T], | |
done: asyncio.Semaphore, | |
) -> None: | |
async for item in task: | |
await queue.put(item) | |
# All items from this task are in the queue, decrease semaphore by one. | |
await done.acquire() | |
async def join(*generators: AsyncGenerator[T, None]) -> AsyncGenerator[T, None]: | |
queue = asyncio.Queue(maxsize=1) | |
done_semaphore = asyncio.Semaphore(len(generators)) | |
# Read from each given generator into the shared queue. | |
produce_tasks = [ | |
asyncio.create_task(read_into_queue(task, queue, done_semaphore)) | |
for task in generators | |
] | |
# Read items off the queue until it is empty and the semaphore value is down to zero. | |
while not done_semaphore.locked() or not queue.empty(): | |
try: | |
yield await asyncio.wait_for(queue.get(), .001) | |
except TimeoutError: | |
continue | |
# Not strictly needed, but usually a good idea to await tasks, they are already finished here. | |
try: | |
await asyncio.wait_for(asyncio.gather(*produce_tasks), 0) | |
except TimeoutError: | |
raise NotImplementedError("Impossible state: expected all tasks to be exhausted") | |
# --- | |
async def produce(i) -> AsyncGenerator[int, None]: | |
for i in range(i, i + 5): | |
yield i | |
await asyncio.sleep(2 * random.random()) | |
async def consume(source: AsyncGenerator[int, None]) -> None: | |
async for item in source: | |
print(f"{item=}") | |
@asyncio.run | |
@lambda fn: fn() | |
async def main() -> None: | |
await consume( | |
join( | |
produce(10), | |
produce(20), | |
produce(30), | |
) | |
) |
@Lx I believe it became possible with this PEP, so it hasn't been legal syntax for very long: https://peps.python.org/pep-0614/
If generator can raise, you might want to:
async def put_async_generator_in_queue(
generator: typing.AsyncGenerator[T, None],
queue: asyncio.Queue[T],
done: asyncio.Semaphore,
) -> None:
try:
async for item in generator:
await queue.put(item)
finally:
await done.acquire()
Furthermore, I don't see any reason to do anything other than at the end:
await asyncio.wait_for(asyncio.gather(*produce_tasks)
Note that, despite the comment, this is not optional if you want to be able to retrieve errors from the generators.
@pelson You need to do something different for handling errors as that point in the code will never be reached, the semaphore will not reach zero.
@pelson You need to do something different for handling errors as that point in the code will never be reached, the semaphore will not reach zero.
See the finally above. This ensures that generators raising will both raise, and mark the semaphore done.
Interesting! I didn’t know that lambda syntax could be used there. Thank you!