From 94ee3865e17c5645148f8d24a32dc50f40810f13 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 15 Mar 2022 17:01:07 -0700 Subject: [PATCH] test: cleanup roundrobin tests to use stubserver (#5236) --- balancer/roundrobin/roundrobin_test.go | 591 ------------------------- test/insecure_creds_test.go | 3 - test/roundrobin_test.go | 411 +++++++++++++++++ test/timeouts.go | 29 ++ 4 files changed, 440 insertions(+), 594 deletions(-) delete mode 100644 balancer/roundrobin/roundrobin_test.go create mode 100644 test/roundrobin_test.go create mode 100644 test/timeouts.go diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go deleted file mode 100644 index 574625642be..00000000000 --- a/balancer/roundrobin/roundrobin_test.go +++ /dev/null @@ -1,591 +0,0 @@ -/* - * - * Copyright 2017 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package roundrobin_test - -import ( - "context" - "fmt" - "net" - "strings" - "sync" - "testing" - "time" - - "google.golang.org/grpc" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal/grpctest" - imetadata "google.golang.org/grpc/internal/metadata" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/status" - testpb "google.golang.org/grpc/test/grpc_testing" -) - -const ( - testMDKey = "test-md" -) - -type s struct { - grpctest.Tester -} - -func Test(t *testing.T) { - grpctest.RunSubTests(t, s{}) -} - -type testServer struct { - testpb.UnimplementedTestServiceServer - - testMDChan chan []string -} - -func newTestServer(mdchan bool) *testServer { - t := &testServer{} - if mdchan { - t.testMDChan = make(chan []string, 1) - } - return t -} - -func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - if s.testMDChan == nil { - return &testpb.Empty{}, nil - } - md, ok := metadata.FromIncomingContext(ctx) - if !ok { - return nil, status.Errorf(codes.Internal, "no metadata in context") - } - select { - case s.testMDChan <- md[testMDKey]: - case <-ctx.Done(): - return nil, ctx.Err() - } - return &testpb.Empty{}, nil -} - -func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { - return nil -} - -type test struct { - servers []*grpc.Server - serverImpls []*testServer - addresses []string -} - -func (t *test) cleanup() { - for _, s := range t.servers { - s.Stop() - } -} - -func startTestServers(count int, mdchan bool) (_ *test, err error) { - t := &test{} - - defer func() { - if err != nil { - t.cleanup() - } - }() - for i := 0; i < count; i++ { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, fmt.Errorf("failed to listen %v", err) - } - - s := grpc.NewServer() - sImpl := newTestServer(mdchan) - 
testpb.RegisterTestServiceServer(s, sImpl) - t.servers = append(t.servers, s) - t.serverImpls = append(t.serverImpls, sImpl) - t.addresses = append(t.addresses, lis.Addr().String()) - - go func(s *grpc.Server, l net.Listener) { - s.Serve(l) - }(s, lis) - } - - return t, nil -} - -func (s) TestOneBackend(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - test, err := startTestServers(1, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) - // The second RPC should succeed. - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } -} - -func (s) TestBackendsRoundRobin(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - backendCount := 5 - test, err := startTestServers(backendCount, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - var resolvedAddrs []resolver.Address - for i := 0; i < backendCount; i++ { - resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) - } - - r.UpdateState(resolver.State{Addresses: resolvedAddrs}) - var p peer.Peer - // Make sure connections to all servers are up. 
- for si := 0; si < backendCount; si++ { - var connected bool - for i := 0; i < 1000; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() == test.addresses[si] { - connected = true - break - } - time.Sleep(time.Millisecond) - } - if !connected { - t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) - } - } - - for i := 0; i < 3*backendCount; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() != test.addresses[i%backendCount] { - t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) - } - } -} - -func (s) TestAddressesRemoved(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - test, err := startTestServers(1, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) - // The second RPC should succeed. - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - - r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) - - ctx2, cancel2 := context.WithTimeout(context.Background(), 500*time.Millisecond) - defer cancel2() - // Wait for state to change to transient failure. - for src := cc.GetState(); src != connectivity.TransientFailure; src = cc.GetState() { - if !cc.WaitForStateChange(ctx2, src) { - t.Fatalf("timed out waiting for state change. got %v; want %v", src, connectivity.TransientFailure) - } - } - - const msgWant = "produced zero addresses" - if _, err := testc.EmptyCall(ctx2, &testpb.Empty{}); !strings.Contains(status.Convert(err).Message(), msgWant) { - t.Fatalf("EmptyCall() = _, %v, want _, Contains(Message(), %q)", err, msgWant) - } -} - -func (s) TestCloseWithPendingRPC(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - test, err := startTestServers(1, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - testc := testpb.NewTestServiceClient(cc) - - var wg sync.WaitGroup - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - defer wg.Done() - // This RPC blocks until cc is closed. 
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) == codes.DeadlineExceeded { - t.Errorf("RPC failed because of deadline after cc is closed; want error the client connection is closing") - } - cancel() - }() - } - cc.Close() - wg.Wait() -} - -func (s) TestNewAddressWhileBlocking(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - test, err := startTestServers(1, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) - // The second RPC should succeed. - ctx, cancel = context.WithTimeout(context.Background(), 2*time.Second) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, nil", err) - } - - r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) - - var wg sync.WaitGroup - for i := 0; i < 3; i++ { - wg.Add(1) - go func() { - defer wg.Done() - // This RPC blocks until NewAddress is called. - testc.EmptyCall(context.Background(), &testpb.Empty{}) - }() - } - time.Sleep(50 * time.Millisecond) - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) - wg.Wait() -} - -func (s) TestOneServerDown(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - backendCount := 3 - test, err := startTestServers(backendCount, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - var resolvedAddrs []resolver.Address - for i := 0; i < backendCount; i++ { - resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) - } - - r.UpdateState(resolver.State{Addresses: resolvedAddrs}) - var p peer.Peer - // Make sure connections to all servers are up. 
- for si := 0; si < backendCount; si++ { - var connected bool - for i := 0; i < 1000; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() == test.addresses[si] { - connected = true - break - } - time.Sleep(time.Millisecond) - } - if !connected { - t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) - } - } - - for i := 0; i < 3*backendCount; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() != test.addresses[i%backendCount] { - t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) - } - } - - // Stop one server, RPCs should roundrobin among the remaining servers. - backendCount-- - test.servers[backendCount].Stop() - // Loop until see server[backendCount-1] twice without seeing server[backendCount]. - var targetSeen int - for i := 0; i < 1000; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - targetSeen = 0 - t.Logf("EmptyCall() = _, %v, want _, ", err) - // Due to a race, this RPC could possibly get the connection that - // was closing, and this RPC may fail. Keep trying when this - // happens. - continue - } - switch p.Addr.String() { - case test.addresses[backendCount-1]: - targetSeen++ - case test.addresses[backendCount]: - // Reset targetSeen if peer is server[backendCount]. - targetSeen = 0 - } - // Break to make sure the last picked address is server[-1], so the following for loop won't be flaky. - if targetSeen >= 2 { - break - } - } - if targetSeen != 2 { - t.Fatal("Failed to see server[backendCount-1] twice without seeing server[backendCount]") - } - for i := 0; i < 3*backendCount; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() != test.addresses[i%backendCount] { - t.Errorf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) - } - } -} - -func (s) TestAllServersDown(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - backendCount := 3 - test, err := startTestServers(backendCount, false) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - // The first RPC should fail because there's no address. - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) - defer cancel() - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - var resolvedAddrs []resolver.Address - for i := 0; i < backendCount; i++ { - resolvedAddrs = append(resolvedAddrs, resolver.Address{Addr: test.addresses[i]}) - } - - r.UpdateState(resolver.State{Addresses: resolvedAddrs}) - var p peer.Peer - // Make sure connections to all servers are up. 
- for si := 0; si < backendCount; si++ { - var connected bool - for i := 0; i < 1000; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() == test.addresses[si] { - connected = true - break - } - time.Sleep(time.Millisecond) - } - if !connected { - t.Fatalf("Connection to %v was not up after more than 1 second", test.addresses[si]) - } - } - - for i := 0; i < 3*backendCount; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Peer(&p)); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - if p.Addr.String() != test.addresses[i%backendCount] { - t.Fatalf("Index %d: want peer %v, got peer %v", i, test.addresses[i%backendCount], p.Addr.String()) - } - } - - // All servers are stopped, failfast RPC should fail with unavailable. - for i := 0; i < backendCount; i++ { - test.servers[i].Stop() - } - time.Sleep(100 * time.Millisecond) - for i := 0; i < 1000; i++ { - if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); status.Code(err) == codes.Unavailable { - return - } - time.Sleep(time.Millisecond) - } - t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") -} - -func (s) TestUpdateAddressAttributes(t *testing.T) { - r := manual.NewBuilderWithScheme("whatever") - - test, err := startTestServers(1, true) - if err != nil { - t.Fatalf("failed to start servers: %v", err) - } - defer test.cleanup() - - cc, err := grpc.Dial(r.Scheme()+":///test.server", - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithResolvers(r), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, roundrobin.Name))) - if err != nil { - t.Fatalf("failed to dial: %v", err) - } - defer cc.Close() - testc := testpb.NewTestServiceClient(cc) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // The first RPC should fail because there's no address. - ctxShort, cancel2 := context.WithTimeout(ctx, time.Millisecond) - defer cancel2() - if _, err := testc.EmptyCall(ctxShort, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) - } - - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) - // The second RPC should succeed. - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - // The second RPC should not set metadata, so there's no md in the channel. - md1 := <-test.serverImpls[0].testMDChan - if md1 != nil { - t.Fatalf("got md: %v, want empty metadata", md1) - } - - const testMDValue = "test-md-value" - // Update metadata in address. - r.UpdateState(resolver.State{Addresses: []resolver.Address{ - imetadata.Set(resolver.Address{Addr: test.addresses[0]}, metadata.Pairs(testMDKey, testMDValue)), - }}) - - // A future RPC should send metadata with it. The update doesn't - // necessarily happen synchronously, so we wait some time before failing if - // some RPCs do not contain it. 
- for { - if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err != nil { - if status.Code(err) == codes.DeadlineExceeded { - t.Fatalf("timed out waiting for metadata in response") - } - t.Fatalf("EmptyCall() = _, %v, want _, ", err) - } - md2 := <-test.serverImpls[0].testMDChan - if len(md2) == 1 && md2[0] == testMDValue { - return - } - time.Sleep(10 * time.Millisecond) - } -} diff --git a/test/insecure_creds_test.go b/test/insecure_creds_test.go index ec1bb41433c..28e8b731814 100644 --- a/test/insecure_creds_test.go +++ b/test/insecure_creds_test.go @@ -23,7 +23,6 @@ import ( "net" "strings" "testing" - "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -36,8 +35,6 @@ import ( testpb "google.golang.org/grpc/test/grpc_testing" ) -const defaultTestTimeout = 5 * time.Second - // testLegacyPerRPCCredentials is a PerRPCCredentials that has yet incorporated security level. type testLegacyPerRPCCredentials struct{} diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go new file mode 100644 index 00000000000..557a47f7744 --- /dev/null +++ b/test/roundrobin_test.go @@ -0,0 +1,411 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/grpcsync" + imetadata "google.golang.org/grpc/internal/metadata" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/stats" + "google.golang.org/grpc/status" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}` + +func statsHandlerDialOption(funcs statsHandlerFuncs) grpc.DialOption { + return grpc.WithStatsHandler(&statsHandler{funcs: funcs}) +} + +type statsHandlerFuncs struct { + TagRPC func(context.Context, *stats.RPCTagInfo) context.Context + HandleRPC func(context.Context, stats.RPCStats) + TagConn func(context.Context, *stats.ConnTagInfo) context.Context + HandleConn func(context.Context, stats.ConnStats) +} + +type statsHandler struct { + funcs statsHandlerFuncs +} + +func (s *statsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { + if s.funcs.TagRPC != nil { + return s.funcs.TagRPC(ctx, info) + } + return ctx +} + +func (s *statsHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) { + if s.funcs.HandleRPC != nil { + s.funcs.HandleRPC(ctx, stats) + } +} + +func (s *statsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { + if s.funcs.TagConn != nil { + return s.funcs.TagConn(ctx, info) + } + return ctx +} + +func (s *statsHandler) HandleConn(ctx context.Context, 
stats stats.ConnStats) {
+ if s.funcs.HandleConn != nil {
+ s.funcs.HandleConn(ctx, stats)
+ }
+}
+
+func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
+ var peer peer.Peer
+ // Make sure connections to all backends are up.
+ backendCount := len(addrs)
+ for i := 0; i < backendCount; i++ {
+ for {
+ time.Sleep(time.Millisecond)
+ if ctx.Err() != nil {
+ return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr)
+ }
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
+ // Some tests remove backends and check if round robin is happening
+ // across the remaining backends. In such cases, RPCs can initially fail
+ // on the connection using the removed backend. Just keep retrying and
+ // eventually the connection using the removed backend will shut down and
+ // will be removed.
+ continue
+ }
+ if peer.Addr.String() == addrs[i].Addr {
+ break
+ }
+ }
+ }
+ // Make sure RPCs are sent to all backends.
+ for i := 0; i < 3*backendCount; i++ {
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil {
+ return fmt.Errorf("EmptyCall() = %v, want <nil>", err)
+ }
+ if gotPeer, wantPeer := peer.Addr.String(), addrs[i%backendCount].Addr; gotPeer != wantPeer {
+ return fmt.Errorf("rpc sent to peer %q, want peer %q", gotPeer, wantPeer)
+ }
+ }
+ return nil
+}
+
+func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
+ t.Helper()
+ r := manual.NewBuilderWithScheme("whatever")
+
+ const backendCount = 5
+ backends := make([]*stubserver.StubServer, backendCount)
+ addrs := make([]resolver.Address, backendCount)
+ for i := 0; i < backendCount; i++ {
+ backend := &stubserver.StubServer{
+ EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
+ }
+ if err := backend.StartServer(); err != nil {
+ t.Fatalf("Failed to start backend: %v", err)
+ }
+ t.Logf("Started TestService backend at: %q", backend.Address)
+ t.Cleanup(func() { backend.Stop() })
+
+ backends[i] = backend
+ addrs[i] = resolver.Address{Addr: backend.Address}
+ }
+
+ dopts := []grpc.DialOption{
+ grpc.WithTransportCredentials(insecure.NewCredentials()),
+ grpc.WithResolvers(r),
+ grpc.WithDefaultServiceConfig(rrServiceConfig),
+ }
+ dopts = append(dopts, opts...)
+ cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+ if err != nil {
+ t.Fatalf("grpc.Dial() failed: %v", err)
+ }
+ t.Cleanup(func() { cc.Close() })
+ client := testpb.NewTestServiceClient(cc)
+
+ // At this point, the resolver has not returned any addresses to the channel.
+ // This RPC must block until the context expires.
+ sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+ defer sCancel()
+ if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
+ t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
+ }
+
+ r.UpdateState(resolver.State{Addresses: addrs})
+ if err := checkRoundRobin(ctx, client, addrs); err != nil {
+ t.Fatal(err)
+ }
+ return cc, r, backends
+}
+
+// TestRoundRobin_Basic tests the most basic scenario for round_robin. It brings
+// up a bunch of backends and verifies that RPCs are getting round robin-ed
+// across these backends.
+func (s) TestRoundRobin_Basic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testRoundRobinBasic(ctx, t) +} + +// TestRoundRobin_AddressesRemoved tests the scenario where a bunch of backends +// are brought up, and round_robin is configured as the LB policy and RPCs are +// being correctly round robin-ed across these backends. We then send a resolver +// update with no addresses and verify that the channel enters TransientFailure +// and RPCs fail with an expected error message. +func (s) TestRoundRobin_AddressesRemoved(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, r, _ := testRoundRobinBasic(ctx, t) + + // Send a resolver update with no addresses. This should push the channel into + // TransientFailure. + r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure) + } + } + + const msgWant = "produced zero addresses" + client := testpb.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(status.Convert(err).Message(), msgWant) { + t.Fatalf("EmptyCall() = %v, want Contains(Message(), %q)", err, msgWant) + } +} + +// TestRoundRobin_NewAddressWhileBlocking tests the case where round_robin is +// configured on a channel, things are working as expected and then a resolver +// updates removes all addresses. An RPC attempted at this point in time will be +// blocked because there are no valid backends. This test verifies that when new +// backends are added, the RPC is able to complete. +func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) { + // Register a stats handler which writes to `rpcCh` when an RPC is started. + // The stats handler starts writing to `rpcCh` only after `begin` has fired. + // We are not interested in being notified about initial RPCs which ensure + // that round_robin is working as expected. We are only interested in being + // notified when we have an RPC which is blocked because there are no + // backends, and will become unblocked when the resolver reports new backends. + begin := grpcsync.NewEvent() + rpcCh := make(chan struct{}, 1) + shOption := statsHandlerDialOption(statsHandlerFuncs{ + HandleRPC: func(ctx context.Context, rpcStats stats.RPCStats) { + if !begin.HasFired() { + return + } + select { + case rpcCh <- struct{}{}: + default: + } + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, r, backends := testRoundRobinBasic(ctx, t, shOption) + + // Send a resolver update with no addresses. This should push the channel into + // TransientFailure. + r.UpdateState(resolver.State{Addresses: []resolver.Address{}}) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure) + } + } + + begin.Fire() + client := testpb.NewTestServiceClient(cc) + doneCh := make(chan struct{}) + go func() { + // The channel is currently in TransientFailure and this RPC will block + // until the channel becomes Ready, which will only happen when we push a + // resolver update with a valid backend address. 
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+ t.Errorf("EmptyCall() = %v, want <nil>", err)
+ }
+ close(doneCh)
+ }()
+
+ select {
+ case <-ctx.Done():
+ t.Fatal("Timeout when waiting for RPC to start and block")
+ case <-rpcCh:
+ }
+ // Send a resolver update with a valid backend to push the channel to Ready
+ // and unblock the above RPC.
+ r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
+
+ select {
+ case <-ctx.Done():
+ t.Fatal("Timeout when waiting for blocked RPC to complete")
+ case <-doneCh:
+ }
+}
+
+// TestRoundRobin_OneServerDown tests the scenario where a channel is configured
+// to round robin across a set of backends, and things are working correctly.
+// One backend goes down. The test verifies that going forward, RPCs are round
+// robin-ed across the remaining set of backends.
+func (s) TestRoundRobin_OneServerDown(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ cc, _, backends := testRoundRobinBasic(ctx, t)
+
+ // Stop one backend. RPCs should round robin across the remaining backends.
+ backends[len(backends)-1].Stop()
+
+ addrs := make([]resolver.Address, len(backends)-1)
+ for i := 0; i < len(backends)-1; i++ {
+ addrs[i] = resolver.Address{Addr: backends[i].Address}
+ }
+ client := testpb.NewTestServiceClient(cc)
+ if err := checkRoundRobin(ctx, client, addrs); err != nil {
+ t.Fatalf("RPCs are not being round robined across remaining servers: %v", err)
+ }
+}
+
+// TestRoundRobin_AllServersDown tests the scenario where a channel is
+// configured to round robin across a set of backends, and things are working
+// correctly. Then, all backends go down. The test verifies that the channel
+// moves to TransientFailure and failfast RPCs fail with Unavailable.
+func (s) TestRoundRobin_AllServersDown(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+ defer cancel()
+ cc, _, backends := testRoundRobinBasic(ctx, t)
+
+ // Stop all backends.
+ for _, b := range backends {
+ b.Stop()
+ }
+
+ // Wait for TransientFailure.
+ for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
+ if !cc.WaitForStateChange(ctx, state) {
+ t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure)
+ }
+ }
+
+ // Failfast RPCs should fail with Unavailable.
+ client := testpb.NewTestServiceClient(cc)
+ if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
+ t.Fatalf("EmptyCall() = %v, want %s", err, codes.Unavailable)
+ }
+}
+
+// TestRoundRobin_UpdateAddressAttributes tests the scenario where the addresses
+// returned by the resolver contain attributes. The test verifies that the
+// attributes contained in the addresses show up as RPC metadata in the backend.
+func (s) TestRoundRobin_UpdateAddressAttributes(t *testing.T) {
+ const (
+ testMDKey = "test-md"
+ testMDValue = "test-md-value"
+ )
+ r := manual.NewBuilderWithScheme("whatever")
+
+ // Spin up a StubServer to serve as a backend. The implementation verifies
+ // that the expected metadata is received.
+ testMDChan := make(chan []string, 1) + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if ok { + select { + case testMDChan <- md[testMDKey]: + case <-ctx.Done(): + return nil, ctx.Err() + } + } + return &testpb.Empty{}, nil + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Logf("Started TestService backend at: %q", backend.Address) + t.Cleanup(func() { backend.Stop() }) + + // Dial the backend with round_robin as the LB policy. + dopts := []grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithResolvers(r), + grpc.WithDefaultServiceConfig(rrServiceConfig), + } + cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + t.Cleanup(func() { cc.Close() }) + + // Send a resolver update with no address attributes. + addr := resolver.Address{Addr: backend.Address} + r.UpdateState(resolver.State{Addresses: []resolver.Address{addr}}) + + // Make an RPC and ensure it does not contain the metadata we are looking for. + client := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = %v, want ", err) + } + select { + case <-ctx.Done(): + t.Fatalf("Timeout when waiting for metadata received in RPC") + case md := <-testMDChan: + if len(md) != 0 { + t.Fatalf("received metadata %v, want nil", md) + } + } + + // Send a resolver update with address attributes. + addrWithAttributes := imetadata.Set(addr, metadata.Pairs(testMDKey, testMDValue)) + r.UpdateState(resolver.State{Addresses: []resolver.Address{addrWithAttributes}}) + + // Make an RPC and ensure it contains the metadata we are looking for. The + // resolver update isn't processed synchronously, so we wait some time before + // failing if some RPCs do not contain it. +Done: + for { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = %v, want ", err) + } + select { + case <-ctx.Done(): + t.Fatalf("Timeout when waiting for metadata received in RPC") + case md := <-testMDChan: + if len(md) == 1 && md[0] == testMDValue { + break Done + } + } + time.Sleep(defaultTestShortTimeout) + } +} diff --git a/test/timeouts.go b/test/timeouts.go new file mode 100644 index 00000000000..1c0c2123938 --- /dev/null +++ b/test/timeouts.go @@ -0,0 +1,29 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import "time" + +const ( + // Default timeout for tests in this package. + defaultTestTimeout = 10 * time.Second + // Default short timeout, to be used when waiting for events which are not + // expected to happen. + defaultTestShortTimeout = 100 * time.Millisecond +)
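The tests above lean on the internal stubserver package instead of the hand-rolled grpc.Server plumbing that the deleted file used. For reference only, and not part of the patch, here is a minimal sketch of that pattern in package test. It assumes only the stubserver APIs already exercised in the diff (EmptyCallF, StartServer, Address, Stop) plus the defaultTestTimeout constant from test/timeouts.go; the test function name is hypothetical.

package test

import (
	"context"
	"testing"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal/stubserver"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

// TestStubServerUsage_Sketch is a hypothetical example, not part of the patch.
// The backend's handlers are plain closures, so each test embeds exactly the
// server behavior it needs.
func TestStubServerUsage_Sketch(t *testing.T) {
	backend := &stubserver.StubServer{
		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			return &testpb.Empty{}, nil
		},
	}
	if err := backend.StartServer(); err != nil {
		t.Fatalf("Failed to start backend: %v", err)
	}
	t.Cleanup(func() { backend.Stop() })

	// Dial the backend address directly. The round_robin tests above instead
	// dial a manual-resolver target so the address list can change mid-test.
	cc, err := grpc.Dial(backend.Address, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	t.Cleanup(func() { cc.Close() })

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := testpb.NewTestServiceClient(cc).EmptyCall(ctx, &testpb.Empty{}); err != nil {
		t.Fatalf("EmptyCall() = %v, want <nil>", err)
	}
}

The new round_robin tests follow this same shape, differing only in that they start several backends and push their addresses through a manual resolver so the set of backends can be grown or shrunk during the test.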