From 36afb5c3d5ccf524a59add15859618098e431872 Mon Sep 17 00:00:00 2001
From: Zach Reyes
Date: Tue, 15 Mar 2022 20:41:25 -0400
Subject: [PATCH] Added call to UpdateBuilder() in Cluster Manager

---
 internal/balancergroup/balancergroup.go       |  14 ++
 .../balancer/clustermanager/clustermanager.go |   5 +
 .../clustermanager/clustermanager_test.go     | 131 ++++++++++++++++++
 3 files changed, 150 insertions(+)

diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go
index 3daad14473ee..1c71ab8f51e3 100644
--- a/internal/balancergroup/balancergroup.go
+++ b/internal/balancergroup/balancergroup.go
@@ -179,6 +179,20 @@ func (sbc *subBalancerWrapper) stopBalancer() {
 	sbc.balancer = nil
 }
 
+func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
+	sbc.builder = builder
+	b := sbc.balancer
+	// Even if an add persisted the builder but did not start the balancer,
+	// leaving the graceful switch balancer nil, we still correctly overwrite
+	// the builder here so it is used once the balancer is started. The
+	// graceful switch balancer's presence is an invariant of whether the
+	// balancer group is started (nil if closed, present if started).
+	if sbc.balancer != nil {
+		sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
+		b.SwitchTo(sbc.builder)
+	}
+}
+
 // BalancerGroup takes a list of balancers, and make them into one balancer.
 //
 // Note that this struct doesn't implement balancer.Balancer, because it's not
diff --git a/xds/internal/balancer/clustermanager/clustermanager.go b/xds/internal/balancer/clustermanager/clustermanager.go
index 8d71200d8c61..930427df1b72 100644
--- a/xds/internal/balancer/clustermanager/clustermanager.go
+++ b/xds/internal/balancer/clustermanager/clustermanager.go
@@ -93,6 +93,11 @@ func (b *bal) updateChildren(s balancer.ClientConnState, newConfig *lbConfig) {
 			b.stateAggregator.add(name)
 			// Then add to the balancer group.
 			b.bg.Add(name, balancer.Get(newT.ChildPolicy.Name))
+		} else {
+			// Already present; check for a type change and, if so, send down a new builder.
+			if newT.ChildPolicy.Name != b.children[name].ChildPolicy.Name {
+				b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name))
+			}
 		}
 		// TODO: handle error? How to aggregate errors and return?
 		_ = b.bg.UpdateClientConnState(name, balancer.ClientConnState{
diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go
index 771152b7bb97..e8e551a0ca17 100644
--- a/xds/internal/balancer/clustermanager/clustermanager_test.go
+++ b/xds/internal/balancer/clustermanager/clustermanager_test.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/roundrobin"
 	"google.golang.org/grpc/codes"
@@ -48,6 +49,11 @@ func Test(t *testing.T) {
 	grpctest.RunSubTests(t, s{})
 }
 
+const (
+	defaultTestTimeout      = 5 * time.Second
+	defaultTestShortTimeout = 10 * time.Millisecond
+)
+
 var (
 	rtBuilder balancer.Builder
 	rtParser  balancer.ConfigParser
@@ -102,6 +108,7 @@ func init() {
 	rtParser = rtBuilder.(balancer.ConfigParser)
 
 	balancer.Register(&ignoreAttrsRRBuilder{balancer.Get(roundrobin.Name)})
+	balancer.Register(wrappedPickFirstBalancerBuilder{})
 
 	balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
 }
@@ -632,3 +639,127 @@ func TestInitialIdle(t *testing.T) {
 		t.Fatalf("Received aggregated state: %v, want Idle", state1)
 	}
 }
+
+// TestClusterGracefulSwitch tests the graceful switch functionality for a child
+// of the cluster manager. At first, the child is configured as a round robin
+// load balancer, and thus should behave accordingly. The test then gracefully
+// switches this child to a pick first load balancer. Once that balancer updates
+// its state and completes the graceful switch process, the new picker should
+// reflect this change.
+func TestClusterGracefulSwitch(t *testing.T) {
+	cc := testutils.NewTestClientConn(t)
+	rtb := rtBuilder.Build(cc, balancer.BuildOptions{})
+
+	configJSON1 := `{
+"children": {
+	"csp:cluster":{ "childPolicy": [{"ignore_attrs_round_robin":""}] }
+}
+}`
+	config1, err := rtParser.ParseConfig([]byte(configJSON1))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
+		{Addr: testBackendAddrStrs[1], BalancerAttributes: nil},
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[0], []string{"csp:cluster"}),
+		}},
+		BalancerConfig: config1,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+
+	sc1 := <-cc.NewSubConnCh
+	rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	rtb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	p1 := <-cc.NewPickerCh
+	pi := balancer.PickInfo{
+		Ctx: SetPickedCluster(context.Background(), "csp:cluster"),
+	}
+	testPick(t, p1, pi, sc1, nil)
+
+	// Same cluster, different balancer type.
+	configJSON2 := `{
+"children": {
+	"csp:cluster":{ "childPolicy": [{"wrappedPickFirstBalancer":""}] }
+}
+}`
+	config2, err := rtParser.ParseConfig([]byte(configJSON2))
+	if err != nil {
+		t.Fatalf("failed to parse balancer config: %v", err)
+	}
+	if err := rtb.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			hierarchy.Set(wantAddrs[1], []string{"csp:cluster"}),
+		}},
+		BalancerConfig: config2,
+	}); err != nil {
+		t.Fatalf("failed to update ClientConn state: %v", err)
+	}
+	sc2 := <-cc.NewSubConnCh
+	// Update the pick first balancer's SubConn to CONNECTING. This will cause
+	// the pick first balancer to UpdateState() with CONNECTING, which shouldn't send
+	// a Picker update back, as the Graceful Switch process is not complete.
+	rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer cancel()
+	select {
+	case <-cc.NewPickerCh:
+		t.Fatalf("No new picker should have been sent due to the Graceful Switch process not completing")
+	case <-ctx.Done():
+	}
+
+	// Update the pick first balancer's SubConn to READY. This will cause
+	// the pick first balancer to UpdateState() with READY, which should send a
+	// Picker update back, as the Graceful Switch process is complete. This
+	// Picker should always pick the pick first balancer's created SubConn.
+	rtb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	p2 := <-cc.NewPickerCh
+	testPick(t, p2, pi, sc2, nil)
+	// The Graceful Switch process completing for the child should cause the
+	// SubConns for the balancer being gracefully switched from to get deleted.
+	ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	select {
+	case <-ctx.Done():
+		t.Fatalf("error waiting for RemoveSubConn()")
+	case rsc := <-cc.RemoveSubConnCh:
+		// The SubConn removed should have been the created SubConn
+		// from the child before switching.
+		if rsc != sc1 {
+			t.Fatalf("RemoveSubConn() got: %v, want %v", rsc, sc1)
+		}
+	}
+}
+
+type wrappedPickFirstBalancerBuilder struct{}
+
+func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	builder := balancer.Get(grpc.PickFirstBalancerName)
+	wpfb := &wrappedPickFirstBalancer{
+		ClientConn: cc,
+	}
+	pf := builder.Build(wpfb, opts)
+	wpfb.Balancer = pf
+	return wpfb
+}
+
+func (wrappedPickFirstBalancerBuilder) Name() string {
+	return "wrappedPickFirstBalancer"
+}
+
+type wrappedPickFirstBalancer struct {
+	balancer.Balancer
+	balancer.ClientConn
+}
+
+func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
+	// Eat IDLE updates - this allows the graceful switch to complete only on a READY SubConn.
+	if state.ConnectivityState == connectivity.Idle {
+		return
+	}
+	wb.ClientConn.UpdateState(state)
+}
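
The clustermanager.go hunk above calls b.bg.UpdateBuilder(name, balancer.Get(newT.ChildPolicy.Name)), while the balancergroup.go hunk only shows the subBalancerWrapper.gracefulSwitch helper that call ultimately relies on. Below is a minimal sketch of what the exported BalancerGroup.UpdateBuilder entry point could look like under that assumption; the outgoingMu mutex and idToBalancerConfig map are assumed to be BalancerGroup's existing fields and are not confirmed by this diff.

// Sketch only: look up the child wrapper by id and hand it the new builder so
// the gracefulSwitch helper added above can start the graceful switch for it.
// outgoingMu and idToBalancerConfig are assumed existing BalancerGroup fields.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
	bg.outgoingMu.Lock()
	defer bg.outgoingMu.Unlock()
	sbc, ok := bg.idToBalancerConfig[id]
	if !ok {
		// Unknown child; nothing to switch. A later Add for this id would
		// install the new builder anyway.
		return
	}
	sbc.gracefulSwitch(builder)
}

With an entry point along those lines, a child whose childPolicy type changes keeps serving picks from its old policy until the new policy reports READY, which is the behavior TestClusterGracefulSwitch exercises above.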