diff --git a/interop/client/client.go b/interop/client/client.go index 7b9339d7b61..456b74e101d 100644 --- a/interop/client/client.go +++ b/interop/client/client.go @@ -26,6 +26,7 @@ import ( "io/ioutil" "net" "strconv" + "time" "google.golang.org/grpc" _ "google.golang.org/grpc/balancer/grpclb" @@ -48,20 +49,24 @@ const ( ) var ( - caFile = flag.String("ca_file", "", "The file containning the CA root cert file") - useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true") - useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)") - customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS") - altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address") - testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root") - serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file") - oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens") - defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account") - serverHost = flag.String("server_host", "localhost", "The server host name") - serverPort = flag.Int("server_port", 10000, "The server port number") - serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.") - tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.") - testCase = flag.String("test_case", "large_unary", + caFile = flag.String("ca_file", "", "The file containning the CA root cert file") + useTLS = flag.Bool("use_tls", false, "Connection uses TLS if true") + useALTS = flag.Bool("use_alts", false, "Connection uses ALTS if true (this option can only be used on GCP)") + customCredentialsType = flag.String("custom_credentials_type", "", "Custom creds to use, excluding TLS or ALTS") + altsHSAddr = flag.String("alts_handshaker_service_address", "", "ALTS handshaker gRPC service address") + testCA = flag.Bool("use_test_ca", false, "Whether to replace platform root CAs with test CA as the CA root") + serviceAccountKeyFile = flag.String("service_account_key_file", "", "Path to service account json key file") + oauthScope = flag.String("oauth_scope", "", "The scope for OAuth2 tokens") + defaultServiceAccount = flag.String("default_service_account", "", "Email of GCE default service account") + serverHost = flag.String("server_host", "localhost", "The server host name") + serverPort = flag.Int("server_port", 10000, "The server port number") + serviceConfigJSON = flag.String("service_config_json", "", "Disables service config lookups and sets the provided string as the default service config.") + soakIterations = flag.Int("soak_iterations", 10, "The number of iterations to use for the two soak tests: rpc_soak and channel_soak") + soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).") + soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.") + soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.") + tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.") + testCase = flag.String("test_case", "large_unary", `Configure different test cases. Valid options are: empty_unary : empty (zero bytes) request and response; large_unary : single request and (large) response; @@ -292,6 +297,12 @@ func main() { case "pick_first_unary": interop.DoPickFirstUnary(tc) logger.Infoln("PickFirstUnary done") + case "rpc_soak": + interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + logger.Infoln("RpcSoak done") + case "channel_soak": + interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second)) + logger.Infoln("ChannelSoak done") default: logger.Fatal("Unsupported test case: ", *testCase) } diff --git a/interop/test_utils.go b/interop/test_utils.go index cbcbcc4da17..19a5c1f7cd3 100644 --- a/interop/test_utils.go +++ b/interop/test_utils.go @@ -20,10 +20,12 @@ package interop import ( + "bytes" "context" "fmt" "io" "io/ioutil" + "os" "strings" "time" @@ -31,6 +33,7 @@ import ( "golang.org/x/oauth2" "golang.org/x/oauth2/google" "google.golang.org/grpc" + "google.golang.org/grpc/benchmark/stats" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/metadata" @@ -673,6 +676,91 @@ func DoPickFirstUnary(tc testgrpc.TestServiceClient) { } } +func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, resetChannel bool, serverAddr string, dopts []grpc.DialOption) (latency time.Duration, err error) { + start := time.Now() + client := tc + if resetChannel { + var conn *grpc.ClientConn + conn, err = grpc.Dial(serverAddr, dopts...) + if err != nil { + return + } + defer conn.Close() + client = testgrpc.NewTestServiceClient(conn) + } + // per test spec, don't include channel shutdown in latency measurement + defer func() { latency = time.Since(start) }() + // do a large-unary RPC + pl := ClientNewPayload(testpb.PayloadType_COMPRESSABLE, largeReqSize) + req := &testpb.SimpleRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE, + ResponseSize: int32(largeRespSize), + Payload: pl, + } + var reply *testpb.SimpleResponse + reply, err = client.UnaryCall(ctx, req) + if err != nil { + err = fmt.Errorf("/TestService/UnaryCall RPC failed: %s", err) + return + } + t := reply.GetPayload().GetType() + s := len(reply.GetPayload().GetBody()) + if t != testpb.PayloadType_COMPRESSABLE || s != largeRespSize { + err = fmt.Errorf("got the reply with type %d len %d; want %d, %d", t, s, testpb.PayloadType_COMPRESSABLE, largeRespSize) + return + } + return +} + +// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds. +// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new +// stub that is created with the provided server address and dial options. +func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, overallDeadline time.Time) { + start := time.Now() + ctx, cancel := context.WithDeadline(context.Background(), overallDeadline) + defer cancel() + iterationsDone := 0 + totalFailures := 0 + hopts := stats.HistogramOptions{ + NumBuckets: 20, + GrowthFactor: 1, + BaseBucketSize: 1, + MinValue: 0, + } + h := stats.NewHistogram(hopts) + for i := 0; i < soakIterations; i++ { + if time.Now().After(overallDeadline) { + break + } + iterationsDone++ + latency, err := doOneSoakIteration(ctx, tc, resetChannel, serverAddr, dopts) + latencyMs := int64(latency / time.Millisecond) + h.Add(latencyMs) + if err != nil { + totalFailures++ + fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d failed: %s\n", i, latencyMs, err) + continue + } + if latency > perIterationMaxAcceptableLatency { + totalFailures++ + fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d exceeds max acceptable latency: %d\n", i, latencyMs, perIterationMaxAcceptableLatency.Milliseconds()) + continue + } + fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d succeeded\n", i, latencyMs) + } + var b bytes.Buffer + h.Print(&b) + fmt.Fprintln(os.Stderr, "Histogram of per-iteration latencies in milliseconds:") + fmt.Fprintln(os.Stderr, b.String()) + fmt.Fprintf(os.Stderr, "soak test ran: %d / %d iterations. total failures: %d. max failures threshold: %d. See breakdown above for which iterations succeeded, failed, and why for more info.\n", iterationsDone, soakIterations, totalFailures, maxFailures) + if iterationsDone < soakIterations { + logger.Fatalf("soak test consumed all %f seconds of time and quit early, only having ran %d out of desired %d iterations.", overallDeadline.Sub(start).Seconds(), iterationsDone, soakIterations) + } + if totalFailures > maxFailures { + logger.Fatalf("soak test total failures: %d exceeds max failures threshold: %d.", totalFailures, maxFailures) + } +} + type testServer struct { testgrpc.UnimplementedTestServiceServer }