Last active
September 18, 2024 19:34
-
-
Save Marinell0/2f549eefa15a157863aac52ec91e18c6 to your computer and use it in GitHub Desktop.
Parallelize any Python function over an iterable using a thread pool
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from concurrent.futures import ThreadPoolExecutor, Future | |
from collections.abc import Callable, Iterable, Sized | |
from typing import TypeVar | |
import os | |
from tqdm import tqdm | |
T = TypeVar("T")  # type of each item drawn from the input iterable
R = TypeVar("R")  # type of the value returned by the parallelized function
def parallelize(
    func: Callable[[T], R],
    iterable: Iterable[T],
    max_workers: int | None = None,
    *args,
) -> list[R]:
    """
    Run ``func(item, *args)`` for every item of ``iterable`` on a thread pool.

    A tqdm progress bar tracks completed calls. Its description shows the
    function name and a truncated string form of the most recently finished
    item.

    Threads are used, so this speeds up I/O-bound work (or functions that
    release the GIL). Pure-Python CPU-bound functions will not run in
    parallel under the GIL.

    Args:
        func: Function to be parallelized. It is called as ``func(item, *args)``.
        iterable: Items to feed to ``func``, one call per item. If it is
            ``Sized``, its length is used as the progress bar total.
        max_workers: Number of worker threads. If ``None`` (the default), the
            number of CPUs available to this process is used.
        args: Additional positional arguments passed to every call, after
            the item.

    Returns:
        The results of ``func``, in the same order as the input items
        (not completion order).

    Raises:
        Exception: The exception raised by the first failing call, in input
            order. It is re-raised after all submitted calls have finished
            and after an error line naming the item and its position has been
            printed.
    """
    # functools.partial objects and callable instances have no __name__.
    func_name: str = getattr(func, '__name__', type(func).__name__)

    if max_workers is not None:
        worker_count: int = max_workers
    elif hasattr(os, 'sched_getaffinity'):
        # The set of CPUs this process is allowed to run on (not available everywhere).
        worker_count = len(os.sched_getaffinity(0))
    else:
        # os.sched_getaffinity is missing on e.g. macOS and Windows.
        worker_count = os.cpu_count() or 1

    # Only a Sized iterable has a known length; otherwise the bar just counts.
    total = len(iterable) if isinstance(iterable, Sized) else None

    def make_done_callback(pbar: tqdm, item: T) -> Callable[[Future[R]], None]:
        # A factory binds `item` per call, avoiding late binding of the loop variable.
        def on_done(_future: Future[R]) -> None:
            pbar.update(1)
            text = str(item)
            label = text[:17] + '...' if len(text) > 20 else text
            pbar.set_description(f"{func_name} - (Last item: {label})")
        return on_done

    futures: list[tuple[T, Future[R]]] = []
    # The context manager guarantees the progress bar is closed, even if
    # iterating `iterable` raises partway through submission.
    with tqdm(total=total, desc=func_name, smoothing=0) as pbar:
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            for item in iterable:
                future = executor.submit(func, item, *args)
                future.add_done_callback(make_done_callback(pbar, item))
                futures.append((item, future))
        # Leaving the executor block waits for every submitted task to finish.

    results: list[R] = []
    for position, (item, future) in enumerate(futures):
        try:
            result = future.result()
        except Exception as e:
            # Report which input failed before propagating the original exception.
            print(f'Error in {func_name}, for parameters {item} at position {position}: {e}')
            raise
        results.append(result)
    return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment