Last active
October 11, 2023 20:26
-
-
Save chapimenge3/19571d5f3ff9b3b094be7e9ef15dea14 to your computer and use it in GitHub Desktop.
Redis subscriber task runner that runs multiple tasks in parallel.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import threading | |
import redis | |
# Connection settings for the Redis server this runner listens on.
redis_url = 'localhost'
# NOTE: `username` is defined here but is not passed to the client below.
username = 'default'

# Bind the client to its own names instead of rebinding `redis`: the original
# `redis = r = redis.Redis(...)` replaced the imported module with a client
# instance, making `redis.Redis`, `redis.exceptions`, etc. unreachable later.
# `r` is kept as the short alias the original also defined.
r = redis_client = redis.Redis(
    host=redis_url,
    port=6379,
    decode_responses=True,  # deliver message payloads as str rather than bytes
)

# Pub/sub handle consumed by task_runner(); subscribing happens at import time.
task_sub = redis_client.pubsub()
task_sub.subscribe('task_queue')
print('Subscribed to task_queue')
async def async_range(count):
    """Asynchronously yield the integers 0 .. count - 1.

    After each value is handed out, control is given back to the event loop
    via a zero-length sleep so other coroutines get a chance to run.
    """
    for value in range(count):
        yield value
        # Zero-delay sleep: a cooperative yield point, not an actual wait.
        await asyncio.sleep(0.0)
async def pre_task():
    """Preparation step awaited by main_task before the tasks are started.

    Prints a marker line and then sleeps for two seconds.
    """
    delay_seconds = 2
    print('pre_task')
    await asyncio.sleep(delay_seconds)
async def task1(*args, **kwargs):
    """Example task 1: print ten progress lines, one per second.

    Each line shows the task label, the step index, and the positional and
    keyword arguments the task was called with.
    """
    label = 'task1'
    steps = async_range(10)
    async for step in steps:
        print(label, step, args, kwargs)
        await asyncio.sleep(1)
async def task2(*args, **kwargs):
    """Example task 2: print ten progress lines, one per second.

    Each line shows the task label, the step index, and the positional and
    keyword arguments the task was called with.
    """
    label = 'task2'
    steps = async_range(10)
    async for step in steps:
        print(label, step, args, kwargs)
        await asyncio.sleep(1)
async def main_task(*args, **kwargs):
    """Run pre_task, then run task1 and task2 concurrently until both finish.

    The same positional and keyword arguments are forwarded to both tasks and
    echoed in the progress messages.
    """
    print('Running pre_task', args, kwargs)
    await pre_task()

    print('Running main_task', args, kwargs)
    # Build both coroutines first, then let gather drive them concurrently.
    workers = [task1(*args, **kwargs), task2(*args, **kwargs)]
    await asyncio.gather(*workers)

    print('main_task done', args, kwargs)
def run_main_task(*args, **kwargs):
    """Synchronous entry point that runs main_task to completion.

    Used as the thread target in task_runner; asyncio.run gives each call its
    own event loop.
    """
    job = main_task(*args, **kwargs)
    asyncio.run(job)
def task_runner():
    """Listen on the pub/sub subscription and start a thread per task message.

    Every received message is printed. For messages whose type is 'message',
    the payload is split on whitespace and the pieces are passed as positional
    arguments to run_main_task in a new thread, so the listen loop keeps going
    while tasks run.
    """
    for message in task_sub.listen():
        print(message)

        # Skip anything that is not an actual published message.
        if message['type'] != 'message':
            continue

        worker = threading.Thread(
            target=run_main_task,
            args=message['data'].split(),
            kwargs={},
        )
        worker.start()
if __name__ == '__main__':
    # Start the task runner only when this file is executed as a script;
    # task_runner() loops over incoming pub/sub messages.
    task_runner()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment