@antonagestam
Last active April 5, 2025 03:48
Merge results from multiple async generators into a single stream.
import asyncio
import random
from typing import TypeVar, AsyncGenerator

T = TypeVar("T")


async def read_into_queue(
    task: AsyncGenerator[T, None],
    queue: asyncio.Queue[T],
    done: asyncio.Semaphore,
) -> None:
    async for item in task:
        await queue.put(item)
    # All items from this task are in the queue, decrease semaphore by one.
    await done.acquire()


async def join(*generators: AsyncGenerator[T, None]) -> AsyncGenerator[T, None]:
    queue: asyncio.Queue[T] = asyncio.Queue(maxsize=1)
    done_semaphore = asyncio.Semaphore(len(generators))

    # Read from each given generator into the shared queue.
    produce_tasks = [
        asyncio.create_task(read_into_queue(task, queue, done_semaphore))
        for task in generators
    ]

    # Read items off the queue until it is empty and the semaphore value is down to zero.
    while not done_semaphore.locked() or not queue.empty():
        try:
            yield await asyncio.wait_for(queue.get(), .001)
        except TimeoutError:
            continue

    # Not strictly needed, but usually a good idea to await tasks; they are already finished here.
    try:
        await asyncio.wait_for(asyncio.gather(*produce_tasks), 0)
    except TimeoutError:
        raise NotImplementedError("Impossible state: expected all tasks to be exhausted")


# ---
async def produce(i: int) -> AsyncGenerator[int, None]:
    for i in range(i, i + 5):
        yield i
        await asyncio.sleep(2 * random.random())


async def consume(source: AsyncGenerator[int, None]) -> None:
    async for item in source:
        print(f"{item=}")


@asyncio.run
@lambda fn: fn()
async def main() -> None:
    await consume(
        join(
            produce(10),
            produce(20),
            produce(30),
        )
    )
@Lx

Lx commented Mar 9, 2024

I’m keen to learn more about the @asyncio.run and @lambda decorators—there are no obvious Google results for these.

@antonagestam
Author

@Lx That's just my lazy way of executing the main function; it's exactly equivalent to:

async def main():
    ...

main = main()  # @lambda fn: fn()
main = asyncio.run(main)  # @asyncio.run

Nothing magic, it's just abusing how decorators work.
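For illustration, here is a minimal sketch of the same trick on a regular function (the name `answer` is hypothetical):

```python
# Since PEP 614, any expression can be used as a decorator. Here the
# lambda immediately calls the function it decorates, so the name
# `answer` ends up bound to the return value, not to the function.
@lambda fn: fn()
def answer() -> int:
    return 42

print(answer)  # → 42
```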

@Lx

Lx commented Mar 10, 2024

Interesting! I didn’t know that lambda syntax could be used there. Thank you!

@antonagestam
Author

@Lx I believe it became possible with this PEP, so it hasn't been legal syntax for very long: https://peps.python.org/pep-0614/

@pelson

pelson commented Apr 4, 2025

If a generator can raise, you might want to:

async def put_async_generator_in_queue(
    generator: typing.AsyncGenerator[T, None],
    queue: asyncio.Queue[T],
    done: asyncio.Semaphore,
) ->  None:
    try:
        async for item in generator:
            await queue.put(item)
    finally:
        await done.acquire()

Furthermore, I don't see any reason to do anything other than, at the end:

    await asyncio.gather(*produce_tasks)

Note that, despite the comment, this is not optional if you want to be able to retrieve errors from the generators.
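A runnable sketch of this point, assuming the `finally`-based reader above and hypothetical `ok`/`broken` generators: the exception raised inside `broken` still drains the semaphore, so the consumer loop exits and the final `gather` re-raises it.

```python
import asyncio
from typing import AsyncGenerator, TypeVar

T = TypeVar("T")


async def read_into_queue(
    generator: AsyncGenerator[T, None],
    queue: asyncio.Queue[T],
    done: asyncio.Semaphore,
) -> None:
    try:
        async for item in generator:
            await queue.put(item)
    finally:
        # Runs even if the generator raises, so the semaphore still drains.
        await done.acquire()


async def join(*generators: AsyncGenerator[T, None]) -> AsyncGenerator[T, None]:
    queue: asyncio.Queue[T] = asyncio.Queue(maxsize=1)
    done = asyncio.Semaphore(len(generators))
    tasks = [
        asyncio.create_task(read_into_queue(g, queue, done)) for g in generators
    ]
    while not done.locked() or not queue.empty():
        try:
            yield await asyncio.wait_for(queue.get(), 0.001)
        except TimeoutError:
            continue
    # Re-raises the first exception from any generator task.
    await asyncio.gather(*tasks)


async def ok() -> AsyncGenerator[int, None]:
    yield 1


async def broken() -> AsyncGenerator[int, None]:
    yield 2
    raise ValueError("boom")


async def main() -> None:
    try:
        async for item in join(ok(), broken()):
            print(item)
    except ValueError as exc:
        print(f"caught: {exc}")


asyncio.run(main())
```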

@antonagestam
Author

@pelson You need to do something different for handling errors, as that point in the code will never be reached: the semaphore will not reach zero.

@pelson

pelson commented Apr 5, 2025

> @pelson You need to do something different for handling errors, as that point in the code will never be reached: the semaphore will not reach zero.

See the finally above. This ensures that generators raising will both raise, and mark the semaphore done.
