Skip to content

Instantly share code, notes, and snippets.

@PGryllos
Created January 5, 2019 12:04
Show Gist options
  • Save PGryllos/b0509fb8b6535a9be6e39ece40cec8c7 to your computer and use it in GitHub Desktop.
Save PGryllos/b0509fb8b6535a9be6e39ece40cec8c7 to your computer and use it in GitHub Desktop.

Revisions

  1. PGryllos created this gist Jan 5, 2019.
    61 changes: 61 additions & 0 deletions s3_util.py
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,61 @@
    """
    Using dask's multithreaded scheduler to speedup download of multiple files from
    an s3 bucket
    """

    import os
    from functools import partial

    import botocore
    import boto3
    import dask
    from dask.diagnostics import ProgressBar


    # playing with the number of threads can increase / decrease the throughput
    dask.config.set(scheduler='threads', num_workers=20)


    def _s3_download(s3_client, path, bucket, key):
    """wrapper to avoid crushing on not found objects
    s3_client: s3 resource service client
    path: path to store the downloaded file
    bucket: bucket in which to find the file
    key: key of the file
    """
    try:
    s3_client.Bucket(bucket).download_file(
    key, os.path.join(path, key)
    )
    except botocore.exceptions.ClientError as e:
    if e.response['Error']['Code'] == '404':
    print('The object does not exist')
    else:
    raise


    def fetch_multiple(aws_access_key_id, aws_secret_access_key, bucket, keys,
    path):
    """Initialise an s3 client Session and download a list of files
    aws_access_key_id: access key
    aws_secret_access_key: secret key
    bucket: s3 bucket where the files are stored
    keys: list of keys to download
    """
    session = boto3.Session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    )

    s3 = session.resource('s3')

    _download = partial(_s3_download, s3, path, bucket)

    delayed_futures = []
    for k in keys:
    delayed_futures.append(dask.delayed(_download)(k))

    with ProgressBar():
    dask.compute(*delayed_futures)