Created
January 5, 2019 12:04
-
-
Save PGryllos/b0509fb8b6535a9be6e39ece40cec8c7 to your computer and use it in GitHub Desktop.
Revisions
-
PGryllos created this gist
Jan 5, 2019. There are no files selected for viewing.
"""Using dask's multithreaded scheduler to speed up the download of multiple
files from an S3 bucket.
"""
import os
from functools import partial

import botocore
import boto3
import dask
from dask.diagnostics import ProgressBar

# Playing with the number of threads can increase / decrease the throughput.
dask.config.set(scheduler='threads', num_workers=20)


def _s3_download(s3_client, path, bucket, key):
    """Wrapper to avoid crashing on not-found objects.

    s3_client: s3 resource service client
    path: local directory in which to store the downloaded file
    bucket: bucket in which to find the file
    key: key of the file
    """
    try:
        s3_client.Bucket(bucket).download_file(
            key, os.path.join(path, key)
        )
    except botocore.exceptions.ClientError as e:
        # A 404 means the key is absent in the bucket; this is deliberately
        # best-effort, so report and keep going. Anything else is re-raised.
        if e.response['Error']['Code'] == '404':
            print('The object does not exist')
        else:
            raise


def fetch_multiple(aws_access_key_id, aws_secret_access_key, bucket, keys, path):
    """Initialise an s3 client Session and download a list of files.

    aws_access_key_id: access key
    aws_secret_access_key: secret key
    bucket: s3 bucket where the files are stored
    keys: list of keys to download
    path: local directory in which the downloaded files are stored
    """
    session = boto3.Session(
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
    )
    s3 = session.resource('s3')
    _download = partial(_s3_download, s3, path, bucket)

    # One delayed task per key; the threaded scheduler configured above
    # overlaps the blocking S3 I/O across its worker threads.
    delayed_futures = [dask.delayed(_download)(k) for k in keys]

    with ProgressBar():
        dask.compute(*delayed_futures)