package main import ( "context" "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/dustin/go-humanize" "io" "log" "sync" "sync/atomic" ) type OrderedWriter struct { buffer sync.Map writer *io.PipeWriter written uint64 expected int64 size *uint64 wg sync.WaitGroup mu sync.Mutex } func (o *OrderedWriter) WriteAt(p []byte, offset int64) (n int, err error) { o.wg.Add(1) writeBlockSize := len(p) if o.size != nil { atomic.AddUint64(&o.written, uint64(writeBlockSize)) percentageDownloaded := float32(o.written*100) / float32(*o.size) log.Printf("File size: %s, downloaded: %s, percentage: %.2f%%\n", humanize.IBytes(*o.size), humanize.IBytes(o.written), percentageDownloaded) } buf := make([]byte, writeBlockSize) copy(buf, p) o.buffer.Store(offset, buf) o.flush() return writeBlockSize, nil } func (o *OrderedWriter) flush() { o.mu.Lock() defer func() { o.wg.Done() o.mu.Unlock() }() for { if data, ok := o.buffer.Load(o.expected); ok { buf := data.([]byte) flushBlockSize := len(buf) _, err := o.writer.Write(buf) if err != nil { panic("Error writing to buf: " + err.Error()) } o.buffer.Delete(o.expected) atomic.AddInt64(&o.expected, int64(flushBlockSize)) } else { break } } } func download(s3Client *s3.Client, orderedWriter *OrderedWriter, sourceBucket string, sourceKey string) { log.Println("Begin downloading...") downloader := manager.NewDownloader(s3Client, func(downloader *manager.Downloader) { // Define a strategy that will buffer 50 MiB in memory downloader.BufferProvider = manager.NewPooledBufferedWriterReadFromProvider(50 * 1024 * 1024) downloader.PartSize = 100 * 1024 * 1024 // 100MB downloader.Concurrency = 5 downloader.PartBodyMaxRetries = 5 }) _, err := downloader.Download(context.TODO(), orderedWriter, &s3.GetObjectInput{ Bucket: aws.String(sourceBucket), Key: aws.String(sourceKey), }) if err != nil { log.Fatalf("download failed due to: %v\n", err) } log.Println("Download Success!") orderedWriter.wg.Wait() defer func() { if err := orderedWriter.writer.Close(); err != nil { panic(fmt.Sprintf("close writer error: %v", err)) } }() } func upload(s3Client *s3.Client, wg *sync.WaitGroup, destinationBucket string, destinationKey string, reader *io.PipeReader) { defer wg.Done() uploader := manager.NewUploader(s3Client, func(u *manager.Uploader) { // Define a strategy that will buffer 50 MiB in memory u.BufferProvider = manager.NewBufferedReadSeekerWriteToPool(50 * 1024 * 1024) u.PartSize = 100 * 1024 * 1024 // 100 MiB u.Concurrency = 5 }) log.Println("Begin uploading...") _, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String(destinationBucket), Key: aws.String(destinationKey), Body: reader, }) if err != nil { log.Fatalf("upload failed due to: %v\n", err) } log.Println("Finish uploading...") } func getFileSize(s3Client s3.Client, sourceBucket string, sourceKey string) *uint64 { headObject, err := s3Client.HeadObject(context.TODO(), &s3.HeadObjectInput{ Bucket: aws.String(sourceBucket), Key: aws.String(sourceKey), }) if err != nil { log.Fatalf("failed to get source object metadata, err: %v", err) } if headObject.ContentLength != nil { return aws.Uint64(uint64(*headObject.ContentLength)) } return nil } func getS3Client(profile string) *s3.Client { // Using the SDK's default configuration, More about Configuring the AWS SDK // https://aws.github.io/aws-sdk-go-v2/docs/configuring-sdk/ cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithSharedConfigProfile(profile)) if err != nil { log.Fatalf("failed to load aws config of profile: %s, err: %v", profile, err) } return s3.NewFromConfig(cfg, func(options *s3.Options) { options.RetryMaxAttempts = 5 }) } func main() { // REPLACE AS YOUR REQUIRED CONFIGURATION BEGIN sourceBucket := "source-bucket" destinationBucket := "destination-bucket" sourceKey := "source-key" destinationKey := "destination-key" sourceAccountProfile := "source-profile" destinationAccountProfile := "destination-profile" // REPLACE AS YOUR REQUIRED CONFIGURATION END sourceS3Client := getS3Client(sourceAccountProfile) destinationClient := getS3Client(destinationAccountProfile) reader, writer := io.Pipe() orderedWriter := &OrderedWriter{ writer: writer, expected: 0, size: getFileSize(*sourceS3Client, sourceBucket, sourceKey), } var wg sync.WaitGroup wg.Add(1) go download(sourceS3Client, orderedWriter, sourceBucket, sourceKey) go upload(destinationClient, &wg, destinationBucket, destinationKey, reader) wg.Wait() log.Println("Download && Upload Success!") }