"""Using dask's multithreaded scheduler to speedup download of multiple files
from an s3 bucket.
"""
import os
from functools import partial

import botocore
import boto3
import dask
from dask.diagnostics import ProgressBar

# playing with the number of threads can increase / decrease the throughput
dask.config.set(scheduler='threads', num_workers=20)


def _s3_download(s3_client, path, bucket, key):
    """Download one object, tolerating missing (404) keys instead of crashing.

    s3_client: s3 resource service client
    path: local directory in which to store the downloaded file
    bucket: bucket in which to find the file
    key: key of the file
    """
    try:
        s3_client.Bucket(bucket).download_file(
            key, os.path.join(path, key)
        )
    except botocore.exceptions.ClientError as e:
        # Only swallow "object not found"; any other client error (access
        # denied, throttling, ...) is a real failure and must propagate.
        if e.response['Error']['Code'] == '404':
            print('The object does not exist')
        else:
            raise


def fetch_multiple(aws_access_key_id, aws_secret_access_key, bucket, keys,
                   path):
    """Initialise an s3 client Session and download a list of files.

    aws_access_key_id: access key
    aws_secret_access_key: secret key
    bucket: s3 bucket where the files are stored
    keys: list of keys to download
    path: local directory in which the downloaded files are stored
    """
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    s3 = session.resource('s3')
    _download = partial(_s3_download, s3, path, bucket)

    # One delayed task per key; the threaded scheduler overlaps the
    # GIL-releasing network I/O across num_workers threads.
    delayed_tasks = [dask.delayed(_download)(k) for k in keys]
    with ProgressBar():
        dask.compute(*delayed_tasks)