From b6873c006da794d53fbee52aa463e8c21e9fc958 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 29 Mar 2022 15:06:28 -0700 Subject: [PATCH] grpc: move to `TransientFailure` in `pick_first` LB policy when all addresses are removed (#5274) --- picker_wrapper_test.go | 4 +- pickfirst.go | 126 +++++++++++++++++++++++++---------------- test/balancer_test.go | 82 --------------------------- test/pickfirst_test.go | 74 ++++++++++++++++++++++++ 4 files changed, 153 insertions(+), 133 deletions(-) diff --git a/picker_wrapper_test.go b/picker_wrapper_test.go index 5f786b28580..a4fae85d397 100644 --- a/picker_wrapper_test.go +++ b/picker_wrapper_test.go @@ -97,7 +97,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) { bp := newPickerWrapper() var finishedCount uint64 bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount}) - // All goroutines should block because picker returns no sc available. + // All goroutines should block because picker returns no subConn available. for i := goroutineCount; i > 0; i-- { go func() { if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { @@ -138,7 +138,7 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) { bp := newPickerWrapper() bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount}) var finishedCount uint64 - // All goroutines should block because sc is not ready. + // All goroutines should block because subConn is not ready. for i := goroutineCount; i > 0; i-- { go func() { if tr, _, err := bp.pick(context.Background(), true, balancer.PickInfo{}); err != nil || tr != testT { diff --git a/pickfirst.go b/pickfirst.go index 5168b62b078..fb7a99e0a27 100644 --- a/pickfirst.go +++ b/pickfirst.go @@ -44,79 +44,107 @@ func (*pickfirstBuilder) Name() string { } type pickfirstBalancer struct { - state connectivity.State - cc balancer.ClientConn - sc balancer.SubConn + state connectivity.State + cc balancer.ClientConn + subConn balancer.SubConn } func (b *pickfirstBalancer) ResolverError(err error) { - switch b.state { - case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting: - // Set a failing picker if we don't have a good picker. - b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, - }) - } if logger.V(2) { logger.Infof("pickfirstBalancer: ResolverError called with error %v", err) } + if b.subConn == nil { + b.state = connectivity.TransientFailure + } + + if b.state != connectivity.TransientFailure { + // The picker will not change since the balancer does not currently + // report an error. + return + } + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, + }) } -func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) error { - if len(cs.ResolverState.Addresses) == 0 { +func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { + if len(state.ResolverState.Addresses) == 0 { + // The resolver reported an empty address list. Treat it like an error by + // calling b.ResolverError. + if b.subConn != nil { + // Remove the old subConn. All addresses were removed, so it is no longer + // valid. + b.cc.RemoveSubConn(b.subConn) + b.subConn = nil + } b.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } - if b.sc == nil { - var err error - b.sc, err = b.cc.NewSubConn(cs.ResolverState.Addresses, balancer.NewSubConnOptions{}) - if err != nil { - if logger.V(2) { - logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) - } - b.state = connectivity.TransientFailure - b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, - Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)}, - }) - return balancer.ErrBadResolverState + + if b.subConn != nil { + b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses) + return nil + } + + subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{}) + if err != nil { + if logger.V(2) { + logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) } - b.state = connectivity.Idle - b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}}) - b.sc.Connect() - } else { - b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses) - b.sc.Connect() + b.state = connectivity.TransientFailure + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)}, + }) + return balancer.ErrBadResolverState } + b.subConn = subConn + b.state = connectivity.Idle + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Idle, + Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}}, + }) + b.subConn.Connect() return nil } -func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) { +func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { if logger.V(2) { - logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", sc, s) + logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state) } - if b.sc != sc { + if b.subConn != subConn { if logger.V(2) { - logger.Infof("pickfirstBalancer: ignored state change because sc is not recognized") + logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized") } return } - b.state = s.ConnectivityState - if s.ConnectivityState == connectivity.Shutdown { - b.sc = nil + b.state = state.ConnectivityState + if state.ConnectivityState == connectivity.Shutdown { + b.subConn = nil return } - switch s.ConnectivityState { + switch state.ConnectivityState { case connectivity.Ready: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{result: balancer.PickResult{SubConn: sc}}}) + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &picker{result: balancer.PickResult{SubConn: subConn}}, + }) case connectivity.Connecting: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &picker{err: balancer.ErrNoSubConnAvailable}}) + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &picker{err: balancer.ErrNoSubConnAvailable}, + }) case connectivity.Idle: - b.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: &idlePicker{sc: sc}}) + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &idlePicker{subConn: subConn}, + }) case connectivity.TransientFailure: b.cc.UpdateState(balancer.State{ - ConnectivityState: s.ConnectivityState, - Picker: &picker{err: s.ConnectionError}, + ConnectivityState: state.ConnectivityState, + Picker: &picker{err: state.ConnectionError}, }) } } @@ -125,8 +153,8 @@ func (b *pickfirstBalancer) Close() { } func (b *pickfirstBalancer) ExitIdle() { - if b.sc != nil && b.state == connectivity.Idle { - b.sc.Connect() + if b.subConn != nil && b.state == connectivity.Idle { + b.subConn.Connect() } } @@ -135,18 +163,18 @@ type picker struct { err error } -func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { +func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { return p.result, p.err } // idlePicker is used when the SubConn is IDLE and kicks the SubConn into // CONNECTING when Pick is called. type idlePicker struct { - sc balancer.SubConn + subConn balancer.SubConn } -func (i *idlePicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { - i.sc.Connect() +func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + i.subConn.Connect() return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } diff --git a/test/balancer_test.go b/test/balancer_test.go index 113fbaceafb..c919f1e0f7c 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/attributes" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -683,87 +682,6 @@ func (s) TestServersSwap(t *testing.T) { } } -// TestEmptyAddrs verifies client behavior when a working connection is -// removed. In pick first and round-robin, both will continue using the old -// connections. -func (s) TestEmptyAddrs(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - // Initialize server - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. Err: %v", err) - } - s := grpc.NewServer() - defer s.Stop() - const one = "1" - ts := &funcServer{ - unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { - return &testpb.SimpleResponse{Username: one}, nil - }, - } - testpb.RegisterTestServiceServer(s, ts) - go s.Serve(lis) - - // Initialize pickfirst client - pfr := manual.NewBuilderWithScheme("whatever") - - pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(pfr)) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer pfcc.Close() - pfclient := testpb.NewTestServiceClient(pfcc) - - // Confirm we are connected to the server - if res, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Remove all addresses. - pfr.UpdateState(resolver.State{}) - - // Initialize roundrobin client - rrr := manual.NewBuilderWithScheme("whatever") - - rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - - rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(rrr), - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))) - if err != nil { - t.Fatalf("Error creating client: %v", err) - } - defer rrcc.Close() - rrclient := testpb.NewTestServiceClient(rrcc) - - // Confirm we are connected to the server - if res, err := rrclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil || res.Username != one { - t.Fatalf("UnaryCall(_) = %v, %v; want {Username: %q}, nil", res, err, one) - } - - // Remove all addresses. - rrr.UpdateState(resolver.State{}) - - // Confirm several new RPCs succeed on pick first. - for i := 0; i < 10; i++ { - if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) - } - time.Sleep(5 * time.Millisecond) - } - - // Confirm several new RPCs succeed on round robin. - for i := 0; i < 10; i++ { - if _, err := pfclient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { - t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err) - } - time.Sleep(5 * time.Millisecond) - } -} - func (s) TestWaitForReady(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/test/pickfirst_test.go b/test/pickfirst_test.go index 7379ce28691..c88fdac1e72 100644 --- a/test/pickfirst_test.go +++ b/test/pickfirst_test.go @@ -26,7 +26,9 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" @@ -43,6 +45,11 @@ const pickFirstServiceConfig = `{"loadBalancingConfig": [{"pick_first":{}}]}` // with service config specifying the use of the pick_first LB policy. func setupPickFirst(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) { t.Helper() + + // Initialize channelz. Used to determine pending RPC count. + czCleanup := channelz.NewChannelzStorageForTesting() + t.Cleanup(func() { czCleanupWrapper(czCleanup, t) }) + r := manual.NewBuilderWithScheme("whatever") backends := make([]*stubserver.StubServer, backendCount) @@ -258,3 +265,70 @@ func (s) TestPickFirst_AddressesRemoved(t *testing.T) { t.Fatal(err) } } + +// TestPickFirst_NewAddressWhileBlocking tests the case where pick_first is +// configured on a channel, things are working as expected and then a resolver +// updates removes all addresses. An RPC attempted at this point in time will be +// blocked because there are no valid backends. This test verifies that when new +// backends are added, the RPC is able to complete. +func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) { + cc, r, backends := setupPickFirst(t, 2) + addrs := backendsToAddrs(backends) + r.UpdateState(resolver.State{Addresses: addrs}) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } + + // Send a resolver update with no addresses. This should push the channel into + // TransientFailure. + r.UpdateState(resolver.State{}) + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatalf("timeout waiting for state change. got %v; want %v", state, connectivity.TransientFailure) + } + } + + doneCh := make(chan struct{}) + client := testpb.NewTestServiceClient(cc) + go func() { + // The channel is currently in TransientFailure and this RPC will block + // until the channel becomes Ready, which will only happen when we push a + // resolver update with a valid backend address. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Errorf("EmptyCall() = %v, want ", err) + } + close(doneCh) + }() + + // Make sure that there is one pending RPC on the ClientConn before attempting + // to push new addresses through the name resolver. If we don't do this, the + // resolver update can happen before the above goroutine gets to make the RPC. + for { + if err := ctx.Err(); err != nil { + t.Fatal(err) + } + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + t.Fatalf("there should only be one top channel, not %d", len(tcs)) + } + started := tcs[0].ChannelData.CallsStarted + completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed + if (started - completed) == 1 { + break + } + time.Sleep(defaultTestShortTimeout) + } + + // Send a resolver update with a valid backend to push the channel to Ready + // and unblock the above RPC. + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}}) + + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for blocked RPC to complete") + case <-doneCh: + } +}