@Nihhaar
Last active January 11, 2020 19:50
Revisions

  1. Nihhaar revised this gist Jan 11, 2020. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion s3_multipart_upload.py
@@ -19,7 +19,7 @@
 
 class S3MultipartUpload():
 
-    def __init__(self, client, bucket, key, part_size=int(25e6)):
+    def __init__(self, client, bucket, key, part_size=int(5e6)):
         self.client = client
         self.bucket = bucket
         self.key = key
  2. Nihhaar created this gist Jan 11, 2020.
    80 changes: 80 additions & 0 deletions s3_multipart_upload.py
    @@ -0,0 +1,80 @@
from datetime import datetime

# Usage:
# with S3MultipartUpload(boto3.client('s3'), destination_bucket, destination_key) as s3upload:
#     # Do something
#     s3upload.upload(content)
#     # Do something
#     s3upload.upload(content)
#     # Do something
#     s3upload.flush()
#
# This usage format is required because cleanup must happen either way: if the multipart upload is
# aborted, abort_multipart_upload() must be called to delete the intermediate parts already uploaded,
# otherwise we keep paying for their storage; and if it completes, complete_multipart_upload() must be
# called to tell AWS to assemble the final object from the uploaded parts, otherwise we keep paying for
# the intermediate parts without ever getting the final object. The context manager makes the right call.
#
# Read more here: https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html

class S3MultipartUpload():

    def __init__(self, client, bucket, key, part_size=int(25e6)):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.part_size = part_size
        self.count_bytes = 0
        self.upload_bytes = b''
        self.part_counter = 1
        self.counter = 1
        self.parts = []
        self.mpuid = None

    def __enter__(self):
        if self.mpuid is None:
            self.mpuid = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)["UploadId"]
            print("{} : Initiated multi-part upload with ID: {} for {}".format(datetime.now(), self.mpuid, self.bucket + "/" + self.key))
        return self

    def __exit__(self, type, value, tb):
        if tb is None:
            # No exception, so complete the multipart upload
            print("{} : Completing multipart upload for key: {}".format(datetime.now(), self.key))
            result = self.client.complete_multipart_upload(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=self.mpuid,
                MultipartUpload={"Parts": self.parts})
        else:
            # Exception occurred, so abort the multipart upload
            print("{} : Aborting multipart upload with key: {}".format(datetime.now(), self.key))
            self.client.abort_multipart_upload(
                Bucket=self.bucket,
                Key=self.key,
                UploadId=self.mpuid
            )
        print("{} : Done".format(datetime.now()))

    def upload(self, part_bytes):
        # Buffer incoming bytes; upload a part once the buffer exceeds part_size
        self.upload_bytes += part_bytes
        if len(self.upload_bytes) > self.part_size:
            part = self.client.upload_part(Body=self.upload_bytes, Bucket=self.bucket, Key=self.key, UploadId=self.mpuid, PartNumber=self.part_counter)
            self.parts.append({"PartNumber": self.part_counter, "ETag": part["ETag"]})
            self.part_counter += 1
            self.count_bytes += len(self.upload_bytes)
            self.upload_bytes = b''

        # Print progress roughly every 100 MB of uploaded data
        if self.count_bytes / self.counter >= 100000000:
            print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes / 1000000))
            self.counter += 1

    def flush(self):
        # Upload whatever is left in the buffer as the final part
        if len(self.upload_bytes) != 0:
            part = self.client.upload_part(Body=self.upload_bytes, Bucket=self.bucket, Key=self.key, UploadId=self.mpuid, PartNumber=self.part_counter)
            self.parts.append({"PartNumber": self.part_counter, "ETag": part["ETag"]})
            self.part_counter += 1
            self.count_bytes += len(self.upload_bytes)
            self.upload_bytes = b''
        print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes / 1000000))