Skip to content

Commit

Permalink
chore(storage): add benchmarking script (#5856)
Browse files Browse the repository at this point in the history
  • Loading branch information
BrennaEpp committed Dec 7, 2022
1 parent ee5751f commit 2bd2db1
Show file tree
Hide file tree
Showing 9 changed files with 2,066 additions and 0 deletions.
45 changes: 45 additions & 0 deletions storage/internal/benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# go-bench-gcs
**This is not an officially supported Google product**

## Run example:
This runs 1000 iterations on 512kib to 2Gib files in the background, sending output to `out.log`:

`go run main.go -p {PROJECT_ID} -t 72h -max_samples 1000 -o {RESULTS_FILE_NAME}.csv &> out.log &`


## CLI parameters

| Parameter | Description | Possible values | Default |
| --------- | ----------- | --------------- |:-------:|
| -p | projectID | a project ID | * |
| -creds | path to credentials file | any path | from environment |
| -o | file to output results to <br> if empty, will output to stdout | any file path | stdout |
| -output_type | output results as csv records or cloud monitoring | `csv`, `cloud-monitoring` | `cloud-monitoring` |
| -api | which API to use | `JSON`: use JSON to upload and XML to download <br> `XML`: use JSON to upload and XML to download <br> `GRPC`: use GRPC <br> `MIXED`: select an API at random for each upload/download <br> `DirectPath`: use GRPC with direct path | `MIXED` |
| -r | bucket region for benchmarks | any GCS region | `US-WEST1` |
| -workers | number of goroutines to run at once; set to 1 for no concurrency | any positive integer | `16` |
| -t | timeout (maximum time running benchmarks) <br> the program may run for longer while it finishes running processes | any [time.Duration](https://pkg.go.dev/time#Duration) | `1h` |
| -min_samples | minimum number of objects to upload | any positive integer | `10` |
| -max_samples | maximum number of objects to upload | any positive integer | `10 000` |
| -gc_f | whether to force garbage collection <br> before every write or read benchmark | `true` or `false` (present/not present) | `false` |
| -min_size | minimum object size in kib | any positive integer | `512` |
| -max_size | maximum object size in kib | any positive integer | `2 097 152` (2 GiB) |
| -defaults | use default settings for the client <br> (conn_pool, read, write and chunk size parameters will be ignored) | `true` or `false` | `false`
| -conn_pool | GRPC connection pool size | any positive integer | 4 |
| -min_cs | minimum ChunkSize in kib | any positive integer | `16 384` (16 MiB) |
| -max_cs | maximum ChunkSize in kib | any positive integer | `16 384` (16 MiB) |
| -q_read | download quantum | any positive integer | 1 |
| -q_write | upload quantum | any positive integer | 1 |
| -min_r_size | minimum read size in bytes | any positive integer | `4000` |
| -max_r_size | maximum read size in bytes | any positive integer | `4000` |
| -min_w_size | minimum write size in bytes | any positive integer | `4000` |
| -max_w_size | maximum write size in bytes | any positive integer | `4000` |
| -labels | labels added to cloud monitoring output (ignored when outputting as csv) | any string; should be in the format: <br> `stringKey=\"value\",intKey=3,boolKey=true` | empty |

\* required values

Note: while the default read/write size for HTTP clients is 4Kb
(the default for this benchmarking), the default for GRPC is 32Kb.
If you want to capture performance using the defaults for GRPC run the script
separately setting the read and write sizes to 32Kb, or run with the `defaults`
parameter set.
221 changes: 221 additions & 0 deletions storage/internal/benchmarks/client_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
"sync"
"time"

"cloud.google.com/go/storage"
"golang.org/x/net/http2"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
"google.golang.org/grpc"
)

// clientPool functions much like a sync Pool (https://pkg.go.dev/sync#Pool),
// except it does not automatically remove items stored in the clientPool.
// Re-using the clients rather than creating a new one each time reduces overhead
// (such as re-creating the underlying HTTP client and opening credential files),
// and is the intended way to use Storage clients.
//
// There is no limit to how many clients will be created, but it should be around
// the order of 5 * min(workers, max_samples).
type clientPool struct {
	// New creates a client when the pool has none to hand out.
	// It must be set before Get is called.
	New func() *storage.Client
	// clients holds the idle clients available for re-use.
	clients []*storage.Client
}

// Get returns an idle client from the pool, or, if none are available,
// creates a new one via p.New. Callers should hand the client back with Put
// when finished so it can be re-used.
func (p *clientPool) Get() *storage.Client {
	// If there is an unused client, return it. Indexing and slicing work on a
	// nil slice, and Put's append handles nil, so no explicit initialization
	// of p.clients is needed (the previous make([]*storage.Client, 0) here
	// was redundant).
	if len(p.clients) > 0 {
		c := p.clients[0]
		p.clients = p.clients[1:]
		return c
	}

	// Otherwise, create a new client and return it.
	return p.New()
}

// Put returns a client to the pool, making it available for re-use by Get.
// Clients are never evicted; see the clientPool type comment.
func (p *clientPool) Put(c *storage.Client) {
	p.clients = append(p.clients, c)
}

// httpClients and gRPCClients are the shared pools of benchmarking clients.
// Clients can be shared across benchmark runs as long as the app buffer sizes
// are constant (buffer sizes are fixed at transport creation; see
// canUseClientPool). They are populated by initializeClientPools.
var httpClients, gRPCClients *clientPool

// nonBenchmarkingClients supplies clients for work that is not itself being
// measured. These are HTTP clients for debuggability's sake.
var nonBenchmarkingClients = clientPool{
	New: func() *storage.Client {
		clientMu.Lock()
		c, err := storage.NewClient(context.Background())
		clientMu.Unlock()

		if err != nil {
			log.Fatalf("storage.NewClient: %v", err)
		}
		return c
	},
}

// initializeClientPools sets up the shared HTTP and gRPC client pools from
// the given benchmark options, and returns a cleanup function that closes
// every client either pool created.
func initializeClientPools(opts *benchmarkOptions) func() {
	httpClients = &clientPool{
		New: func() *storage.Client {
			c, err := initializeHTTPClient(context.Background(), opts.minWriteSize, opts.maxReadSize, opts.useDefaults)
			if err != nil {
				log.Fatalf("initializeHTTPClient: %v", err)
			}
			return c
		},
	}

	gRPCClients = &clientPool{
		New: func() *storage.Client {
			c, err := initializeGRPCClient(context.Background(), opts.minWriteSize, opts.maxReadSize, opts.connPoolSize, opts.useDefaults)
			if err != nil {
				log.Fatalf("initializeGRPCClient: %v", err)
			}
			return c
		},
	}

	// Cleanup closes all pooled clients; close errors are ignored since the
	// process is shutting down.
	return func() {
		for _, pool := range []*clientPool{httpClients, gRPCClients} {
			for _, c := range pool.clients {
				c.Close()
			}
		}
	}
}

// canUseClientPool reports whether storage clients can be pooled and re-used
// across benchmark runs. Buffer sizes on the underlying HTTP or GRPC client
// cannot be changed after creation as it is set up now, so pooling is only
// safe when we use the library defaults (no underlying client of our own), or
// when every run wants the same single app buffer size anyway.
func canUseClientPool(opts *benchmarkOptions) bool {
	if opts.useDefaults {
		return true
	}
	oneReadSize := opts.maxReadSize == opts.minReadSize
	oneWriteSize := opts.maxWriteSize == opts.minWriteSize
	return oneReadSize && oneWriteSize
}

// getClient returns a client appropriate for the benchmark run described by
// br, along with a release function the caller must invoke when done. Pooled
// clients are returned to their pool on release; one-off clients are closed.
func getClient(ctx context.Context, opts *benchmarkOptions, br benchmarkResult) (*storage.Client, func() error, error) {
	noOp := func() error { return nil }
	useGRPC := br.params.api == grpcAPI || br.params.api == directPath

	// Prefer a pooled client when the options allow it.
	if canUseClientPool(opts) {
		pool := httpClients
		if useGRPC {
			pool = gRPCClients
		}
		c := pool.Get()
		release := func() error {
			pool.Put(c)
			return nil
		}
		return c, release, nil
	}

	// Otherwise build a one-off client sized to this run's app buffer.
	if useGRPC {
		c, err := initializeGRPCClient(ctx, br.params.appBufferSize, br.params.appBufferSize, opts.connPoolSize, false)
		if err != nil {
			return nil, noOp, fmt.Errorf("initializeGRPCClient: %w", err)
		}
		return c, c.Close, nil
	}

	c, err := initializeHTTPClient(ctx, br.params.appBufferSize, br.params.appBufferSize, false)
	if err != nil {
		return nil, noOp, fmt.Errorf("initializeHTTPClient: %w", err)
	}
	return c, c.Close, nil
}

// clientMu serializes client creation. GRPC transport selection is driven by
// the STORAGE_USE_GRPC environment variable, so concurrent NewClient calls
// must not interleave with setting/unsetting it (see initializeGRPCClient).
var clientMu sync.Mutex

// initializeHTTPClient creates a Storage client that uses HTTP. When
// useDefaults is true, writeBufferSize and readBufferSize are ignored and the
// client library's default transport is used; otherwise an HTTP transport is
// built with the given write and read buffer sizes.
func initializeHTTPClient(ctx context.Context, writeBufferSize, readBufferSize int, useDefaults bool) (*storage.Client, error) {
	if useDefaults {
		clientMu.Lock()
		c, err := storage.NewClient(ctx, option.WithCredentialsFile(credentialsFile))
		clientMu.Unlock()
		return c, err
	}

	dialer := &net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}

	// These are the default parameters with write and read buffer sizes modified
	base := &http.Transport{
		Proxy:                 http.ProxyFromEnvironment,
		DialContext:           dialer.DialContext,
		ForceAttemptHTTP2:     true,
		MaxIdleConns:          100,
		IdleConnTimeout:       90 * time.Second,
		TLSHandshakeTimeout:   10 * time.Second,
		ExpectContinueTimeout: 1 * time.Second,
		WriteBufferSize:       writeBufferSize,
		ReadBufferSize:        readBufferSize,
	}

	// Propagate a failure to configure HTTP/2 instead of silently dropping it
	// (previously a non-nil error here was ignored and then shadowed).
	http2Trans, err := http2.ConfigureTransports(base)
	if err != nil {
		return nil, fmt.Errorf("http2.ConfigureTransports: %w", err)
	}
	http2Trans.ReadIdleTimeout = time.Second * 31

	trans, err := htransport.NewTransport(ctx, base,
		option.WithScopes("https://www.googleapis.com/auth/devstorage.full_control"),
		option.WithCredentialsFile(credentialsFile))
	if err != nil {
		return nil, err
	}

	clientMu.Lock()
	client, err := storage.NewClient(ctx, option.WithHTTPClient(&http.Client{Transport: trans}))
	clientMu.Unlock()

	return client, err
}

// initializeGRPCClient creates a Storage client that uses GRPC. When
// useDefaults is true, the buffer-size and connection-pool parameters are
// ignored and the library defaults are kept. GRPC transport selection happens
// through the STORAGE_USE_GRPC environment variable, which is guarded by
// clientMu so concurrent client creation cannot observe a partial state.
func initializeGRPCClient(ctx context.Context, writeBufferSize, readBufferSize int, connPoolSize int, useDefaults bool) (*storage.Client, error) {
	copts := []option.ClientOption{option.WithCredentialsFile(credentialsFile)}
	if !useDefaults {
		copts = append(copts,
			option.WithGRPCDialOption(grpc.WithReadBufferSize(readBufferSize)),
			option.WithGRPCDialOption(grpc.WithWriteBufferSize(writeBufferSize)),
			option.WithGRPCConnectionPool(connPoolSize),
		)
	}

	clientMu.Lock()
	defer clientMu.Unlock()
	os.Setenv("STORAGE_USE_GRPC", "true")
	// Deferred calls run LIFO: the env var is unset before the mutex unlocks,
	// matching the original set -> create -> unset -> unlock ordering.
	defer os.Unsetenv("STORAGE_USE_GRPC")

	return storage.NewClient(ctx, copts...)
}
93 changes: 93 additions & 0 deletions storage/internal/benchmarks/download_benchmark.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"fmt"
"io"
"os"
"time"

"cloud.google.com/go/storage"
)

// downloadOpts bundles the parameters for a single download benchmark run.
type downloadOpts struct {
	client     *storage.Client // client to perform the download with
	objectSize int64           // expected object size in bytes, used to verify the download
	bucket     string          // bucket containing the object
	object     string          // name of the object to download
}

// downloadBenchmark times a download of dopts.object from dopts.bucket into a
// temporary local file, which is removed before returning. The returned
// duration covers the whole operation including cleanup, and is set even when
// an error occurs. It is an error for the number of bytes read to differ from
// dopts.objectSize.
func downloadBenchmark(ctx context.Context, dopts downloadOpts) (elapsedTime time.Duration, rerr error) {
	// Set timer
	start := time.Now()
	// Multiple defer statements execute in LIFO order, so this will be the last
	// thing executed. We use named return parameters so that we can set it directly
	// and defer the statement so that the time includes typical cleanup steps and
	// gets set regardless of errors.
	defer func() { elapsedTime = time.Since(start) }()

	// Set additional timeout
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	// Create file to download to
	f, err := os.CreateTemp("", objectPrefix)
	if err != nil {
		// Fixed: error message previously said "os.Create".
		rerr = fmt.Errorf("os.CreateTemp: %w", err)
		return
	}
	defer func() {
		closeErr := f.Close()
		removeErr := os.Remove(f.Name())
		// if we don't have another error to return, return error for closing file
		// if that error is also nil, return removeErr
		if rerr == nil {
			rerr = removeErr
			if closeErr != nil {
				rerr = closeErr
			}
		}
	}()

	// Get reader from object
	o := dopts.client.Bucket(dopts.bucket).Object(dopts.object)
	objectReader, err := o.NewReader(ctx)
	if err != nil {
		rerr = fmt.Errorf("Object(%q).NewReader: %w", o.ObjectName(), err)
		return
	}
	defer func() {
		err := objectReader.Close()
		if rerr == nil {
			rerr = err
		}
	}()

	// Download
	written, err := io.Copy(f, objectReader)
	if err != nil {
		rerr = fmt.Errorf("io.Copy: %w", err)
		return
	}

	if written != dopts.objectSize {
		rerr = fmt.Errorf("did not read all bytes; read: %d, expected to read: %d", written, dopts.objectSize)
		return
	}

	return
}

0 comments on commit 2bd2db1

Please sign in to comment.