diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go index 8d71200d8c61..930427df1b72 100644 --- a/xds/internal/balancer/clustermanager/clustermanager.go +++ b/xds/internal/balancer/clustermanager/clustermanager.go @@ -93,6 +93,11 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) { b.stateAggregator.add(name) // Then add to the balancer group. b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name)) + } else { + // Already present, check for type change and if so send down a new builder. + if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name { + b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name)) + } } // TODO: handle error? How to aggregate errors and return? _ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{ diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index 771152b7bb97..e8e551a0ca17 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" @@ -48,6 +49,11 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + var ( rtBuilder balancer.Builder rtParser balancer.ConfigParser @@ -102,6 +108,7 @@ func init() { rtParser = rtBuilder.(balancer.ConfigParser) balancer.Register(&ignoreAttrsRRBuilder{balancer.Get(roundrobin.Name)}) + balancer.Register(wrappedPickFirstBalancerBuilder{}) balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond } @@ -632,3 +639,127 @@ func TestInitialIdle(t *testing.T) { t.Fatalf("Received aggregated state: %v, want Idle", state1) } } + +// TestClusterGracefulSwitch tests the graceful switch functionality for a child +// of the cluster manager. At first, the child is configured as a round robin +// load balancer, and thus should behave accordingly. The test then gracefully +// switches this child to a pick first load balancer. Once that balancer updates +// it's state and completes the graceful switch process the new picker should +// reflect this change. +func TestClusterGracefulSwitch(t *testing.T) { + cc := testutils.NewTestClientConn(t) + rtb := rtBuilder.Build(cc, balancer.BuildOptions{}) + + configJSON1 := `{ +"children": { + "csp:cluster":{ "childPolicy": [{"ignore_attrs_round_robin":""}] } +} +}` + config1, err := rtParser.ParseConfig([]byte(configJSON1)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0], BalancerAttributes: nil}, + {Addr: testBackendAddrStrs[1], BalancerAttributes: nil}, + } + if err := rtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}), + }}, + BalancerConfig: config1, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + sc1 := <-cc.NewSubConnCh + rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + p1 := <-cc.NewPickerCh + pi := balancer.PickInfo{ + Ctx: SetPickedCluster(context.Background(), "csp:cluster"), + } + testPick(t, p1, pi, sc1, nil) + + // Same cluster, different balancer type. + configJSON2 := `{ +"children": { + "csp:cluster":{ "childPolicy": [{"wrappedPickFirstBalancer":""}] } +} +}` + config2, err := rtParser.ParseConfig([]byte(configJSON2)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + if err := rtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}), + }}, + BalancerConfig: config2, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + sc2 := <-cc.NewSubConnCh + // Update the pick first balancers SubConn as CONNECTING. This will cause + // the pick first balancer to UpdateState() with CONNECTING, which shouldn't send + // a Picker update back, as the Graceful Switch process is not complete. + rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + select { + case <-cc.NewPickerCh: + t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing") + case <-ctx.Done(): + } + + // Update the pick first balancers SubConn as READY. This will cause + // the pick first balancer to UpdateState() with READY, which should send a + // Picker update back, as the Graceful Switch process is complete. This + // Picker should always pick the pick first's created SubConn. + rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + p2 := <-cc.NewPickerCh + testPick(t, p2, pi, sc2, nil) + // The Graceful Switch process completing for the child should cause the + // SubConns for the balancer being gracefully switched from to get deleted. + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + select { + case <-ctx.Done(): + t.Fatalf("error waiting for RemoveSubConn()") + case rsc := <-cc.RemoveSubConnCh: + // The SubConn removed should have been the created SubConn + // from the child before switching. + if rsc != sc1 { + t.Fatalf("RemoveSubConn() got: %v, want %v", rsc, sc1) + } + } +} + +type wrappedPickFirstBalancerBuilder struct{} + +func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + builder := balancer.Get(grpc.PickFirstBalancerName) + wpfb := &wrappedPickFirstBalancer{ + ClientConn: cc, + } + pf := builder.Build(wpfb, opts) + wpfb.Balancer = pf + return wpfb +} + +func (wrappedPickFirstBalancerBuilder) Name() string { + return "wrappedPickFirstBalancer" +} + +type wrappedPickFirstBalancer struct { + balancer.Balancer + balancer.ClientConn +} + +func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) { + // Eat it if IDLE - allows it to switch over only on a READY SubConn. + if state.ConnectivityState == connectivity.Idle { + return + } + wb.ClientConn.UpdateState(state) +}