From 5c533639442c3271fb786211f6e7bc200f5b299d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 24 Mar 2022 19:17:30 -0700 Subject: [PATCH] grpc: move to `TRANSIENT_FAILURE` in `pick_first` LB policy when all addresses are removed --- pickfirst.go | 28 +++++++++++---- test/balancer_test.go | 82 ------------------------------------------ test/pickfirst_test.go | 70 ++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+), 89 deletions(-) diff --git a/pickfirst.go b/pickfirst.go index 5168b62b078a..dc81f18edd98 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -50,23 +50,37 @@ type pickfirstBalancer struct { } func (b *pickfirstBalancer) ResolverError(err error) { - switch b.state { - case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: - // Set a failing picker if we don't have a good picker. - b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, - }) - } if logger.V(2) { logger.Infof("pickfirstBalancer: ResolverError called with error %v", err) } + if b.sc == nil { + b.state = connectivity.TransientFailure + } + + if b.state != connectivity.TransientFailure { + // The picker will not change since the balancer does not currently + // report an error. + return + } + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, + }) } func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error { if len(cs.ResolverState.Addresses) == 0 { + if b.sc != nil { + // If the resolver returned empty addresses after it gave us non-empty + // addresses, we need to close the existing subConn. Setting sc to nil + // here will cause us to return an error picker from ResolverError(). 
+ b.cc.RemoveSubConn(b.sc) + b.sc = nil + } b.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } + if b.sc == nil { var err error b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{}) diff --git a/test/balancer_test.go b/test/balancer_test.go index 113fbaceafbc..c919f1e0f7c4 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) { } } -// TestEmptyAddrs verifies client behavior when a working connection is -// removed. In pick first and round-robin, both will continue using the old -// connections. -func (s) TestEmptyAddrs(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Initialize server - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. 
Err: %v", err) - } - s := grpc.NewServer() - defer s.Stop() - const one = "1" - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Username: one}, nil - }, - } - testpb.RegisterTestServiceServer(s, ts) - go s.Serve(lis) - - // Initialize pickfirst client - pfr := manual.NewBuilderWithScheme("whatever") - - pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr)) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer pfcc.Close() - pfclient := testpb.NewTestServiceClient(pfcc) - - // Confirm we are connected to the server - if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Remove all addresses. - pfr.UpdateState(resolver.State{}) - - // Initialize roundrobin client - rrr := manual.NewBuilderWithScheme("whatever") - - rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer rrcc.Close() - rrclient := testpb.NewTestServiceClient(rrcc) - - // Confirm we are connected to the server - if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Remove all addresses. - rrr.UpdateState(resolver.State{}) - - // Confirm several new RPCs succeed on pick first. 
- for i := 0; i < 10; i++ { - if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) - } - time.Sleep(5 * time.Millisecond) - } - - // Confirm several new RPCs succeed on round robin. - for i := 0; i < 10; i++ { - if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) - } - time.Sleep(5 * time.Millisecond) - } -} - func (s) TestWaitForReady(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 7379ce286910..fb1a9f0eeff7 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -26,7 +26,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -258,3 +260,71 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) { t.Fatal(err) } } + +// TestPickFirst_NewAddressWhileBlocking tests the case where pick_first is +// configured on a channel, things are working as expected and then a resolver +// update removes all addresses. An RPC attempted at this point in time will be +// blocked because there are no valid backends. This test verifies that when new +// backends are added, the RPC is able to complete. +func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) { + cc, r, backends := setupPickFirst(t, 2) + + addrs := backendsToAddrs(backends) + r.UpdateState(resolver.State{Addresses: addrs}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } + + // Send a resolver update with no addresses. 
This should push the channel into + // TransientFailure. + r.UpdateState(resolver.State{}) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure) + } + } + + doneCh := make(chan struct{}) + client := testpb.NewTestServiceClient(cc) + go func() { + // The channel is currently in TransientFailure and this RPC will block + // until the channel becomes Ready, which will only happen when we push a + // resolver update with a valid backend address. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Errorf("EmptyCall() = %v, want ", err) + } + close(doneCh) + }() + + // Make sure that there is one pending RPC on the ClientConn before attempting + // to push new addresses through the name resolver. If we don't do this, the + // resolver update can happen before the above goroutine gets to make the RPC. + for { + if err := ctx.Err(); err != nil { + t.Fatal(err) + } + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + t.Fatalf("there should only be one top channel, not %d", len(tcs)) + } + started := tcs[0].ChannelData.CallsStarted + completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed + if (started - completed) == 1 { + break + } + time.Sleep(defaultTestShortTimeout) + } + + // Send a resolver update with a valid backend to push the channel to Ready + // and unblock the above RPC. + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}}) + + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for blocked RPC to complete") + case <-doneCh: + } +}