Last active
July 18, 2024 01:28
-
-
Save autumnjolitz/d30c2ff23e77062385321309b8912808 to your computer and use it in GitHub Desktop.
console supports for lack of top level await in non-await terminal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import atexit | |
import concurrent.futures | |
import datetime | |
import functools | |
import inspect | |
import json | |
import logging | |
import readline | |
import rlcompleter | |
import sys | |
import threading | |
import weakref | |
from contextlib import suppress | |
from typing import Optional, Union, Dict, Tuple, List, TYPE_CHECKING, TypeVar, Set, Any | |
if TYPE_CHECKING: | |
from typing import Self, Type, Awaitable, Coroutine, Callable | |
rlcompleter | |
readline.parse_and_bind("tab: complete") | |
with suppress(ImportError): | |
from dateutil.relativedelta import relativedelta | |
print("relativedelta imported.") | |
relativedelta | |
with suppress(ImportError): | |
print("boto3 imported.") | |
import boto3 | |
boto3 | |
json | |
datetime | |
Optional | |
Union | |
Dict | |
Tuple | |
List | |
logger = logging.getLogger(__name__) | |
logging.basicConfig(stream=sys.stderr) | |
loop: "ThreadedEventLoop | asyncio.AbstractEventLoop" | |
try: | |
loop = asyncio.get_running_loop() | |
except RuntimeError: | |
async def _create_and_start_task( | |
coro: "Coroutine[Any, Any, T]", | |
*, | |
task_set: Optional[Set[asyncio.Task]] = None, | |
**kwargs, | |
) -> "asyncio.Task[T]": | |
assert not isinstance(coro, asyncio.Task) | |
loop = asyncio.get_running_loop() | |
task = loop.create_task(coro, **kwargs) | |
shielded = asyncio.shield(task) | |
with suppress(asyncio.TimeoutError): | |
await asyncio.wait_for(shielded, timeout=0) | |
if task_set is not None: | |
task_set.add(task) | |
task.add_done_callback(task_set.discard) | |
return task | |
T = TypeVar("T") | |
def await_(awaitable: "Awaitable[T]") -> "T": | |
""" | |
>>> assert loop | |
>>> await_(asyncio.sleep(1)) | |
""" | |
return loop.run_until_complete(awaitable) | |
print("defining await_(...) function to substitute for top level await.") | |
def wrap_run_coroutine_threadsafe(func): | |
@functools.wraps(func) | |
def run_coroutine_threadsafe(coro, loop): | |
if isinstance(loop, ThreadedEventLoop): | |
loop = loop.loop | |
return func(coro, loop) | |
return run_coroutine_threadsafe | |
asyncio.run_coroutine_threadsafe = wrap_run_coroutine_threadsafe( | |
asyncio.run_coroutine_threadsafe | |
) | |
del wrap_run_coroutine_threadsafe | |
class ThreadedEventLoop: | |
__slots__ = ( | |
"loop", | |
"_wait_for_shutdown", | |
"_status", | |
"thread", | |
"main_task", | |
"_lock", | |
"_event_loop_result", | |
"_tasks", | |
) | |
main_task: "weakref.ReferenceType[asyncio.Task]" | |
_tasks: Set[asyncio.Task] | |
def __init__( | |
self, | |
thread: threading.Thread, | |
loop: asyncio.AbstractEventLoop, | |
wait_for_shutdown_event: asyncio.Event, | |
main_task: asyncio.Task, | |
result: concurrent.futures.Future, | |
): | |
assert isinstance(loop, asyncio.AbstractEventLoop) and loop.is_running() | |
assert isinstance(wait_for_shutdown_event, asyncio.Event) | |
self.loop = loop | |
self._wait_for_shutdown = wait_for_shutdown_event | |
self._status = ("running",) | |
if thread is None: | |
thread = threading.current_thread() | |
self.thread = thread | |
self._lock = threading.RLock() | |
self.main_task = weakref.ref(main_task) | |
self._event_loop_result = result | |
self._tasks = set() | |
@property | |
def status(self) -> "tuple[str, ...]": | |
return self._status | |
def __getattr__(self, name: str): | |
if name in ("result", "exception"): | |
return getattr(self._event_loop_result, name) | |
return getattr(self.loop, name) | |
def __dir__(self) -> "list[str]": | |
mine = object.__dir__(self) | |
theirs = dir(self.loop) | |
return [*mine, *theirs] | |
def __repr__(self): | |
return f"{type(self).__name__}(status={self.status}, {self.loop!s})" | |
def create_task( | |
self, coro: "Coroutine[Any, Any, T]", *, name=None, context=None | |
) -> "asyncio.Task[T]": | |
kwargs = {} | |
if name is not None: | |
kwargs["name"] = name | |
if context is not None: | |
kwargs["context"] = context | |
task = self.run_until_complete( | |
_create_and_start_task(coro, task_set=self._tasks, **kwargs) | |
) | |
return task | |
def run_until_complete(self, coro: "Awaitable[T]") -> T: | |
if isinstance(coro, asyncio.Task): | |
task: asyncio.Task = coro | |
del coro | |
if task.done(): | |
return task.result() | |
async def waker(): | |
me = asyncio.current_task() | |
self._tasks.add(me) | |
me.add_done_callback(self._tasks.discard) | |
return await task | |
coro = waker() | |
assert inspect.isawaitable(coro) | |
handle = asyncio.run_coroutine_threadsafe(coro, self.loop) | |
return handle.result() | |
async def wait_for_shutdown(self) -> None: | |
assert not self._wait_for_shutdown.is_set() | |
await self._wait_for_shutdown.wait() | |
def shutdown(self) -> None: | |
with self._lock: | |
if self._status[0] == "running": | |
assert not self._wait_for_shutdown.is_set() | |
assert self.loop.is_running() | |
assert self.thread.is_alive() | |
task = self.main_task() | |
assert task is not None and not task.done() | |
self._status = ("stopping",) | |
if threading.current_thread() is self.thread: | |
def _after(t: asyncio.Task): | |
self._status = ("stopped",) | |
self._wait_for_shutdown.clear() | |
task.add_done_callback(_after) | |
self._wait_for_shutdown.set() | |
return | |
else: | |
async def _shut_me_down(): | |
self._wait_for_shutdown.set() | |
await asyncio.sleep(0) | |
shutdown_request = asyncio.run_coroutine_threadsafe( | |
_shut_me_down(), self.loop | |
) | |
with suppress(asyncio.CancelledError): | |
shutdown_request.result() | |
self._status = ("stopped",) | |
self._wait_for_shutdown.clear() | |
def run_in_executor( | |
self, executor: Optional[concurrent.futures.Executor], func, *args: Any | |
) -> asyncio.Future: | |
assert callable(func) | |
assert not inspect.iscoroutinefunction(func) | |
assert self.is_running() | |
if threading.current_thread() is self.thread: | |
return self.loop.run_in_executor(executor, func, *args) | |
async def executor_thunk(): | |
loop = asyncio.get_running_loop() | |
return await loop.run_in_executor(executor, func, *args) | |
return self.create_task(executor_thunk()) | |
@classmethod | |
def create_threaded_event_loop( | |
cls: "Type[Self]", loop: Optional[asyncio.AbstractEventLoop] = None | |
) -> "Self": | |
assert loop is None or isinstance(loop, asyncio.AbstractEventLoop) | |
q: "concurrent.futures.Future[ThreadedEventLoop]" = ( | |
concurrent.futures.Future() | |
) | |
f: "concurrent.futures.Future[Any]" = concurrent.futures.Future() | |
async def _main() -> None: | |
this_task = asyncio.current_task() | |
assert this_task is not None | |
try: | |
threaded_loop_proxy = cls( | |
threading.current_thread(), | |
asyncio.get_running_loop(), | |
asyncio.Event(), | |
this_task, | |
f, | |
) | |
except Exception as e: | |
q.set_exception(e) | |
else: | |
q.set_result(threaded_loop_proxy) | |
await threaded_loop_proxy.wait_for_shutdown() | |
def _thread_main( | |
coro_or_coro_factory: "Awaitable[T] | Callable[[], Awaitable[T]]", | |
loop: Optional[asyncio.AbstractEventLoop], | |
) -> None: | |
if loop is None: | |
loop = asyncio.new_event_loop() | |
assert isinstance(loop, asyncio.AbstractEventLoop) | |
if inspect.isawaitable(coro_or_coro_factory): | |
coro = coro_or_coro_factory | |
elif callable(coro_or_coro_factory): | |
coro = coro_or_coro_factory() | |
else: | |
assert False, "unreachable" | |
assert inspect.isawaitable(coro) | |
if isinstance(coro, asyncio.Task): | |
task = coro | |
del coro | |
async def thunk(): | |
return await task | |
coro = thunk() | |
asyncio.set_event_loop(loop) | |
if not f.set_running_or_notify_cancel(): | |
return | |
try: | |
result = loop.run_until_complete(coro) | |
except Exception as e: | |
f.set_exception(e) | |
else: | |
f.set_result(result) | |
finally: | |
atexit.unregister(_shutdown_event_loop) | |
if loop.is_running(): | |
loop.stop() | |
me = q.result() | |
if me._tasks: | |
_, pending = loop.run_until_complete( | |
asyncio.wait( | |
me._tasks, | |
return_when=asyncio.ALL_COMPLETED, | |
timeout=0.1, | |
) | |
) | |
log_thunk = logger.debug | |
if sys.flags.dev_mode: | |
log_thunk = logger.warning | |
if pending: | |
log_thunk(f"cancelling {len(pending)} pending futures") | |
for p in pending: | |
log_thunk(f"cancelling {p}") | |
p.cancel() | |
loop.run_until_complete( | |
asyncio.gather(*pending, return_exceptions=True) | |
) | |
# with suppress(Exception): | |
loop.run_until_complete(loop.shutdown_asyncgens()) | |
with suppress(AttributeError): | |
loop.run_until_complete(loop.shutdown_default_executor()) | |
loop.close() | |
loop_thread = threading.Thread( | |
target=_thread_main, | |
args=(_main, loop), | |
name=f"loop-thread-for[{threading.get_ident()}]", | |
) | |
loop_thread.daemon = True | |
loop_thread.start() | |
threaded_loop = q.result() | |
assert isinstance(threaded_loop, cls) | |
assert threaded_loop.status[0] == "running" | |
def _shutdown_event_loop(loop: ThreadedEventLoop) -> None: | |
if loop.is_running(): | |
loop.shutdown() | |
if loop.thread.is_alive(): | |
loop.thread.join() | |
atexit.register(_shutdown_event_loop, threaded_loop) | |
return threaded_loop | |
@classmethod | |
def make_default_event_loop( | |
cls: "Type[Self]", loop: "Union[asyncio.AbstractEventLoop, Self]" | |
) -> "Self": | |
assert loop.is_running() | |
event_loop: asyncio.AbstractEventLoop | |
if not isinstance(loop, cls): | |
assert isinstance(loop, asyncio.AbstractEventLoop) | |
loop = cls.create_threaded_event_loop(loop=loop) | |
event_loop = loop.loop | |
asyncio.set_event_loop(event_loop) | |
asyncio._set_running_loop(event_loop) | |
return loop | |
loop = ThreadedEventLoop.create_threaded_event_loop() | |
ThreadedEventLoop.make_default_event_loop(loop) | |
print( | |
"Imported modules: `asyncio`, `json`, `datetime`, `logging`, `sys`. Event loop at `loop`" | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This allows a 3.7+ vanilla
python3
shell to:loop.run_until_complete(...)
/loop.create_task
/loop.run_in_executor
safely so that it behaves as expectedObviously you could use
python -m asyncio
in 3.10, but if you're not able to do that or you simply wish to be able to customize which event loop, then this should be of use.There's an
await_(coro)
function that will serve as anawait coro
-replacement.Via
python
(3.7.16)Via
python -X dev
we see