import asyncio
import atexit
import functools
import inspect
import threading
from concurrent.futures import wait
from typing import Optional


class TaskRunner:
    """A singleton task runner that runs an asyncio event loop on a background thread.

    The loop and its thread are created lazily on the first call to
    :meth:`run`. The thread is a daemon thread, and an ``atexit`` hook asks the
    loop to stop when the interpreter shuts down.
    """

    __instance = None

    @staticmethod
    def getInstance() -> "TaskRunner":
        """Return the process-wide runner, creating it on first use."""
        if TaskRunner.__instance is None:
            # The constructor registers itself as the singleton.
            TaskRunner()
        assert TaskRunner.__instance is not None
        return TaskRunner.__instance

    def __init__(self):
        """Register this object as the singleton.

        :Raises:
         - `Exception`: if a runner has already been created.
        """
        if TaskRunner.__instance is not None:
            raise Exception("This class is a singleton!")
        TaskRunner.__instance = self
        self.__io_loop: Optional[asyncio.AbstractEventLoop] = None
        self.__runner_thread: Optional[threading.Thread] = None
        # Guards the lazy creation of the loop and its thread in run().
        self.__lock = threading.Lock()
        atexit.register(self._close)

    def _close(self) -> None:
        """Ask the background event loop to stop.

        This is called from a thread other than the one running the loop, so
        the stop request is handed over with ``call_soon_threadsafe``. Calling
        ``loop.stop()`` directly from here would not wake a loop that is
        blocked waiting for I/O, and the loop would never be closed.
        """
        loop = self.__io_loop
        if loop is None or loop.is_closed():
            return
        try:
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError:
            # The loop was closed between the check above and the call;
            # there is nothing left to stop.
            pass

    def _runner(self) -> None:
        """Thread target: run the loop until stopped, then close it."""
        loop = self.__io_loop
        assert loop is not None
        try:
            loop.run_forever()
        finally:
            loop.close()

    def run(self, coro):
        """Synchronously run a coroutine on a background thread.

        :Parameters:
         - `coro`: The coroutine object to execute on the background loop.

        Returns the coroutine's result; an exception raised by the coroutine
        is re-raised in the calling thread. The caller blocks until the
        coroutine finishes.
        """
        with self.__lock:
            if self.__io_loop is None:
                self.__io_loop = asyncio.new_event_loop()
                self.__runner_thread = threading.Thread(
                    target=self._runner, daemon=True
                )
                self.__runner_thread.start()
            # Capture the loop while the lock is held so the submission below
            # does not depend on a second, unguarded attribute read.
            loop = self.__io_loop
        fut = asyncio.run_coroutine_threadsafe(coro, loop)
        wait([fut])
        return fut.result()


def synchronize_method(async_method, doc=None):
    """Decorate `async_method` so it runs synchronously.

    The returned function calls `async_method` to obtain a coroutine and runs
    it to completion on the shared :class:`TaskRunner` event loop, blocking
    the caller until the result is available.

    :Parameters:
     - `async_method`: Unbound method of pymongo Collection, Database,
       MongoClient, etc.
     - `doc`: Optionally override async_method's docstring
    """

    @functools.wraps(async_method)
    def method(self, *args, **kwargs):
        runner = TaskRunner.getInstance()
        coro = async_method(self, *args, **kwargs)
        return runner.run(coro)

    # This is for the benefit of generating documentation with Sphinx.
    method.is_sync_method = True  # type: ignore[attr-defined]

    if doc is not None:
        method.__doc__ = doc

    return method


def synchronize_class(kls):
    """Create a synchronous class from an asynchronous one.

    The new class subclasses `kls`, is named after it with every "Async"
    removed, and has ``_async_marker`` set to ``False``. Each attribute whose
    name does not start with an underscore and which is a coroutine function
    is replaced by a blocking wrapper built with :func:`synchronize_method`.

    :Parameters:
     - `kls`: The asynchronous class to wrap.
    """
    sync_name = kls.__name__.replace("Async", "")
    subclass = type(sync_name, (kls,), {})
    subclass._async_marker = False
    for attr_name in dir(subclass):
        # Private and dunder attributes are left untouched.
        if attr_name.startswith('_'):
            continue
        attr = getattr(subclass, attr_name)
        if inspect.iscoroutinefunction(attr):
            setattr(subclass, attr_name, synchronize_method(attr))
    return subclass


class AsyncMongoClient:
    """Demo asynchronous client used to exercise `synchronize_class`."""

    _async_marker = True

    async def find_one(self):
        """Return a new cursor matching this client's sync/async flavour."""
        # `Cursor` is resolved at call time; it is defined further down.
        kls = AsyncCursor if self._async_marker else Cursor
        return kls()


class AsyncCursor:
    """Demo cursor that is always empty, iterable both sync and async."""

    _async_marker = True

    def __iter__(self):
        return self

    def __next__(self):
        raise StopIteration

    def __aiter__(self):
        # Must be a plain method: `async for` expects __aiter__ to return the
        # asynchronous iterator itself, not a coroutine that resolves to it.
        return self

    async def __anext__(self):
        raise StopAsyncIteration

    async def fetch_something(self):
        """Return the fixed demo value ``'foo'``."""
        return 'foo'


MongoClient = synchronize_class(AsyncMongoClient)
Cursor = synchronize_class(AsyncCursor)

client = MongoClient()
print(client)
cursor = client.find_one()
print(cursor.fetch_something())
for item in cursor:
    print(item)
print(cursor)
print(str(cursor))