Skip to content

Instantly share code, notes, and snippets.

@jmfrank63
Created August 6, 2018 15:05
Show Gist options
  • Save jmfrank63/5fb9909a8e06c91dead9265cab2f33de to your computer and use it in GitHub Desktop.
Save jmfrank63/5fb9909a8e06c91dead9265cab2f33de to your computer and use it in GitHub Desktop.
Async deque in python
# -*- coding: utf-8 -*-
class AsyncDeque(deque):
def __init__(self, elements):
super().__init__(elements)
def __aiter__(self):
return self
async def __anext__(self):
if not self:
raise StopAsyncIteration
element = self.popleft()
return element
@glacial-ai
Copy link

Very cool! If you're interested in allowing consumers to wait while the queue is empty, and/or if you want context manager support, then try this:

import asyncio
from collections import deque

class AsyncDeque(deque):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._not_empty = asyncio.Condition()  # For signaling when items are added
        self._stopped = False  # To indicate when the deque is stopped

    def __aiter__(self):
        return self

    async def __anext__(self):
        async with self._not_empty:
            while not self and not self._stopped:
                await self._not_empty.wait()  # Wait until an item is added or stopped
            if self._stopped and not self:
                raise StopAsyncIteration
            return self.popleft()

    async def put(self, item):
        """Add an item to the deque and notify waiting consumers."""
        async with self._not_empty:
            self.append(item)
            self._not_empty.notify()

    async def stop(self):
        """Stop all waiting consumers by notifying them."""
        async with self._not_empty:
            self._stopped = True
            self._not_empty.notify_all()

    async def __aenter__(self):
        """Enter context, returning the deque."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit context, ensuring proper cleanup."""
        await self.stop()

Here's an example of how to use it:

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)  # Simulate work
        await queue.put(i)
        print(f"Produced: {i}")

async def consumer(queue):
    async for item in queue:
        print(f"Consumed: {item}")

async def main():
    async with AsyncDeque() as queue:
        await asyncio.gather(producer(queue), consumer(queue))

asyncio.run(main())

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment