package main

import (
	"bytes"
	"context"
	"encoding/csv"
	"flag"
	"fmt"
	"log"
	"math/rand"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/s3"
)

// TestConfig holds the configuration for the benchmark
type TestConfig struct {
	Bucket         string
	ExpressBucket  string
	Region         string
	FileSizes      []int // in KB
	NumIterations  int
	OutputFile     string
	SkipUpload     bool
	SkipDownload   bool
	SkipS3Standard bool
	SkipS3Express  bool
	WarmupCount    int    // Number of warmup requests before measuring
	KeyPrefix      string // Prefix for all test objects
}

// TestResult represents a single test result
type TestResult struct {
	StorageType string
	Operation   string
	FileSize    int
	Duration    time.Duration
	BytesPerSec float64
}

// ObjectKeyCache tracks uploaded objects for reuse in download tests
type ObjectKeyCache struct {
	StandardKeys map[int]string // Cached object keys per size for standard S3
	ExpressKeys  map[int]string // Cached object keys per size for S3 Express
	Mu           sync.Mutex     // Protects the maps
}

func main() {
	config := parseFlags()

	// Setup AWS client with optimized settings
	awsCfg, err := setupAWSClient(config.Region)
	if err != nil {
		log.Fatalf("Failed to set up AWS client: %v", err)
	}

	// Create S3 client - we'll use the same client for both standard and Express
	s3Client := s3.NewFromConfig(awsCfg)

	// Validate that the Express bucket follows the directory bucket naming convention
	if !config.SkipS3Express && !isValidDirectoryBucketName(config.ExpressBucket) {
		log.Printf("Warning: S3 Express bucket name '%s' doesn't appear to follow the directory bucket format.", config.ExpressBucket)
		log.Printf("S3 Express directory bucket names should be in format: 'name--az-id--x-s3' (e.g., 'mybucket--usw2-az1--x-s3')")
	}

	// Create key cache to track uploaded objects
	keyCache := &ObjectKeyCache{
		StandardKeys: make(map[int]string),
		ExpressKeys:  make(map[int]string),
	}

	// Generate random test data for each file size
	testData := make(map[int][]byte)
	for _, size := range config.FileSizes {
		testData[size] = generateRandomData(size * 1024) // Convert KB to bytes
	}

	var results []TestResult

	// Run warmup requests if configured
	if config.WarmupCount > 0 {
		log.Printf("Performing initial warmup with %d requests...", config.WarmupCount)
		runWarmup(s3Client, testData, config, keyCache)
		// Add a short delay after warmup to allow any background operations to complete
		time.Sleep(1 * time.Second)
	}

	// S3 Standard tests
	if !config.SkipS3Standard {
		log.Println("Starting S3 Standard tests...")
		results = append(results, runTests(s3Client, testData, config, "S3Standard", config.Bucket, keyCache)...)
	}

	// S3 Express tests
	if !config.SkipS3Express {
		log.Println("Starting S3 Express One Zone tests...")
		results = append(results, runTests(s3Client, testData, config, "S3Express", config.ExpressBucket, keyCache)...)
	}

	// Write results to CSV
	if err := writeResultsToCSV(results, config.OutputFile); err != nil {
		log.Fatalf("Failed to write results: %v", err)
	}

	log.Printf("Benchmark completed. Results written to %s", config.OutputFile)
}
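// One way to create the two buckets before running the benchmark. This is an
// illustrative sketch, not part of the benchmark itself: the bucket names and
// the availability-zone ID are placeholders, and the directory-bucket
// configuration shorthand may vary across AWS CLI versions:
//
//	aws s3 mb s3://my-standard-bucket --region us-east-1
//	aws s3api create-bucket \
//	    --bucket mybench--use1-az4--x-s3 \
//	    --region us-east-1 \
//	    --create-bucket-configuration 'Location={Type=AvailabilityZone,Name=use1-az4},Bucket={DataRedundancy=SingleAvailabilityZone,Type=Directory}'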
// Check if a bucket name follows the S3 Express directory bucket naming convention
func isValidDirectoryBucketName(name string) bool {
	// Directory bucket names should end with "--x-s3"
	return len(name) > 6 && name[len(name)-6:] == "--x-s3"
}

func parseFlags() TestConfig {
	bucket := flag.String("bucket", "", "S3 standard bucket name")
	expressBucket := flag.String("express-bucket", "", "S3 Express One Zone directory bucket name (format: name--az-id--x-s3)")
	region := flag.String("region", "us-east-1", "AWS region")
	fileSizesStr := flag.String("file-sizes", "4,16,64,256,1024", "Comma-separated list of file sizes in KB")
	numIterations := flag.Int("iterations", 5, "Number of iterations for each test")
	outputFile := flag.String("output", "s3_benchmark_results.csv", "Output CSV file")
	skipUpload := flag.Bool("skip-upload", false, "Skip upload tests")
	skipDownload := flag.Bool("skip-download", false, "Skip download tests")
	skipS3Standard := flag.Bool("skip-s3-standard", false, "Skip S3 Standard tests")
	skipS3Express := flag.Bool("skip-s3-express", false, "Skip S3 Express One Zone tests")
	warmupCount := flag.Int("warmup", 5, "Number of warmup requests before measuring")
	keyPrefix := flag.String("key-prefix", "benchmark_", "Prefix for S3 object keys")

	flag.Parse()

	if *bucket == "" && !*skipS3Standard {
		log.Fatal("S3 standard bucket name is required")
	}
	if *expressBucket == "" && !*skipS3Express {
		log.Fatal("S3 Express One Zone bucket name is required")
	}

	// Parse file sizes
	var fileSizes []int
	for _, size := range splitCSV(*fileSizesStr) {
		sizeInt, err := strconv.Atoi(size)
		if err != nil {
			log.Fatalf("Invalid file size: %s", size)
		}
		fileSizes = append(fileSizes, sizeInt)
	}

	return TestConfig{
		Bucket:         *bucket,
		ExpressBucket:  *expressBucket,
		Region:         *region,
		FileSizes:      fileSizes,
		NumIterations:  *numIterations,
		OutputFile:     *outputFile,
		SkipUpload:     *skipUpload,
		SkipDownload:   *skipDownload,
		SkipS3Standard: *skipS3Standard,
		SkipS3Express:  *skipS3Express,
		WarmupCount:    *warmupCount,
		KeyPrefix:      *keyPrefix,
	}
}

func splitCSV(s string) []string {
	var result []string
	for _, v := range bytes.Split([]byte(s), []byte(",")) {
		result = append(result, string(bytes.TrimSpace(v)))
	}
	return result
}

func setupAWSClient(region string) (aws.Config, error) {
	// Create an optimized HTTP client with increased connection pool
	httpClient := &http.Client{
		Transport: &http.Transport{
			MaxIdleConns:        100,
			MaxIdleConnsPerHost: 100,
			IdleConnTimeout:     90 * time.Second,
		},
		Timeout: 30 * time.Second,
	}

	// Load config with optimization settings
	return config.LoadDefaultConfig(context.TODO(),
		config.WithRegion(region),
		config.WithRetryMaxAttempts(1), // Disable retries for benchmarking
		config.WithHTTPClient(httpClient),
	)
}

func generateRandomData(size int) []byte {
	data := make([]byte, size)
	rand.Read(data)
	return data
}
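// Example invocation, run from the module directory. The flag names and
// defaults come from parseFlags above; the bucket names are placeholders:
//
//	go run . \
//	    -bucket my-standard-bucket \
//	    -express-bucket mybench--use1-az4--x-s3 \
//	    -region us-east-1 \
//	    -file-sizes 4,16,64,256,1024 \
//	    -iterations 10 \
//	    -warmup 5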
func runWarmup(client *s3.Client, testData map[int][]byte, config TestConfig, keyCache *ObjectKeyCache) {
	// Perform warmup requests to establish connections
	for storageType, bucketName := range map[string]string{
		"S3Standard": config.Bucket,
		"S3Express":  config.ExpressBucket,
	} {
		if (storageType == "S3Standard" && config.SkipS3Standard) ||
			(storageType == "S3Express" && config.SkipS3Express) {
			continue
		}

		// Use the smallest file size for warmup
		smallestSize := config.FileSizes[0]
		data := testData[smallestSize]

		for i := 0; i < config.WarmupCount; i++ {
			// Warmup upload
			if !config.SkipUpload {
				key := fmt.Sprintf("%swarmup_%s_%d_kb_%d", config.KeyPrefix, storageType, smallestSize, i)
				input := &s3.PutObjectInput{
					Bucket: aws.String(bucketName),
					Key:    aws.String(key),
					Body:   bytes.NewReader(data),
				}
				_, _ = client.PutObject(context.TODO(), input)

				// Cache the first key for download warmup
				if i == 0 {
					keyCache.Mu.Lock()
					if storageType == "S3Standard" {
						keyCache.StandardKeys[smallestSize] = key
					} else {
						keyCache.ExpressKeys[smallestSize] = key
					}
					keyCache.Mu.Unlock()
				}
			}

			// Warmup download
			if !config.SkipDownload {
				// Use the key we just uploaded
				key := fmt.Sprintf("%swarmup_%s_%d_kb_%d", config.KeyPrefix, storageType, smallestSize, 0)
				input := &s3.GetObjectInput{
					Bucket: aws.String(bucketName),
					Key:    aws.String(key),
				}
				resp, _ := client.GetObject(context.TODO(), input)
				if resp != nil && resp.Body != nil {
					resp.Body.Close()
				}
			}
		}
	}
}

func runTests(client *s3.Client, testData map[int][]byte, config TestConfig, storageType, bucketName string, keyCache *ObjectKeyCache) []TestResult {
	var results []TestResult

	for _, size := range config.FileSizes {
		log.Printf("Testing %s with file size %d KB", storageType, size)

		// Do a warmup for each file size to ensure connection is established
		log.Printf("Performing warmup for %s with file size %d KB", storageType, size)
		warmupKey := fmt.Sprintf("%swarmup_pre_test_%d_kb_%d", config.KeyPrefix, size, time.Now().UnixNano())

		// Warmup upload
		_, err := client.PutObject(context.TODO(), &s3.PutObjectInput{
			Bucket: aws.String(bucketName),
			Key:    aws.String(warmupKey),
			Body:   bytes.NewReader(testData[size]),
		})
		if err != nil {
			log.Printf("Warmup upload error: %v", err)
		} else {
			// Warmup download
			resp, err := client.GetObject(context.TODO(), &s3.GetObjectInput{
				Bucket: aws.String(bucketName),
				Key:    aws.String(warmupKey),
			})
			if err != nil {
				log.Printf("Warmup download error: %v", err)
			} else if resp != nil && resp.Body != nil {
				// Drain and close the body
				buf := new(bytes.Buffer)
				buf.ReadFrom(resp.Body)
				resp.Body.Close()
			}
		}

		// Small delay to ensure the connection is fully established
		time.Sleep(500 * time.Millisecond)

		// Upload tests
		if !config.SkipUpload {
			for i := 0; i < config.NumIterations; i++ {
				result := runUploadTest(client, testData[size], size, storageType, bucketName, config.KeyPrefix)

				// Cache the key for future downloads if not an error
				if result.Operation != "Upload-Error" {
					keyCache.Mu.Lock()
					if storageType == "S3Standard" {
						keyCache.StandardKeys[size] = result.Operation[7:] // Strip "Upload-" prefix
					} else {
						keyCache.ExpressKeys[size] = result.Operation[7:] // Strip "Upload-" prefix
					}
					keyCache.Mu.Unlock()

					// Reset the operation name for results
					result.Operation = "Upload"
				}
				results = append(results, result)
			}
		}

		// Small delay between upload and download tests
		time.Sleep(500 * time.Millisecond)

		// Download tests
		if !config.SkipDownload {
			// Get the cached key for this size and storage type
			var key string
			keyCache.Mu.Lock()
			if storageType == "S3Standard" {
				key = keyCache.StandardKeys[size]
			} else {
				key = keyCache.ExpressKeys[size]
			}
			keyCache.Mu.Unlock()

			// If we don't have a cached key, ensure the file exists
			if key == "" && !config.SkipUpload {
				// Create a key and upload the object
				key = fmt.Sprintf("%s%d_kb_%s.bin", config.KeyPrefix, size, time.Now().Format("20060102150405"))

				// Upload the file
				input := &s3.PutObjectInput{
					Bucket: aws.String(bucketName),
					Key:    aws.String(key),
					Body:   bytes.NewReader(testData[size]),
				}
				_, err := client.PutObject(context.TODO(), input)
				if err != nil {
					log.Printf("Error uploading file for download tests: %v", err)
					continue
				}

				// Cache the key
				keyCache.Mu.Lock()
				if storageType == "S3Standard" {
					keyCache.StandardKeys[size] = key
				} else {
					keyCache.ExpressKeys[size] = key
				}
				keyCache.Mu.Unlock()
			}

			// Run sequential download tests
			for i := 0; i < config.NumIterations; i++ {
				results = append(results, runDownloadTest(client, size, storageType, bucketName, key))
			}
		}
	}

	return results
}
func runUploadTest(client *s3.Client, data []byte, sizeInKB int, storageType, bucketName, keyPrefix string) TestResult {
	// Create a unique key using the current timestamp
	key := fmt.Sprintf("%s%d_kb_%s.bin", keyPrefix, sizeInKB, time.Now().Format("20060102150405"))

	input := &s3.PutObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(key),
		Body:   bytes.NewReader(data),
	}

	start := time.Now()
	_, err := client.PutObject(context.TODO(), input)
	duration := time.Since(start)

	if err != nil {
		log.Printf("Upload error for %s (%d KB): %v", storageType, sizeInKB, err)
		return TestResult{
			StorageType: storageType,
			Operation:   "Upload-Error",
			FileSize:    sizeInKB,
			Duration:    duration,
			BytesPerSec: 0,
		}
	}

	bytesPerSec := float64(len(data)) / duration.Seconds()

	// Return the key in the operation field temporarily
	// This will be used to cache the key for download tests
	return TestResult{
		StorageType: storageType,
		Operation:   "Upload-" + key,
		FileSize:    sizeInKB,
		Duration:    duration,
		BytesPerSec: bytesPerSec,
	}
}

func runDownloadTest(client *s3.Client, sizeInKB int, storageType, bucketName, key string) TestResult {
	if key == "" {
		return TestResult{
			StorageType: storageType,
			Operation:   "Download-Error-NoKey",
			FileSize:    sizeInKB,
			Duration:    0,
			BytesPerSec: 0,
		}
	}

	input := &s3.GetObjectInput{
		Bucket: aws.String(bucketName),
		Key:    aws.String(key),
	}

	start := time.Now()
	resp, err := client.GetObject(context.TODO(), input)
	if err != nil {
		duration := time.Since(start)
		log.Printf("Download error for %s (%d KB): %v", storageType, sizeInKB, err)
		return TestResult{
			StorageType: storageType,
			Operation:   "Download-Error",
			FileSize:    sizeInKB,
			Duration:    duration,
			BytesPerSec: 0,
		}
	}

	// Read the body to ensure we actually download the data
	buf := new(bytes.Buffer)
	bytesDownloaded, err := buf.ReadFrom(resp.Body)
	resp.Body.Close()
	duration := time.Since(start)

	if err != nil {
		log.Printf("Download read error for %s (%d KB): %v", storageType, sizeInKB, err)
		return TestResult{
			StorageType: storageType,
			Operation:   "Download-Error",
			FileSize:    sizeInKB,
			Duration:    duration,
			BytesPerSec: 0,
		}
	}

	bytesPerSec := float64(bytesDownloaded) / duration.Seconds()

	return TestResult{
		StorageType: storageType,
		Operation:   "Download",
		FileSize:    sizeInKB,
		Duration:    duration,
		BytesPerSec: bytesPerSec,
	}
}

func writeResultsToCSV(results []TestResult, filename string) error {
	// Create directory if it doesn't exist
	dir := filepath.Dir(filename)
	if dir != "." && dir != "" {
		if err := os.MkdirAll(dir, 0755); err != nil {
			return err
		}
	}

	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer file.Close()

	writer := csv.NewWriter(file)
	defer writer.Flush()

	// Write header
	header := []string{"StorageType", "Operation", "FileSize_KB", "Duration_ms", "Throughput_MBps"}
	if err := writer.Write(header); err != nil {
		return err
	}

	// Write results
	for _, result := range results {
		durationMs := result.Duration.Milliseconds()
		throughputMBps := result.BytesPerSec / (1024 * 1024) // Convert bytes/sec to MB/sec

		row := []string{
			result.StorageType,
			result.Operation,
			strconv.Itoa(result.FileSize),
			strconv.FormatInt(durationMs, 10),
			strconv.FormatFloat(throughputMBps, 'f', 2, 64),
		}
		if err := writer.Write(row); err != nil {
			return err
		}
	}

	return nil
}
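// The output CSV can be summarized however you like. A minimal sketch of a
// separate helper program (hypothetical summarize.go, not part of this gist;
// error handling elided) that averages Duration_ms per
// StorageType/Operation/FileSize_KB using the header written above:
//
//	package main
//
//	import (
//		"encoding/csv"
//		"fmt"
//		"os"
//		"strconv"
//	)
//
//	func main() {
//		f, _ := os.Open("s3_benchmark_results.csv")
//		defer f.Close()
//		rows, _ := csv.NewReader(f).ReadAll()
//		sum := map[string]float64{}
//		count := map[string]int{}
//		for _, r := range rows[1:] { // skip the header row
//			key := r[0] + " " + r[1] + " " + r[2] + "KB"
//			ms, _ := strconv.ParseFloat(r[3], 64)
//			sum[key] += ms
//			count[key]++
//		}
//		for k, total := range sum {
//			fmt.Printf("%s: %.1f ms avg over %d runs\n", k, total/float64(count[k]), count[k])
//		}
//	}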