Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

interop client: provide new flag, --soak_min_time_ms_between_rpcs #5421

Merged
merged 7 commits into from Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions interop/client/client.go
Expand Up @@ -68,6 +68,7 @@ var (
soakMaxFailures = flag.Int("soak_max_failures", 0, "The number of iterations in soak tests that are allowed to fail (either due to non-OK status code or exceeding the per-iteration max acceptable latency).")
soakPerIterationMaxAcceptableLatencyMs = flag.Int("soak_per_iteration_max_acceptable_latency_ms", 1000, "The number of milliseconds a single iteration in the two soak tests (rpc_soak and channel_soak) should take.")
soakOverallTimeoutSeconds = flag.Int("soak_overall_timeout_seconds", 10, "The overall number of seconds after which a soak test should stop and fail, if the desired number of iterations have not yet completed.")
soakMinTimeMsBetweenRPCs = flag.Int("soak_min_time_ms_between_rpcs", 0, "The minimum time in milliseconds between consecutive RPCs in a soak test (rpc_soak or channel_soak), useful for limiting QPS")
tlsServerName = flag.String("server_host_override", "", "The server name used to verify the hostname returned by TLS handshake if it is not empty. Otherwise, --server_host is used.")
testCase = flag.String("test_case", "large_unary",
`Configure different test cases. Valid options are:
Expand Down Expand Up @@ -301,10 +302,10 @@ func main() {
interop.DoPickFirstUnary(tc)
logger.Infoln("PickFirstUnary done")
case "rpc_soak":
interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
interop.DoSoakTest(tc, serverAddr, opts, false /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("RpcSoak done")
case "channel_soak":
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
interop.DoSoakTest(tc, serverAddr, opts, true /* resetChannel */, *soakIterations, *soakMaxFailures, time.Duration(*soakPerIterationMaxAcceptableLatencyMs)*time.Millisecond, time.Duration(*soakMinTimeMsBetweenRPCs)*time.Millisecond, time.Now().Add(time.Duration(*soakOverallTimeoutSeconds)*time.Second))
logger.Infoln("ChannelSoak done")
default:
logger.Fatal("Unsupported test case: ", *testCase)
Expand Down
14 changes: 13 additions & 1 deletion interop/test_utils.go
Expand Up @@ -715,12 +715,16 @@ func doOneSoakIteration(ctx context.Context, tc testgrpc.TestServiceClient, rese
// DoSoakTest runs large unary RPCs in a loop for a configurable number of times, with configurable failure thresholds.
// If resetChannel is false, then each RPC will be performed on tc. Otherwise, each RPC will be performed on a new
// stub that is created with the provided server address and dial options.
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, overallDeadline time.Time) {
func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.DialOption, resetChannel bool, soakIterations int, maxFailures int, perIterationMaxAcceptableLatency time.Duration, minTimeBetweenRPCs time.Duration, overallDeadline time.Time) {
start := time.Now()
ctx, cancel := context.WithDeadline(context.Background(), overallDeadline)
defer cancel()
iterationsDone := 0
totalFailures := 0
var t *time.Ticker
if minTimeBetweenRPCs.Nanoseconds() > 0 {
t = time.NewTicker(minTimeBetweenRPCs)
}
hopts := stats.HistogramOptions{
NumBuckets: 20,
GrowthFactor: 1,
Expand All @@ -747,6 +751,14 @@ func DoSoakTest(tc testgrpc.TestServiceClient, serverAddr string, dopts []grpc.D
continue
}
fmt.Fprintf(os.Stderr, "soak iteration: %d elapsed_ms: %d succeeded\n", i, latencyMs)
if t != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of creating a time.Ticker above, could you consider doing the following here?

if minTimeBetweenRPCs != 0 {
  <-time.After(minTimeBetweenRPCs)
}

With this approach, you wouldn't have to worry about stopping the ticker below.

Also, I'm not quite sure if time.After() returns immediately if it is passed a duration of 0. If that is the case, you would need to check if the duration is non-zero either.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, nice cleanup!

I made a slight change to your comment, which is to create the time.After channel before performing the RPC, and then wait on it after. This is because the flag is meant to control min time between RPC starts, rather than between finishes and starts.

select {
case <-t.C:
}
}
}
if t != nil {
t.Stop()
}
var b bytes.Buffer
h.Print(&b)
Expand Down