Python: synchronising work between threads
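The first example shares one module-level ThreadPoolExecutor between several worker threads. The submit_to_thread_pool decorator wraps a function so that calling it submits the work to the pool and returns a concurrent.futures.Future instead of running the function inline; each worker then drains its own futures with as_completed.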
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps

# Global thread pool shared by all worker threads
thread_pool = ThreadPoolExecutor(max_workers=10)

workernames = list("abcdefghijklmnopqrstuvwxyz")


def submit_to_thread_pool(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Calling the decorated function submits it to the pool
        # and returns a Future instead of running it inline
        return thread_pool.submit(func, *args, **kwargs)
    return wrapper


@submit_to_thread_pool
def task_function(n, id):
    time.sleep(n)
    return f"Task for {workernames[id]} with {n}s delay done!"


def worker(id):
    print(f"Worker {workernames[id]} submitting tasks")
    futures = [task_function(i, id) for i in range(1, 4)]
    for future in as_completed(futures):
        print(f"Worker {workernames[id]} received: {future.result()}")
    print(f"Worker {workernames[id]} completed")


if __name__ == "__main__":
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    thread_pool.shutdown(wait=True)
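A drawback of the first version is that the pool's lifetime is implicit: the executor is only shut down if execution reaches the final shutdown call. The second example wraps ThreadPoolExecutor in a small context manager so the pool is created on entry and shut down (waiting for outstanding work) on exit, and the decorator takes the pool as an argument instead of reaching for a global.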
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import wraps


class GlobalThreadPool:
    """
    Wrapper for ThreadPoolExecutor to enable using 'with' syntax.
    """

    def __init__(self, max_workers=None):
        self.executor = None
        self.max_workers = max_workers

    def __enter__(self):
        if self.executor is None:
            self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if self.executor is not None:
            self.executor.shutdown(wait=True)
            self.executor = None

    def submit(self, func, *args, **kwargs):
        return self.executor.submit(func, *args, **kwargs)


workernames = list("abcdefghijklmnopqrstuvwxyz")


def submit_to_thread_pool(global_pool):
    """
    Decorator to submit a function to the provided thread pool.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return global_pool.submit(func, *args, **kwargs)
        return wrapper
    return decorator


def task_function(n, id):
    """
    Simulate a task with a delay.
    """
    time.sleep(n)
    return f"Task for worker {workernames[id]} with {n}s delay done!"


def worker(global_pool, id):
    """
    Worker function that submits tasks to the thread pool.
    """
    worker_name = workernames[id]
    print(f"Worker {worker_name} submitting tasks")

    # Submit tasks for this worker
    futures = [global_pool.submit(task_function, i, id) for i in range(1, 4)]

    # Process results as tasks are completed
    for future in as_completed(futures):
        print(f"Worker {worker_name} received: {future.result()}")
    print(f"Worker {worker_name} completed")


if __name__ == "__main__":
    # Using 'with' syntax for thread pool
    with GlobalThreadPool(max_workers=10) as pool:
        # Decorator usage for pre-defined task function
        @submit_to_thread_pool(pool)
        def decorated_task(n, id):
            time.sleep(n)
            return f"Decorated task for worker {workernames[id]} with {n}s delay done!"

        # Submit decorated tasks
        futures = [decorated_task(i, 0) for i in range(1, 4)]

        # Wait for results of decorated tasks
        for future in as_completed(futures):
            print(f"Main thread received: {future.result()}")

        # Start worker threads
        threads = [threading.Thread(target=worker, args=(pool, i)) for i in range(3)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
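The third example is a variation on the URL-fetching example from the concurrent.futures documentation, extended with a threading.Event to stop early: the first failure sets the event, and the remaining in-flight tasks check it when they wake and raise instead of completing their fetch.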
import concurrent.futures
import sys
import threading
import time
import urllib.request

URLS = [
    "http://www.foxnews.com/",
    "http://www.cnn.com/",
    "http://europe.wsj.com/",
    "http://www.bbc.co.uk/",
    "http://nonexistent-subdomain.python.org/",
]

# Set when any fetch fails, so in-flight tasks can bail out
signal = threading.Event()


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    # Delay every URL except the one that is guaranteed to fail, so the
    # failure lands first and the others observe the signal when they wake
    if url != URLS[4]:
        time.sleep(4)
        if signal.is_set():
            raise Exception(f"{url} cancelled: another task failed")
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print("%r generated an exception: %s" % (url, exc))
            signal.set()
            # Wait for the in-flight tasks to see the signal and raise
            executor.shutdown(wait=True)
            print("exiting early")
            sys.exit(1)
        else:
            print("%r page is %d bytes" % (url, len(data)))