From bd0f88150dfdee1a5a316f2f64ecb9cf05c27fb7 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 20 Oct 2021 10:52:03 -0700 Subject: [PATCH] grpclb: recover after receiving an empty server list (#4879) --- balancer/grpclb/grpclb_remote_balancer.go | 12 +++- balancer/grpclb/grpclb_test.go | 67 +++++++++++++++++++++++ 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 0210c012d7b..330df4baa21 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -135,11 +135,19 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback } if lb.usePickFirst { - var sc balancer.SubConn - for _, sc = range lb.subConns { + var ( + scKey resolver.Address + sc balancer.SubConn + ) + for scKey, sc = range lb.subConns { break } if sc != nil { + if len(backendAddrs) == 0 { + lb.cc.cc.RemoveSubConn(sc) + delete(lb.subConns, scKey) + return + } lb.cc.cc.UpdateAddresses(sc, backendAddrs) sc.Connect() return diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 3b666764728..22aa8f1b868 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -1274,6 +1274,73 @@ func (s) TestGRPCLBBackendConnectionErrorPropagation(t *testing.T) { wg.Wait() } +func testGRPCLBEmptyServerList(t *testing.T, svcfg string) { + r := manual.NewBuilderWithScheme("whatever") + + tss, cleanup, err := startBackendsAndRemoteLoadBalancer(1, "", nil) + if err != nil { + t.Fatalf("failed to create new load balancer: %v", err) + } + defer cleanup() + + beServers := []*lbpb.Server{{ + IpAddress: tss.beIPs[0], + Port: int32(tss.bePorts[0]), + LoadBalanceToken: lbToken, + }} + + creds := serverNameCheckCreds{} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, + grpc.WithResolvers(r), + grpc.WithTransportCredentials(&creds), + grpc.WithContextDialer(fakeNameDialer)) + if err != nil { + t.Fatalf("Failed to dial to the backend %v", err) + } + defer cc.Close() + testC := testpb.NewTestServiceClient(cc) + + tss.ls.sls <- &lbpb.ServerList{Servers: beServers} + + rs := grpclbstate.Set(resolver.State{ServiceConfig: r.CC.ParseServiceConfig(svcfg)}, + &grpclbstate.State{BalancerAddresses: []resolver.Address{{ + Addr: tss.lbAddr, + ServerName: lbServerName, + }}}) + r.UpdateState(rs) + t.Log("Perform an initial RPC and expect it to succeed...") + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("Initial _.EmptyCall(_, _) = _, %v, want _, ", err) + } + t.Log("Now send an empty server list. Wait until we see an RPC failure to make sure the client got it...") + tss.ls.sls <- &lbpb.ServerList{} + gotError := false + for i := 0; i < 100; i++ { + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}); err != nil { + gotError = true + break + } + } + if !gotError { + t.Fatalf("Expected to eventually see an RPC fail after the grpclb sends an empty server list, but none did.") + } + t.Log("Now send a non-empty server list. A wait-for-ready RPC should now succeed...") + tss.ls.sls <- &lbpb.ServerList{Servers: beServers} + if _, err := testC.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("Final _.EmptyCall(_, _) = _, %v, want _, ", err) + } +} + +func (s) TestGRPCLBEmptyServerListRoundRobin(t *testing.T) { + testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}}]}}]}`) +} + +func (s) TestGRPCLBEmptyServerListPickFirst(t *testing.T) { + testGRPCLBEmptyServerList(t, `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`) +} + func (s) TestGRPCLBWithTargetNameFieldInConfig(t *testing.T) { r := manual.NewBuilderWithScheme("whatever")