Skip to content

Commit

Permalink
awstesting/integration/performance/s3UploadManager: CLI Configurable …
Browse files Browse the repository at this point in the history
…Benchmarking Options (#2894)
  • Loading branch information
skmcgrail committed Oct 28, 2019
1 parent 506b1fd commit ad9ccb0
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 64 deletions.
10 changes: 10 additions & 0 deletions awstesting/integration/performance/s3UploadManager/main.go
Expand Up @@ -192,3 +192,13 @@ func newUploader(clientConfig ClientConfig, sdkConfig SDKConfig, options ...requ

return uploader
}

// getUploadPartSize returns the part size, in bytes, to use when uploading a
// file of fileSize bytes with a requested part size of uploadPartSize bytes.
//
// The requested size is returned unchanged when the file can be split into at
// most s3manager.MaxUploadParts parts of that size. Otherwise the part size is
// raised to (fileSize / s3manager.MaxUploadParts) + 1 so the part count stays
// within the limit.
//
// A non-positive uploadPartSize is returned as-is: it cannot be used as a
// divisor, and validating it is left to the caller.
func getUploadPartSize(fileSize, uploadPartSize int64) int64 {
	partSize := uploadPartSize
	if partSize <= 0 {
		return partSize
	}

	// Count parts rounding up. Truncating division alone would miss the
	// trailing partial part, e.g. MaxUploadParts*partSize + 1 bytes needs
	// MaxUploadParts + 1 parts.
	parts := fileSize / partSize
	if fileSize%partSize != 0 {
		parts++
	}

	if parts > s3manager.MaxUploadParts {
		partSize = (fileSize / s3manager.MaxUploadParts) + 1
	}

	return partSize
}
177 changes: 113 additions & 64 deletions awstesting/integration/performance/s3UploadManager/main_test.go
Expand Up @@ -3,14 +3,16 @@
package main

import (
"bytes"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/awstesting"
"github.com/aws/aws-sdk-go/awstesting/integration"
"github.com/aws/aws-sdk-go/internal/sdkio"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
Expand All @@ -22,100 +24,147 @@ type BenchmarkConfig struct {
bucket string
tempdir string
clientConfig ClientConfig
sizes string
parts string
concurrency string
bufferSize string
}

// SetupFlags registers the benchmark's command line flags on flagSet and
// binds them to the fields of b.
//
// The list-valued flags (size, part, concurrency, buffer) are stored as
// comma separated strings and parsed later by the accessor methods. The
// prefix is only forwarded to the client configuration's SetupFlags; the
// flags registered directly here are not prefixed.
func (b *BenchmarkConfig) SetupFlags(prefix string, flagSet *flag.FlagSet) {
	flagSet.StringVar(&b.bucket, "bucket", "", "Bucket to use for benchmark")
	flagSet.StringVar(&b.tempdir, "temp", os.TempDir(), "location to create temporary files")

	flagSet.StringVar(&b.sizes, "size",
		fmt.Sprintf("%d,%d",
			5*sdkio.MebiByte,
			1*sdkio.GibiByte),
		"file sizes to benchmark separated by comma")

	flagSet.StringVar(&b.parts, "part",
		fmt.Sprintf("%d,%d,%d",
			s3manager.DefaultUploadPartSize,
			25*sdkio.MebiByte,
			100*sdkio.MebiByte),
		"part sizes to benchmark separated by comma")

	flagSet.StringVar(&b.concurrency, "concurrency",
		fmt.Sprintf("%d,%d,%d",
			s3manager.DefaultUploadConcurrency,
			2*s3manager.DefaultUploadConcurrency,
			100),
		"concurrency levels to benchmark separated by comma")

	flagSet.StringVar(&b.bufferSize, "buffer",
		fmt.Sprintf("%d,%d", 0, 1*sdkio.MebiByte),
		"buffer sizes to benchmark separated by comma")

	b.clientConfig.SetupFlags(prefix, flagSet)
}

var benchStrategies = []struct {
name string
bufferProvider s3manager.ReadSeekerWriteToProvider
}{
{name: "Unbuffered", bufferProvider: nil},
{name: "Buffered", bufferProvider: s3manager.NewBufferedReadSeekerWriteToPool(1024 * 1024)},
// BufferSizes parses the comma separated buffer flag value and returns the
// buffer sizes to benchmark.
//
// It panics if the flag value is not a list of integers.
func (b *BenchmarkConfig) BufferSizes() []int {
	ints, err := b.stringToInt(b.bufferSize)
	if err != nil {
		panic(fmt.Sprintf("failed to parse buffer sizes: %v", err))
	}

	return ints
}

func BenchmarkInMemory(b *testing.B) {
memBreader := bytes.NewReader(make([]byte, 1*1024*1024*1024))

baseSdkConfig := SDKConfig{WithUnsignedPayload: true, ExpectContinue: true, WithContentMD5: false}

key := integration.UniqueID()
// Concurrency: 5, 10, 100
for _, concurrency := range []int{s3manager.DefaultUploadConcurrency, 2 * s3manager.DefaultUploadConcurrency, 100} {
b.Run(fmt.Sprintf("%d_Concurrency", concurrency), func(b *testing.B) {
// PartSize: 5 MB, 25 MB, 100 MB
for _, partSize := range []int64{s3manager.DefaultUploadPartSize, 25 * 1024 * 1024, 100 * 1024 * 1024} {
b.Run(fmt.Sprintf("%s_PartSize", integration.SizeToName(int(partSize))), func(b *testing.B) {
sdkConfig := baseSdkConfig

sdkConfig.BufferProvider = nil
sdkConfig.Concurrency = concurrency
sdkConfig.PartSize = partSize

b.ResetTimer()
for i := 0; i < b.N; i++ {
benchUpload(b, benchConfig.bucket, key, memBreader, sdkConfig, benchConfig.clientConfig)
_, err := memBreader.Seek(0, sdkio.SeekStart)
if err != nil {
b.Fatalf("failed to seek to start of file: %v", err)
}
}
})
}
})
// FileSizes parses the comma separated size flag value and returns the file
// sizes to benchmark.
//
// It panics if the flag value is not a list of integers.
func (b *BenchmarkConfig) FileSizes() []int64 {
	fileSizes, parseErr := b.stringToInt64(b.sizes)
	if parseErr != nil {
		msg := fmt.Sprintf("failed to parse file sizes: %v", parseErr)
		panic(msg)
	}
	return fileSizes
}

// PartSizes parses the comma separated part flag value and returns the
// upload part sizes to benchmark.
//
// It panics if the flag value is not a list of integers.
func (b *BenchmarkConfig) PartSizes() []int64 {
	partSizes, parseErr := b.stringToInt64(b.parts)
	if parseErr != nil {
		msg := fmt.Sprintf("failed to parse part sizes: %v", parseErr)
		panic(msg)
	}
	return partSizes
}

// Concurrences parses the comma separated concurrency flag value and returns
// the upload concurrency levels to benchmark.
//
// It panics if the flag value is not a list of integers.
func (b *BenchmarkConfig) Concurrences() []int {
	ints, err := b.stringToInt(b.concurrency)
	if err != nil {
		panic(fmt.Sprintf("failed to parse concurrency levels: %v", err))
	}

	return ints
}

// stringToInt parses a comma separated list of base-10 integers into a slice
// of int.
//
// Parsing is delegated to stringToInt64; any error it reports is returned
// unchanged. An error is also returned when a parsed value does not fit in
// the platform's int type, rather than silently truncating it.
func (b *BenchmarkConfig) stringToInt(s string) ([]int, error) {
	int64s, err := b.stringToInt64(s)
	if err != nil {
		return nil, err
	}

	ints := make([]int, 0, len(int64s))
	for _, v := range int64s {
		n := int(v)
		// Round-trip check: detects truncation where int is narrower than
		// 64 bits.
		if int64(n) != v {
			return nil, fmt.Errorf("integer %d overflows int", v)
		}
		ints = append(ints, n)
	}

	return ints, nil
}

// stringToInt64 parses a comma separated list of base-10 integers into a
// slice of int64.
//
// Whitespace surrounding each element is ignored. An error naming the
// offending element is returned if any element is not a valid 64-bit integer;
// this includes empty elements such as those produced by an empty string or a
// trailing comma.
func (b *BenchmarkConfig) stringToInt64(s string) ([]int64, error) {
	fields := strings.Split(s, ",")
	sizes := make([]int64, 0, len(fields))

	for _, field := range fields {
		// TrimSpace also strips tabs and newlines, which Trim(field, " ")
		// would leave in place and ParseInt would then reject.
		field = strings.TrimSpace(field)

		i, err := strconv.ParseInt(field, 10, 64)
		if err != nil {
			// %q keeps empty or whitespace-only elements visible in the message.
			return nil, fmt.Errorf("invalid integer %q: %v", field, err)
		}

		sizes = append(sizes, i)
	}

	return sizes, nil
}

func BenchmarkUpload(b *testing.B) {
baseSdkConfig := SDKConfig{WithUnsignedPayload: true, ExpectContinue: true, WithContentMD5: false}

// FileSizes: 5 MB, 1 GB, 10 GB
for _, fileSize := range []int64{5 * 1024 * 1024, 1024 * 1024 * 1024, 10 * 1024 * 1024 * 1024} {
b.Run(fmt.Sprintf("%s_File", integration.SizeToName(int(fileSize))), func(b *testing.B) {
b.Logf("creating file of size: %s", integration.SizeToName(int(fileSize)))
file, err := integration.CreateFileOfSize(benchConfig.tempdir, fileSize)
if err != nil {
b.Fatalf("failed to create file: %v", err)
}
baseSdkConfig := SDKConfig{WithUnsignedPayload: true}

// Concurrency: 5, 10, 100
for _, concurrency := range []int{s3manager.DefaultUploadConcurrency, 2 * s3manager.DefaultUploadConcurrency, 100} {
b.Run(fmt.Sprintf("%d_Concurrency", concurrency), func(b *testing.B) {
// PartSize: 5 MB, 25 MB, 100 MB
for _, partSize := range []int64{s3manager.DefaultUploadPartSize, 25 * 1024 * 1024, 100 * 1024 * 1024} {
for _, fileSize := range benchConfig.FileSizes() {
b.Run(fmt.Sprintf("%s File", integration.SizeToName(int(fileSize))), func(b *testing.B) {
for _, concurrency := range benchConfig.Concurrences() {
b.Run(fmt.Sprintf("%d Concurrency", concurrency), func(b *testing.B) {
for _, partSize := range benchConfig.PartSizes() {
if partSize > fileSize {
continue
}
b.Run(fmt.Sprintf("%s_PartSize", integration.SizeToName(int(partSize))), func(b *testing.B) {
for _, strat := range benchStrategies {
b.Run(strat.name, func(b *testing.B) {
partSize = getUploadPartSize(fileSize, partSize)
b.Run(fmt.Sprintf("%s PartSize", integration.SizeToName(int(partSize))), func(b *testing.B) {
for _, bufferSize := range benchConfig.BufferSizes() {
var name string
if bufferSize == 0 {
name = "Unbuffered"
} else {
name = fmt.Sprintf("%s Buffer", integration.SizeToName(bufferSize))
}
b.Run(name, func(b *testing.B) {
sdkConfig := baseSdkConfig

sdkConfig.BufferProvider = strat.bufferProvider
sdkConfig.Concurrency = concurrency
sdkConfig.PartSize = partSize
if bufferSize > 0 {
sdkConfig.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(bufferSize)
}

reader := aws.ReadSeekCloser(io.LimitReader(&awstesting.EndlessReader{}, fileSize))

b.ResetTimer()
for i := 0; i < b.N; i++ {
benchUpload(b, benchConfig.bucket, filepath.Base(file.Name()), file, sdkConfig, benchConfig.clientConfig)
_, err := file.Seek(0, sdkio.SeekStart)
if err != nil {
b.Fatalf("failed to seek to start of file: %v", err)
}
benchUpload(b, benchConfig.bucket, integration.UniqueID(), reader, sdkConfig, benchConfig.clientConfig)
}
})
}
})
}
})
}

os.Remove(file.Name())
file.Close()
})
}
}
Expand Down

0 comments on commit ad9ccb0

Please sign in to comment.