From ad9ccb00ac97be832b4b9248fddef87b541b7030 Mon Sep 17 00:00:00 2001
From: Sean McGrail
Date: Sun, 27 Oct 2019 18:24:14 -0700
Subject: [PATCH] awstesting/integration/performance/s3UploadManager: CLI
 Configurable Benchmarking Options (#2894)

---
 .../performance/s3UploadManager/main.go       |  10 +
 .../performance/s3UploadManager/main_test.go  | 177 +++++++++++-------
 2 files changed, 123 insertions(+), 64 deletions(-)

diff --git a/awstesting/integration/performance/s3UploadManager/main.go b/awstesting/integration/performance/s3UploadManager/main.go
index 695a432975..32b19c2003 100644
--- a/awstesting/integration/performance/s3UploadManager/main.go
+++ b/awstesting/integration/performance/s3UploadManager/main.go
@@ -192,3 +192,13 @@ func newUploader(clientConfig ClientConfig, sdkConfig SDKConfig, options ...requ
 
 	return uploader
 }
+
+func getUploadPartSize(fileSize, uploadPartSize int64) int64 {
+	partSize := uploadPartSize
+
+	if fileSize/partSize > s3manager.MaxUploadParts {
+		partSize = (fileSize / s3manager.MaxUploadParts) + 1
+	}
+
+	return partSize
+}
diff --git a/awstesting/integration/performance/s3UploadManager/main_test.go b/awstesting/integration/performance/s3UploadManager/main_test.go
index 61e5cadd95..18ce517f1a 100644
--- a/awstesting/integration/performance/s3UploadManager/main_test.go
+++ b/awstesting/integration/performance/s3UploadManager/main_test.go
@@ -3,14 +3,16 @@ package main
 
 import (
-	"bytes"
 	"flag"
 	"fmt"
 	"io"
 	"os"
-	"path/filepath"
+	"strconv"
+	"strings"
 	"testing"
 
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/awstesting"
 	"github.com/aws/aws-sdk-go/awstesting/integration"
 	"github.com/aws/aws-sdk-go/internal/sdkio"
 	"github.com/aws/aws-sdk-go/service/s3/s3manager"
 )
@@ -22,90 +24,140 @@ type BenchmarkConfig struct {
 	bucket       string
 	tempdir      string
 	clientConfig ClientConfig
+	sizes        string
+	parts        string
+	concurrency  string
+	bufferSize   string
 }
 
 func (b *BenchmarkConfig) SetupFlags(prefix string, flagSet *flag.FlagSet) {
 	flagSet.StringVar(&b.bucket, "bucket", "", "Bucket to use for benchmark")
 	flagSet.StringVar(&b.tempdir, "temp", os.TempDir(), "location to create temporary files")
+
+	flagSet.StringVar(&b.sizes, "size",
+		fmt.Sprintf("%d,%d",
+			5*sdkio.MebiByte,
+			1*sdkio.GibiByte), "file sizes to benchmark separated by comma")
+
+	flagSet.StringVar(&b.parts, "part",
+		fmt.Sprintf("%d,%d,%d",
+			s3manager.DefaultUploadPartSize,
+			25*sdkio.MebiByte,
+			100*sdkio.MebiByte), "part sizes to benchmark separated by comma")
+
+	flagSet.StringVar(&b.concurrency, "concurrency",
+		fmt.Sprintf("%d,%d,%d",
+			s3manager.DefaultUploadConcurrency,
+			2*s3manager.DefaultUploadConcurrency,
+			100),
+		"concurrences to benchmark separated by comma")
+
+	flagSet.StringVar(&b.bufferSize, "buffer", fmt.Sprintf("%d,%d", 0, 1*sdkio.MebiByte), "buffer sizes to benchmark separated by comma")
 
 	b.clientConfig.SetupFlags(prefix, flagSet)
 }
 
-var benchStrategies = []struct {
-	name           string
-	bufferProvider s3manager.ReadSeekerWriteToProvider
-}{
-	{name: "Unbuffered", bufferProvider: nil},
-	{name: "Buffered", bufferProvider: s3manager.NewBufferedReadSeekerWriteToPool(1024 * 1024)},
+func (b *BenchmarkConfig) BufferSizes() []int {
+	ints, err := b.stringToInt(b.bufferSize)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse buffer sizes: %v", err))
+	}
+
+	return ints
 }
 
-func BenchmarkInMemory(b *testing.B) {
-	memBreader := bytes.NewReader(make([]byte, 1*1024*1024*1024))
-
-	baseSdkConfig := SDKConfig{WithUnsignedPayload: true, ExpectContinue: true, WithContentMD5: false}
-
-	key := integration.UniqueID()
-	// Concurrency: 5, 10, 100
-	for _, concurrency := range []int{s3manager.DefaultUploadConcurrency, 2 * s3manager.DefaultUploadConcurrency, 100} {
-		b.Run(fmt.Sprintf("%d_Concurrency", concurrency), func(b *testing.B) {
-			// PartSize: 5 MB, 25 MB, 100 MB
-			for _, partSize := range []int64{s3manager.DefaultUploadPartSize, 25 * 1024 * 1024, 100 * 1024 * 1024} {
-				b.Run(fmt.Sprintf("%s_PartSize", integration.SizeToName(int(partSize))), func(b *testing.B) {
-					sdkConfig := baseSdkConfig
-
-					sdkConfig.BufferProvider = nil
-					sdkConfig.Concurrency = concurrency
-					sdkConfig.PartSize = partSize
-
-					b.ResetTimer()
-					for i := 0; i < b.N; i++ {
-						benchUpload(b, benchConfig.bucket, key, memBreader, sdkConfig, benchConfig.clientConfig)
-						_, err := memBreader.Seek(0, sdkio.SeekStart)
-						if err != nil {
-							b.Fatalf("failed to seek to start of file: %v", err)
-						}
-					}
-				})
-			}
-		})
+func (b *BenchmarkConfig) FileSizes() []int64 {
+	ints, err := b.stringToInt64(b.sizes)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse file sizes: %v", err))
 	}
+
+	return ints
+}
+
+func (b *BenchmarkConfig) PartSizes() []int64 {
+	ints, err := b.stringToInt64(b.parts)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse part sizes: %v", err))
+	}
+
+	return ints
+}
+
+func (b *BenchmarkConfig) Concurrences() []int {
+	ints, err := b.stringToInt(b.concurrency)
+	if err != nil {
+		panic(fmt.Sprintf("failed to parse concurrences: %v", err))
+	}
+
+	return ints
+}
+
+func (b *BenchmarkConfig) stringToInt(s string) ([]int, error) {
+	int64s, err := b.stringToInt64(s)
+	if err != nil {
+		return nil, err
+	}
+
+	var ints []int
+	for i := range int64s {
+		ints = append(ints, int(int64s[i]))
+	}
+
+	return ints, nil
+}
+
+func (b *BenchmarkConfig) stringToInt64(s string) ([]int64, error) {
+	var sizes []int64
+
+	split := strings.Split(s, ",")
+
+	for _, size := range split {
+		size = strings.Trim(size, " ")
+		i, err := strconv.ParseInt(size, 10, 64)
+		if err != nil {
+			return nil, fmt.Errorf("invalid integer %s: %v", size, err)
+		}
+
+		sizes = append(sizes, i)
+	}
+
+	return sizes, nil
 }
 
 func BenchmarkUpload(b *testing.B) {
-	baseSdkConfig := SDKConfig{WithUnsignedPayload: true, ExpectContinue: true, WithContentMD5: false}
-
-	// FileSizes: 5 MB, 1 GB, 10 GB
-	for _, fileSize := range []int64{5 * 1024 * 1024, 1024 * 1024 * 1024, 10 * 1024 * 1024 * 1024} {
-		b.Run(fmt.Sprintf("%s_File", integration.SizeToName(int(fileSize))), func(b *testing.B) {
-			b.Logf("creating file of size: %s", integration.SizeToName(int(fileSize)))
-			file, err := integration.CreateFileOfSize(benchConfig.tempdir, fileSize)
-			if err != nil {
-				b.Fatalf("failed to create file: %v", err)
-			}
+	baseSdkConfig := SDKConfig{WithUnsignedPayload: true}
 
-			// Concurrency: 5, 10, 100
-			for _, concurrency := range []int{s3manager.DefaultUploadConcurrency, 2 * s3manager.DefaultUploadConcurrency, 100} {
-				b.Run(fmt.Sprintf("%d_Concurrency", concurrency), func(b *testing.B) {
-					// PartSize: 5 MB, 25 MB, 100 MB
-					for _, partSize := range []int64{s3manager.DefaultUploadPartSize, 25 * 1024 * 1024, 100 * 1024 * 1024} {
+	for _, fileSize := range benchConfig.FileSizes() {
+		b.Run(fmt.Sprintf("%s File", integration.SizeToName(int(fileSize))), func(b *testing.B) {
+			for _, concurrency := range benchConfig.Concurrences() {
+				b.Run(fmt.Sprintf("%d Concurrency", concurrency), func(b *testing.B) {
+					for _, partSize := range benchConfig.PartSizes() {
 						if partSize > fileSize {
 							continue
 						}
-						b.Run(fmt.Sprintf("%s_PartSize", integration.SizeToName(int(partSize))), func(b *testing.B) {
-							for _, strat := range benchStrategies {
-								b.Run(strat.name, func(b *testing.B) {
+						partSize = getUploadPartSize(fileSize, partSize)
+						b.Run(fmt.Sprintf("%s PartSize", integration.SizeToName(int(partSize))), func(b *testing.B) {
+							for _, bufferSize := range benchConfig.BufferSizes() {
+								var name string
+								if bufferSize == 0 {
+									name = "Unbuffered"
+								} else {
+									name = fmt.Sprintf("%s Buffer", integration.SizeToName(bufferSize))
+								}
+								b.Run(name, func(b *testing.B) {
									sdkConfig := baseSdkConfig
 
-									sdkConfig.BufferProvider = strat.bufferProvider
 									sdkConfig.Concurrency = concurrency
 									sdkConfig.PartSize = partSize
+									if bufferSize > 0 {
+										sdkConfig.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(bufferSize)
+									}
+
+									reader := aws.ReadSeekCloser(io.LimitReader(&awstesting.EndlessReader{}, fileSize))
 									b.ResetTimer()
 									for i := 0; i < b.N; i++ {
-										benchUpload(b, benchConfig.bucket, filepath.Base(file.Name()), file, sdkConfig, benchConfig.clientConfig)
-										_, err := file.Seek(0, sdkio.SeekStart)
-										if err != nil {
-											b.Fatalf("failed to seek to start of file: %v", err)
-										}
+										benchUpload(b, benchConfig.bucket, integration.UniqueID(), reader, sdkConfig, benchConfig.clientConfig)
 									}
 								})
 							}
@@ -113,9 +165,6 @@ func BenchmarkUpload(b *testing.B) {
 					}
 				})
 			}
-
-			os.Remove(file.Name())
-			file.Close()
 		})
 	}
 }
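
Example invocation (a sketch, not part of the patch; it assumes the benchmark harness wires BenchmarkConfig.SetupFlags into the default flag set before flag.Parse runs, and that any build tags the package requires are supplied). The bucket name is a placeholder; -size, -part, -concurrency, and -buffer each accept a comma-separated list, and every combination becomes its own sub-benchmark:

    go test -bench=BenchmarkUpload ./awstesting/integration/performance/s3UploadManager -args \
        -bucket my-benchmark-bucket \
        -size 5242880,1073741824 \
        -part 5242880,26214400 \
        -concurrency 5,10 \
        -buffer 0,1048576

The numeric values are raw byte counts (5 MiB and 1 GiB files, 5 MiB and 25 MiB parts, an unbuffered run plus a 1 MiB buffer), a subset of the defaults the flags encode above.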