Last active
April 21, 2018 19:45
-
-
Save j0e1in/dcdc253defba4f575fb308ef4f2af8ce to your computer and use it in GitHub Desktop.
Run an async function / coroutine in a thread.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
##########################################################
# Use cases
#   1. Start an async function in __init__
#   2. Start an endless (while loop) async function while the current event loop is running
#   3. ...
#
# Limitations
#   1. Some libraries (e.g. motor) cannot be executed using this method, because you can't control how the event loop is retrieved. (See another method below)
#
# Usage:
#   async def func(count, add=1):
#       while True:
#           count += add
#           print(count)
#           await asyncio.sleep(1)
#
#   th = run_async_in_thread(func, 1, add=3)
#
#   # Tip: restart a thread if it is dead
#   while True:
#       if not th.is_alive():
#           th = run_async_in_thread(func, 1, add=3)
##########################################################
from threading import Thread | |
import asyncio | |
def run_async_in_thread(func, *args, **kwargs):
    """Run the coroutine function `func` to completion in a new thread.

    A fresh event loop is created inside the worker thread, installed as
    that thread's current loop, used to run `func(*args, **kwargs)` and
    closed afterwards.

    Args:
        func: Callable returning an awaitable (typically an `async def`).
        *args: Positional arguments forwarded to `func`.
        **kwargs: Keyword arguments forwarded to `func`. Any keyword name
            is allowed, including `loop`.

    Returns:
        The already-started `threading.Thread`. Use `is_alive()` / `join()`
        on it to observe completion. An exception raised by the coroutine
        propagates out of the thread's target and ends the thread.
    """
    def run_async():
        # Create the loop in the thread that runs it, and register it so
        # that synchronous code calling `asyncio.get_event_loop()` in this
        # thread finds it.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(func(*args, **kwargs))
        finally:
            # Always release the loop's resources, even when the coroutine
            # raised.
            loop.close()

    # `func`, `args` and `kwargs` are captured by the closure rather than
    # passed through Thread(args=..., kwargs=...), so user keywords cannot
    # collide with the helper's own parameter names.
    th = Thread(target=run_async)
    th.start()
    return th
# Another option is to use `asyncio.run_coroutine_threadsafe`.
# This method is more robust in many situations,
# but it requires an additional step to get the traceback.
import asyncio
import concurrent.futures
import sys
import traceback
async def catch_traceback(fn, *args, **kwargs):
    """Await `fn(*args, **kwargs)` and keep the traceback text on failure.

    If the awaited call raises, a new exception of the same type is raised
    whose message is the formatted traceback of the original error, so the
    traceback text travels with the exception object.

    Args:
        fn: Coroutine function to call.
        *args: Positional arguments forwarded to `fn`.
        **kwargs: Keyword arguments forwarded to `fn`.

    Returns:
        Whatever `fn(*args, **kwargs)` returns when awaited.

    Raises:
        Exception: An instance of the original exception's type carrying
            the formatted traceback as its message, chained to the original
            via `__cause__`. If that type cannot be constructed from a
            single string, the original exception is re-raised unchanged.
    """
    try:
        return await fn(*args, **kwargs)
    except Exception as exc:
        trace_text = traceback.format_exc()
        try:
            wrapped = type(exc)(trace_text)
        except Exception:
            # Some exception classes need a different constructor signature
            # (e.g. several required arguments); do not let that failure
            # hide the real error.
            wrapped = None
        if wrapped is None:
            raise
        raise wrapped from exc
# Submit the wrapped coroutine to `loop` from this thread and poll for the
# outcome. `func` is the coroutine function from the usage example.
# NOTE(review): `asyncio.run_coroutine_threadsafe` is meant to be called from
# a different thread than the one running `loop`, so `loop` is presumably
# already running elsewhere -- confirm against the caller.
loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(
    catch_traceback(func, 1, add=3), loop)

while True:
    try:
        # Wait up to one second for completion; yields the raised exception,
        # or None if the coroutine finished successfully.
        res = future.exception(timeout=1)  # try to retrieve exception / result
    except concurrent.futures.TimeoutError:
        pass  # skip and do other tasks first
    else:
        # handle exception / result here: `res` is the exception (or None);
        # on success the return value is available via `future.result()`.
        break  # the future is done, stop polling
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment