Skip to content

Instantly share code, notes, and snippets.

@Marinell0
Last active September 18, 2024 19:34
Show Gist options
  • Save Marinell0/2f549eefa15a157863aac52ec91e18c6 to your computer and use it in GitHub Desktop.
Save Marinell0/2f549eefa15a157863aac52ec91e18c6 to your computer and use it in GitHub Desktop.
Parallelize any Python function
from concurrent.futures import ThreadPoolExecutor, Future
from collections.abc import Callable, Iterable, Sized
from typing import TypeVar
import os
from tqdm import tqdm
# Type of a single input item taken from the iterable.
T = TypeVar("T")
# Type of the value the parallelized function returns for one item.
R = TypeVar("R")
def parallelize(func: Callable[[T], R], iterable: Iterable[T], max_workers: int | None = None, *args) -> list[R]:
    """
    Run a function over every item of an iterable using a thread pool.

    Each item is submitted as ``func(item, *args)``. A tqdm progress bar is
    advanced every time a call finishes and shows the most recently
    completed item in its description.

    Args:
        func: Function to be parallelized. It receives one item from
            ``iterable`` followed by ``*args``.
        iterable: Iterable to be used as input for the function. When it
            has a length, the progress bar shows a total.
        max_workers: Number of threads to be used. When ``None``, the number
            of CPUs usable by the current process is used
            (``os.sched_getaffinity`` where the platform provides it,
            otherwise ``os.cpu_count()``).
        args: Additional positional arguments passed to every call of
            ``func`` after the item.

    Return:
        List of results, in the same order as the items of ``iterable``.

    Raises:
        Exception: The exception raised by the first failing call, in
            submission order, is re-raised after being printed together
            with the item and its position.
    """
    # Callables such as functools.partial objects have no __name__.
    label = getattr(func, '__name__', type(func).__name__)

    if max_workers is not None:
        worker_count = max_workers
    elif hasattr(os, 'sched_getaffinity'):
        # Respects CPU affinity masks, but only exists on some Unix platforms.
        worker_count = len(os.sched_getaffinity(0))
    else:
        worker_count = os.cpu_count() or 1

    # If iterable has a length, give tqdm a total so it can show progress.
    total = len(iterable) if isinstance(iterable, Sized) else None

    def when_done_callback_generator(pbar: tqdm, item: T) -> Callable[[Future[R]], None]:
        # Bind `item` per submission so each callback reports its own item.
        def when_done_callback(future: Future[R]) -> None:
            pbar.update(1)
            str_item = str(item)
            if len(str_item) > 20:
                str_item = str_item[:17] + '...'
            pbar.set_description(f"{label} - (Last item: {str_item})")
        return when_done_callback

    futures: list[tuple[T, Future[R]]] = []
    # The bar is closed on exit, even if submitting or running a task fails.
    with tqdm(total=total, desc=label, smoothing=0) as pbar:
        # Leaving the executor block waits for every submitted call to finish.
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            for item in iterable:
                future = executor.submit(func, item, *args)
                future.add_done_callback(when_done_callback_generator(pbar, item))
                futures.append((item, future))

    results: list[R] = []
    for index, (item, future) in enumerate(futures):
        try:
            results.append(future.result())
        except Exception as e:
            # Log error, showing the item that caused the error
            print(f'Error in {future}, for parameters {item} at position {index}: {e}')
            # Bare raise keeps the original traceback intact.
            raise
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment