Created
January 5, 2019 12:04
-
-
Save PGryllos/b0509fb8b6535a9be6e39ece40cec8c7 to your computer and use it in GitHub Desktop.
Using dask's multithreaded scheduler to speed up the download of multiple files from s3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Using dask's multithreaded scheduler to speed up the download of multiple
files from an s3 bucket
""" | |
import os | |
from functools import partial | |
import botocore | |
import boto3 | |
import dask | |
from dask.diagnostics import ProgressBar | |
# Module-level configuration, applied when this file is imported or run:
# select dask's thread-based scheduler with 20 workers.
# Playing with the number of threads can increase / decrease the throughput.
dask.config.set(scheduler='threads', num_workers=20)
def _s3_download(s3_client, path, bucket, key):
    """Download one object from s3 without crashing on not found objects.

    The object is stored at ``os.path.join(path, key)``. A 404 reported by
    s3 is announced on stdout and swallowed; every other client error is
    re-raised to the caller.

    s3_client: s3 resource service client (must expose ``Bucket(name)``)
    path: local directory under which the downloaded file is stored
    bucket: bucket in which to find the file
    key: key of the file; also used as the file's path relative to ``path``
    """
    destination = os.path.join(path, key)

    # Keys commonly contain "/" prefixes (e.g. "logs/2019/01.gz") and ``path``
    # itself may not exist yet. The download writes to a local file, so the
    # parent directory has to exist beforehand or the write fails.
    parent_dir = os.path.dirname(destination)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    try:
        s3_client.Bucket(bucket).download_file(key, destination)
    except botocore.exceptions.ClientError as e:
        # Only a missing object is tolerated; anything else (permissions,
        # throttling, ...) must surface to the caller.
        if e.response['Error']['Code'] == '404':
            print('The object does not exist')
        else:
            raise
def fetch_multiple(aws_access_key_id, aws_secret_access_key, bucket, keys,
                   path):
    """Open an s3 session and download a list of files as dask tasks

    aws_access_key_id: access key
    aws_secret_access_key: secret key
    bucket: s3 bucket where the files are stored
    keys: list of keys to download
    path: local path handed to _s3_download as the place to store the files
    """
    credentials = {
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
    }
    s3_resource = boto3.Session(**credentials).resource('s3')

    # Everything except the key is the same for each download, so bind it
    # once and let every task supply only its own key.
    download_key = partial(_s3_download, s3_resource, path, bucket)

    # NOTE(review): the same resource object is handed to every task; confirm
    # against the boto3 docs that sharing it across worker threads is safe.
    tasks = [dask.delayed(download_key)(key) for key in keys]

    # Nothing runs until compute(); the progress bar tracks the whole batch.
    with ProgressBar():
        dask.compute(*tasks)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment