Parallelizing with Dask & Dask Distributed
from distributed import Client, as_completed
from dask import delayed
from time import sleep
from pprint import pprint

# Define a time-consuming task
def foo(n):
    print("Starting the {:d}-second task".format(n))
    sleep(n)
    print("Ending the {:d}-second task".format(n))
    return n
########################################################################################################################
# Goal: parallel execution of the above task for the following set of params
########################################################################################################################
# Specify task details
args_iterable = range(5, 15)  # launch the task with 10 different arguments
is_pure = False               # specify task purity (https://toolz.readthedocs.io/en/latest/purity.html)
                              # optional: it enables finer control over caching
# Set up a local cluster: scheduler + workers, deployed here as threads
# (processes=False); pass processes=True to use separate processes instead.
# We could also connect to a remotely deployed cluster (if one is available).
c = Client(processes=False)
pprint(c.scheduler_info())  # print information about the cluster we are connected to
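# A sketch of connecting to a remote cluster instead: pass the scheduler's
# address to Client (host and port below are placeholders for your deployment):
#   c = Client('tcp://scheduler-host:8786')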
########################################################################################################################
# Solution 1: using the standard concurrent.futures interface (PEP-3148)
########################################################################################################################
# Send tasks
send_mode = 'map'
if send_mode == 'submit':
    fut_list = []
    for i in args_iterable:
        f = c.submit(foo, i, pure=is_pure)  # start executing foo(i) immediately; pure functions are cached
        fut_list.append(f)
elif send_mode == 'map':
    fut_list = c.map(foo, args_iterable, pure=is_pure)
# Receive task results
rec_mode = 'as_completed'
if rec_mode == 'gather':
    # Wait for all tasks to finish and gather the results in a list
    out = c.gather(fut_list)
    print('Result:', out)
elif rec_mode == 'result':
    # Block until the first future in the list finishes and return its result
    out = fut_list[0].result()
    print('Result:', out)
elif rec_mode == 'as_completed':
    # Retrieve each task result as it finishes
    for future, out in as_completed(fut_list, with_results=True):
        print('Result:', out)
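# A further option (not exercised above) is distributed.wait, which simply
# blocks until all the futures finish, without gathering their results:
#   from distributed import wait
#   wait(fut_list)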
########################################################################################################################
# Solution 2: define a computation DAG by chaining Delayed-type objects
########################################################################################################################
# Build a list of tasks and aggregate them into a parent task object, but do not execute anything yet
task_list = []
for i in args_iterable:
    task_list.append(delayed(foo, pure=is_pure)(i))
parent = delayed(task_list)
# Execute the tasks synchronously (the interpreter blocks until all results are computed)
out = parent.compute()
print('Result:', out)
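# Equivalently (an alternative sketch, not used above), the task list can be
# computed directly without the wrapping parent node:
#   import dask
#   out = dask.compute(*task_list)  # returns a tuple of results
c.close()  # shut down the client and release the local cluster's resources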