Skip to content

Instantly share code, notes, and snippets.

@autumnjolitz
Last active July 18, 2024 01:28
Show Gist options
  • Save autumnjolitz/d30c2ff23e77062385321309b8912808 to your computer and use it in GitHub Desktop.
Save autumnjolitz/d30c2ff23e77062385321309b8912808 to your computer and use it in GitHub Desktop.
console supports for lack of top level await in non-await terminal
import asyncio
import atexit
import concurrent.futures
import datetime
import functools
import inspect
import json
import logging
import readline
import rlcompleter
import sys
import threading
import weakref
from contextlib import suppress
from typing import Optional, Union, Dict, Tuple, List, TYPE_CHECKING, TypeVar, Set, Any
if TYPE_CHECKING:
from typing import Self, Type, Awaitable, Coroutine, Callable
rlcompleter
readline.parse_and_bind("tab: complete")
with suppress(ImportError):
from dateutil.relativedelta import relativedelta
print("relativedelta imported.")
relativedelta
with suppress(ImportError):
print("boto3 imported.")
import boto3
boto3
json
datetime
Optional
Union
Dict
Tuple
List
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stderr)
loop: "ThreadedEventLoop | asyncio.AbstractEventLoop"
try:
loop = asyncio.get_running_loop()
except RuntimeError:
async def _create_and_start_task(
coro: "Coroutine[Any, Any, T]",
*,
task_set: Optional[Set[asyncio.Task]] = None,
**kwargs,
) -> "asyncio.Task[T]":
assert not isinstance(coro, asyncio.Task)
loop = asyncio.get_running_loop()
task = loop.create_task(coro, **kwargs)
shielded = asyncio.shield(task)
with suppress(asyncio.TimeoutError):
await asyncio.wait_for(shielded, timeout=0)
if task_set is not None:
task_set.add(task)
task.add_done_callback(task_set.discard)
return task
T = TypeVar("T")
def await_(awaitable: "Awaitable[T]") -> "T":
"""
>>> assert loop
>>> await_(asyncio.sleep(1))
"""
return loop.run_until_complete(awaitable)
print("defining await_(...) function to substitute for top level await.")
def wrap_run_coroutine_threadsafe(func):
@functools.wraps(func)
def run_coroutine_threadsafe(coro, loop):
if isinstance(loop, ThreadedEventLoop):
loop = loop.loop
return func(coro, loop)
return run_coroutine_threadsafe
asyncio.run_coroutine_threadsafe = wrap_run_coroutine_threadsafe(
asyncio.run_coroutine_threadsafe
)
del wrap_run_coroutine_threadsafe
class ThreadedEventLoop:
__slots__ = (
"loop",
"_wait_for_shutdown",
"_status",
"thread",
"main_task",
"_lock",
"_event_loop_result",
"_tasks",
)
main_task: "weakref.ReferenceType[asyncio.Task]"
_tasks: Set[asyncio.Task]
def __init__(
self,
thread: threading.Thread,
loop: asyncio.AbstractEventLoop,
wait_for_shutdown_event: asyncio.Event,
main_task: asyncio.Task,
result: concurrent.futures.Future,
):
assert isinstance(loop, asyncio.AbstractEventLoop) and loop.is_running()
assert isinstance(wait_for_shutdown_event, asyncio.Event)
self.loop = loop
self._wait_for_shutdown = wait_for_shutdown_event
self._status = ("running",)
if thread is None:
thread = threading.current_thread()
self.thread = thread
self._lock = threading.RLock()
self.main_task = weakref.ref(main_task)
self._event_loop_result = result
self._tasks = set()
@property
def status(self) -> "tuple[str, ...]":
return self._status
def __getattr__(self, name: str):
if name in ("result", "exception"):
return getattr(self._event_loop_result, name)
return getattr(self.loop, name)
def __dir__(self) -> "list[str]":
mine = object.__dir__(self)
theirs = dir(self.loop)
return [*mine, *theirs]
def __repr__(self):
return f"{type(self).__name__}(status={self.status}, {self.loop!s})"
def create_task(
self, coro: "Coroutine[Any, Any, T]", *, name=None, context=None
) -> "asyncio.Task[T]":
kwargs = {}
if name is not None:
kwargs["name"] = name
if context is not None:
kwargs["context"] = context
task = self.run_until_complete(
_create_and_start_task(coro, task_set=self._tasks, **kwargs)
)
return task
def run_until_complete(self, coro: "Awaitable[T]") -> T:
if isinstance(coro, asyncio.Task):
task: asyncio.Task = coro
del coro
if task.done():
return task.result()
async def waker():
me = asyncio.current_task()
self._tasks.add(me)
me.add_done_callback(self._tasks.discard)
return await task
coro = waker()
assert inspect.isawaitable(coro)
handle = asyncio.run_coroutine_threadsafe(coro, self.loop)
return handle.result()
async def wait_for_shutdown(self) -> None:
assert not self._wait_for_shutdown.is_set()
await self._wait_for_shutdown.wait()
def shutdown(self) -> None:
with self._lock:
if self._status[0] == "running":
assert not self._wait_for_shutdown.is_set()
assert self.loop.is_running()
assert self.thread.is_alive()
task = self.main_task()
assert task is not None and not task.done()
self._status = ("stopping",)
if threading.current_thread() is self.thread:
def _after(t: asyncio.Task):
self._status = ("stopped",)
self._wait_for_shutdown.clear()
task.add_done_callback(_after)
self._wait_for_shutdown.set()
return
else:
async def _shut_me_down():
self._wait_for_shutdown.set()
await asyncio.sleep(0)
shutdown_request = asyncio.run_coroutine_threadsafe(
_shut_me_down(), self.loop
)
with suppress(asyncio.CancelledError):
shutdown_request.result()
self._status = ("stopped",)
self._wait_for_shutdown.clear()
def run_in_executor(
self, executor: Optional[concurrent.futures.Executor], func, *args: Any
) -> asyncio.Future:
assert callable(func)
assert not inspect.iscoroutinefunction(func)
assert self.is_running()
if threading.current_thread() is self.thread:
return self.loop.run_in_executor(executor, func, *args)
async def executor_thunk():
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, func, *args)
return self.create_task(executor_thunk())
@classmethod
def create_threaded_event_loop(
cls: "Type[Self]", loop: Optional[asyncio.AbstractEventLoop] = None
) -> "Self":
assert loop is None or isinstance(loop, asyncio.AbstractEventLoop)
q: "concurrent.futures.Future[ThreadedEventLoop]" = (
concurrent.futures.Future()
)
f: "concurrent.futures.Future[Any]" = concurrent.futures.Future()
async def _main() -> None:
this_task = asyncio.current_task()
assert this_task is not None
try:
threaded_loop_proxy = cls(
threading.current_thread(),
asyncio.get_running_loop(),
asyncio.Event(),
this_task,
f,
)
except Exception as e:
q.set_exception(e)
else:
q.set_result(threaded_loop_proxy)
await threaded_loop_proxy.wait_for_shutdown()
def _thread_main(
coro_or_coro_factory: "Awaitable[T] | Callable[[], Awaitable[T]]",
loop: Optional[asyncio.AbstractEventLoop],
) -> None:
if loop is None:
loop = asyncio.new_event_loop()
assert isinstance(loop, asyncio.AbstractEventLoop)
if inspect.isawaitable(coro_or_coro_factory):
coro = coro_or_coro_factory
elif callable(coro_or_coro_factory):
coro = coro_or_coro_factory()
else:
assert False, "unreachable"
assert inspect.isawaitable(coro)
if isinstance(coro, asyncio.Task):
task = coro
del coro
async def thunk():
return await task
coro = thunk()
asyncio.set_event_loop(loop)
if not f.set_running_or_notify_cancel():
return
try:
result = loop.run_until_complete(coro)
except Exception as e:
f.set_exception(e)
else:
f.set_result(result)
finally:
atexit.unregister(_shutdown_event_loop)
if loop.is_running():
loop.stop()
me = q.result()
if me._tasks:
_, pending = loop.run_until_complete(
asyncio.wait(
me._tasks,
return_when=asyncio.ALL_COMPLETED,
timeout=0.1,
)
)
log_thunk = logger.debug
if sys.flags.dev_mode:
log_thunk = logger.warning
if pending:
log_thunk(f"cancelling {len(pending)} pending futures")
for p in pending:
log_thunk(f"cancelling {p}")
p.cancel()
loop.run_until_complete(
asyncio.gather(*pending, return_exceptions=True)
)
# with suppress(Exception):
loop.run_until_complete(loop.shutdown_asyncgens())
with suppress(AttributeError):
loop.run_until_complete(loop.shutdown_default_executor())
loop.close()
loop_thread = threading.Thread(
target=_thread_main,
args=(_main, loop),
name=f"loop-thread-for[{threading.get_ident()}]",
)
loop_thread.daemon = True
loop_thread.start()
threaded_loop = q.result()
assert isinstance(threaded_loop, cls)
assert threaded_loop.status[0] == "running"
def _shutdown_event_loop(loop: ThreadedEventLoop) -> None:
if loop.is_running():
loop.shutdown()
if loop.thread.is_alive():
loop.thread.join()
atexit.register(_shutdown_event_loop, threaded_loop)
return threaded_loop
@classmethod
def make_default_event_loop(
cls: "Type[Self]", loop: "Union[asyncio.AbstractEventLoop, Self]"
) -> "Self":
assert loop.is_running()
event_loop: asyncio.AbstractEventLoop
if not isinstance(loop, cls):
assert isinstance(loop, asyncio.AbstractEventLoop)
loop = cls.create_threaded_event_loop(loop=loop)
event_loop = loop.loop
asyncio.set_event_loop(event_loop)
asyncio._set_running_loop(event_loop)
return loop
loop = ThreadedEventLoop.create_threaded_event_loop()
ThreadedEventLoop.make_default_event_loop(loop)
print(
"Imported modules: `asyncio`, `json`, `datetime`, `logging`, `sys`. Event loop at `loop`"
)
@autumnjolitz
Copy link
Author

autumnjolitz commented Jul 18, 2024

This allows a 3.7+ vanillapython3 shell to:

  1. provision an event loop, running, in a background thread
  2. wrap loop.run_until_complete(...)/loop.create_task/loop.run_in_executor safely so that it behaves as expected
  3. safely handle shutdown/close states

Obviously you could use python -m asyncio in 3.10, but if you're not able to do that or you simply wish to be able to customize which event loop, then this should be of use.

There's an await_(coro) function that will serve as an await coro-replacement.

Via python (3.7.16)

>>> await_(asyncio.sleep(1))
>>> task = loop.create_task(asyncio.sleep(30))
>>> task
<Task pending coro=<sleep() running at /usr/local/Cellar/[email protected]/3.7.16/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:595> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10b387550>()]> cb=[set.discard()]>
>>> loop._tasks
{<Task pending coro=<sleep() running at /usr/local/Cellar/[email protected]/3.7.16/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:595> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10b387550>()]> cb=[set.discard()]>}
>>> loop._tasks
{<Task pending coro=<sleep() running at /usr/local/Cellar/[email protected]/3.7.16/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:595> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10b387550>()]> cb=[set.discard()]>}
>>> await_(task)
>>> task
<Task finished coro=<sleep() done, defined at /usr/local/Cellar/[email protected]/3.7.16/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:582> result=None>
>>>
>>> # okay, start a task and then exit. will it block?
>>> task = loop.create_task(asyncio.sleep(30))
>>> ^D

Via python -X dev we see

>>> # okay, start a task and then exit. will it block?
>>> task = loop.create_task(asyncio.sleep(30))
>>> ^D
WARNING:__main__:cancelling 1 pending futures
WARNING:__main__:cancelling <Task pending coro=<sleep() running at /usr/local/Cellar/[email protected]/3.7.16/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/tasks.py:595> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10c4653c0>()] created at /usr/local/Cellar/[email protected]/3.7.16/Frameworks/Python.framework/Versions/3.7/lib/python3.7/asyncio/base_events.py:395> cb=[set.discard()] created at /Users/autumn/.pythonrc.py:51>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment