Skip to content

Commit

Permalink
interop-testing: add soak test cases to interop client (#4677)
Browse files Browse the repository at this point in the history
  • Loading branch information
apolcyn committed Sep 9, 2021
1 parent a6a6317 commit 1fe5adb
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 14 deletions.
39 changes: 25 additions & 14 deletions interop/client/client.go
Expand Up @@ -26,6 +26,7 @@ import (
"io/ioutil"
"net"
"strconv"
"time"

"google.golang.org/grpc"
_ "google.golang.org/grpc/balancer/grpclb"
Expand All @@ -48,20 +49,24 @@ const (
)

var (
caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true")
useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)")
customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS")
altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address")
testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root")
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
serverHost = flag.String("server_host", "localhost", "The server host name")
serverPort = flag.Int("server_port", 10000, "The server port number")
serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
testCase = flag.String("test_case", "large_unary",
caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true")
useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)")
customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS")
altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address")
testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root")
serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file")
oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens")
defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account")
serverHost = flag.String("server_host", "localhost", "The server host name")
serverPort = flag.Int("server_port", 10000, "The server port number")
serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.")
soakIterations = flag.Int("soak_iterations", 10, "The number of iterations to use for the two soak tests: rpc_soak and channel_soak")
soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).")
soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
testCase = flag.String("test_case", "large_unary",
`Configure different test cases. Valid options are:
empty_unary : empty (zero bytes) request and response;
large_unary : single request and (large) response;
Expand Down Expand Up @@ -292,6 +297,12 @@ func main() {
case "pick_first_unary":
interop.DoPickFirstUnary(tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("ChannelSoak done")
default:
logger.Fatal("Unsupported test case: ", *testCase)
}
Expand Down
88 changes: 88 additions & 0 deletions interop/test_utils.go
Expand Up @@ -20,17 +20,20 @@
package interop

import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"time"

"github.com/golang/protobuf/proto"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark/stats"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
Expand Down Expand Up @@ -673,6 +676,91 @@ func DoPickFirstUnary(tc testgrpc.TestServiceClient) {
}
}

func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption) (latency time.Duration, err error) {
start := time.Now()
client := tc
if resetChannel {
var conn *grpc.ClientConn
conn, err = grpc.Dial(serverAddr, dopts...)
if err != nil {
return
}
defer conn.Close()
client = testgrpc.NewTestServiceClient(conn)
}
// per test spec, don't include channel shutdown in latency measurement
defer func() { latency = time.Since(start) }()
// do a large-unary RPC
pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize)
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE,
ResponseSize: int32(largeRespSize),
Payload: pl,
}
var reply *testpb.SimpleResponse
reply, err = client.UnaryCall(ctx, req)
if err != nil {
err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err)
return
}
t := reply.GetPayload().GetType()
s := len(reply.GetPayload().GetBody())
if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize {
err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize)
return
}
return
}

// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, overallDeadline time.Time) {
start := time.Now()
ctx, cancel := context.WithDeadline(context.Background(), overallDeadline)
defer cancel()
iterationsDone := 0
totalFailures := 0
hopts := stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
BaseBucketSize: 1,
MinValue: 0,
}
h := stats.NewHistogram(hopts)
for i := 0; i < soakIterations; i++ {
if time.Now().After(overallDeadline) {
break
}
iterationsDone++
latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts)
latencyMs := int64(latency / time.Millisecond)
h.Add(latencyMs)
if err != nil {
totalFailures++
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d failed: %s\n", i, latencyMs, err)
continue
}
if latency > perIterationMaxAcceptableLatency {
totalFailures++
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d exceeds max acceptable latency: %d\n", i, latencyMs, perIterationMaxAcceptableLatency.Milliseconds())
continue
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d succeeded\n", i, latencyMs)
}
var b bytes.Buffer
h.Print(&b)
fmt.Fprintln(os.Stderr, "Histogram of per-iteration latencies in milliseconds:")
fmt.Fprintln(os.Stderr, b.String())
fmt.Fprintf(os.Stderr, "soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", iterationsDone, soakIterations, totalFailures, maxFailures)
if iterationsDone < soakIterations {
logger.Fatalf("soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", overallDeadline.Sub(start).Seconds(), iterationsDone, soakIterations)
}
if totalFailures > maxFailures {
logger.Fatalf("soak test total failures: %d exceeds max failures threshold: %d.", totalFailures, maxFailures)
}
}

type testServer struct {
testgrpc.UnimplementedTestServiceServer
}
Expand Down

0 comments on commit 1fe5adb

Please sign in to comment.