from datetime import datetime

# Usage:
#     with S3MultipartUpload(boto3.client('s3'), destination_bucket, destination_key) as s3upload:
#         # Do something
#         s3upload.upload(content)
#         # Do something
#         s3upload.upload(content)
#         # Do something
#         s3upload.flush()
#
# This usage format is required since if the multi-part upload is aborted, we need to call
# abort_multipart_upload() to delete the intermediate parts generated, else we will be paying for the
# storage of these intermediate parts AND if the multi-part upload is completed, we need to call
# complete_multipart_upload() to tell AWS to construct the final object from the uploaded parts, else we
# will be paying for the storage of these intermediate parts without the final object. This usage format
# ensures both functions are called according to the situation.
#
# Read more here: https://docs.aws.amazon.com/AmazonS3/latest/dev/mpuoverview.html

# S3 rejects every part except the last one if it is smaller than 5 MiB, so the
# default threshold must not be lower than this (5e6 bytes is ~243 KB too small).
_MIN_S3_PART_SIZE = 5 * 1024 * 1024


class S3MultipartUpload:
    """Context manager that streams bytes to S3 as a multipart upload.

    Bytes handed to ``upload()`` are buffered and sent as one part whenever the
    buffer grows beyond ``part_size``. Leaving the ``with`` block without an
    exception sends any remaining buffered bytes and completes the upload;
    leaving it with an exception (or failing to complete) aborts the upload so
    that no orphaned parts are left behind.

    Args:
        client: S3 client object providing ``create_multipart_upload``,
            ``upload_part``, ``complete_multipart_upload`` and
            ``abort_multipart_upload`` (e.g. ``boto3.client('s3')``).
        bucket: Name of the destination bucket.
        key: Destination object key.
        part_size: Buffer threshold in bytes; a part is uploaded once the
            buffer holds more than this many bytes. Defaults to 5 MiB, the
            smallest size S3 accepts for a non-final part.
    """

    # Print a progress line each time another this-many bytes have been uploaded.
    _PROGRESS_STEP_BYTES = 100000000

    def __init__(self, client, bucket, key, part_size=_MIN_S3_PART_SIZE):
        self.client = client
        self.bucket = bucket
        self.key = key
        self.part_size = part_size
        self.count_bytes = 0      # total bytes sent to S3 so far
        self.upload_bytes = b''   # bytes buffered but not yet sent
        self.part_counter = 1     # part number of the next part (S3 numbers from 1)
        self.counter = 1          # index of the next progress milestone
        self.parts = []           # {"PartNumber", "ETag"} for every uploaded part
        self.mpuid = None         # upload id, set on first __enter__

    def __enter__(self):
        """Start the multipart upload (once) and return this object."""
        if self.mpuid is None:
            response = self.client.create_multipart_upload(Bucket=self.bucket, Key=self.key)
            self.mpuid = response["UploadId"]
            print("{} : Initiated multi-part upload with ID: {} for {}".format(
                datetime.now(), self.mpuid, self.bucket + "/" + self.key))
        return self

    def __exit__(self, type, value, tb):
        """Complete the upload on a clean exit, abort it otherwise.

        Exceptions are never suppressed. If flushing the remaining bytes or
        completing the upload fails, the upload is aborted before the error is
        re-raised so the already-uploaded parts do not linger.
        """
        if type is None:
            try:
                # Send whatever is still buffered so the object is not
                # truncated when the caller did not call flush() explicitly.
                if self.upload_bytes:
                    self.flush()
                print("{} : Completing multipart upload for key: {}".format(datetime.now(), self.key))
                self.client.complete_multipart_upload(
                    Bucket=self.bucket,
                    Key=self.key,
                    UploadId=self.mpuid,
                    MultipartUpload={"Parts": self.parts})
            except BaseException:
                # Cleanup only; the original error is re-raised unchanged.
                self._abort()
                raise
        else:
            # Exception occurred inside the with-block, so abort the upload.
            self._abort()
        print("{} : Done".format(datetime.now()))

    def upload(self, part_bytes):
        """Buffer ``part_bytes`` and upload a part once the buffer exceeds ``part_size``."""
        self.upload_bytes += part_bytes
        if len(self.upload_bytes) > self.part_size:
            self._upload_buffered_part()
            # Equivalent to count_bytes / counter >= step, without the division.
            if self.count_bytes >= self.counter * self._PROGRESS_STEP_BYTES:
                print("{0} : Uploaded (cumulative) {1:.2f} MB".format(
                    datetime.now(), self.count_bytes/1000000))
                self.counter += 1

    def flush(self):
        """Upload any buffered bytes as a part and print the cumulative total."""
        if self.upload_bytes:
            self._upload_buffered_part()
        print("{0} : Uploaded (cumulative) {1:.2f} MB".format(datetime.now(), self.count_bytes/1000000))

    def _upload_buffered_part(self):
        """Send the whole buffer as the next part and record its ETag."""
        part_number = self.part_counter
        response = self.client.upload_part(
            Body=self.upload_bytes,
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.mpuid,
            PartNumber=part_number)
        self.parts.append({"PartNumber": part_number, "ETag": response["ETag"]})
        self.part_counter += 1
        self.count_bytes += len(self.upload_bytes)
        self.upload_bytes = b''

    def _abort(self):
        """Abort the multipart upload so S3 discards the parts uploaded so far."""
        print("{} : Aborting multipart upload with key: {}".format(datetime.now(), self.key))
        self.client.abort_multipart_upload(
            Bucket=self.bucket,
            Key=self.key,
            UploadId=self.mpuid)