diff --git a/internal/balancergroup/balancergroup.go b/internal/balancergroup/balancergroup.go index 3daad14473e..70edbb17c0c 100644 --- a/internal/balancergroup/balancergroup.go +++ b/internal/balancergroup/balancergroup.go @@ -97,7 +97,7 @@ func (sbc *subBalancerWrapper) startBalancer() { if sbc.balancer == nil { sbc.balancer = gracefulswitch.NewBalancer(sbc, sbc.buildOpts) } - sbc.group.logger.Infof("Creating child policy of type %v", sbc.builder.Name()) + sbc.group.logger.Infof("Creating child policy of type %q for locality %q", sbc.builder.Name(), sbc.id) sbc.balancer.SwitchTo(sbc.builder) if sbc.ccState != nil { sbc.balancer.UpdateClientConnState(*sbc.ccState) @@ -298,10 +298,22 @@ func (bg *BalancerGroup) Start() { bg.outgoingMu.Unlock() } -// Add adds a balancer built by builder to the group, with given id. -func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { +// AddWithClientConn adds a balancer with the given id to the group. The +// balancer is built with a balancer builder registered with balancerName. The +// given ClientConn is passed to the newly built balancer instead of the +// onepassed to balancergroup.New(). +// +// TODO: Get rid of the existing Add() API and replace it with this. +func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.ClientConn) error { + bg.logger.Infof("Adding child policy of type %q for locality %q", balancerName, id) + builder := balancer.Get(balancerName) + if builder == nil { + return fmt.Errorf("unregistered balancer name %q", balancerName) + } + // Store data in static map, and then check to see if bg is started. bg.outgoingMu.Lock() + defer bg.outgoingMu.Unlock() var sbc *subBalancerWrapper // If outgoingStarted is true, search in the cache. Otherwise, cache is // guaranteed to be empty, searching is unnecessary. @@ -326,7 +338,7 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { } if sbc == nil { sbc = &subBalancerWrapper{ - ClientConn: bg.cc, + ClientConn: cc, id: id, group: bg, builder: builder, @@ -343,11 +355,18 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { sbc.updateBalancerStateWithCachedPicker() } bg.idToBalancerConfig[id] = sbc - bg.outgoingMu.Unlock() + return nil +} + +// Add adds a balancer built by builder to the group, with given id. +func (bg *BalancerGroup) Add(id string, builder balancer.Builder) { + bg.AddWithClientConn(id, builder.Name(), bg.cc) } // UpdateBuilder updates the builder for a current child, starting the Graceful // Switch process for that child. +// +// TODO: update this API to take the name of the new builder instead. func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) { bg.outgoingMu.Lock() // This does not deal with the balancer cache because this call should come @@ -369,6 +388,7 @@ func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) { // closed after timeout. Cleanup work (closing sub-balancer and removing // subconns) will be done after timeout. func (bg *BalancerGroup) Remove(id string) { + bg.logger.Infof("Removing child policy for locality %q", id) bg.outgoingMu.Lock() if sbToRemove, ok := bg.idToBalancerConfig[id]; ok { if bg.outgoingStarted { diff --git a/internal/balancergroup/balancergroup_test.go b/internal/balancergroup/balancergroup_test.go index d962faa0ab8..57ab28895f1 100644 --- a/internal/balancergroup/balancergroup_test.go +++ b/internal/balancergroup/balancergroup_test.go @@ -374,11 +374,19 @@ func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing. } } -// Wrap the rr builder, so it behaves the same, but has a different pointer. +// Wrap the rr builder, so it behaves the same, but has a different name. type noopBalancerBuilderWrapper struct { balancer.Builder } +func init() { + balancer.Register(&noopBalancerBuilderWrapper{Builder: rrBuilder}) +} + +func (*noopBalancerBuilderWrapper) Name() string { + return "noopBalancerBuilderWrapper" +} + // After removing a sub-balancer, re-add with same ID, but different balancer // builder. Old subconns should be removed, and new subconns should be created. func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) { diff --git a/xds/internal/balancer/clusterimpl/clusterimpl.go b/xds/internal/balancer/clusterimpl/clusterimpl.go index 0a6cf6ca906..b79b941ec79 100644 --- a/xds/internal/balancer/clusterimpl/clusterimpl.go +++ b/xds/internal/balancer/clusterimpl/clusterimpl.go @@ -331,6 +331,7 @@ func (b *clusterImplBalancer) Close() { if b.childLB != nil { b.childLB.Close() b.childLB = nil + b.childState = balancer.State{} } <-b.done.Done() b.logger.Infof("Shutdown") diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index 9b373fb3697..b4a37f60c01 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -194,7 +194,6 @@ func (b *clusterResolverBalancer) handleWatchUpdate(update *resourceUpdate) { return } - b.logger.Infof("resource update: %+v", pretty.ToJSON(update.priorities)) b.watchUpdateReceived = true b.priorities = update.priorities diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index b5cace68496..17f57a576d3 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -127,7 +127,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // This is a new child, add it to the children list. But note that // the balancer isn't built, because this child can be a low // priority. If necessary, it will be built when syncing priorities. - cb := newChildBalancer(name, b, bb) + cb := newChildBalancer(name, b, bb.Name(), b.cc) cb.updateConfig(newSubConfig, resolver.State{ Addresses: addressesSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, @@ -141,9 +141,9 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // The balancing policy name is changed, close the old child. But don't // rebuild, rebuild will happen when syncing priorities. - if currentChild.bb.Name() != bb.Name() { + if currentChild.balancerName != bb.Name() { currentChild.stop() - currentChild.updateBuilder(bb) + currentChild.updateBalancerName(bb.Name()) } // Update config and address, but note that this doesn't send the @@ -155,10 +155,11 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err Attributes: s.ResolverState.Attributes, }) } - // Remove child from children if it's not in new config. + // Cleanup resources used by children removed from the config. for name, oldChild := range b.children { if _, ok := newConfig.Children[name]; !ok { oldChild.stop() + delete(b.children, name) } } diff --git a/xds/internal/balancer/priority/balancer_child.go b/xds/internal/balancer/priority/balancer_child.go index 34bab34c915..ea7778bb56f 100644 --- a/xds/internal/balancer/priority/balancer_child.go +++ b/xds/internal/balancer/priority/balancer_child.go @@ -29,9 +29,11 @@ import ( ) type childBalancer struct { - name string - parent *priorityBalancer - bb *ignoreResolveNowBalancerBuilder + name string + parent *priorityBalancer + parentCC balancer.ClientConn + balancerName string + cc *ignoreResolveNowClientConn ignoreReresolutionRequests bool config serviceconfig.LoadBalancingConfig @@ -53,12 +55,14 @@ type childBalancer struct { // newChildBalancer creates a child balancer place holder, but doesn't // build/start the child balancer. -func newChildBalancer(name string, parent *priorityBalancer, bb balancer.Builder) *childBalancer { +func newChildBalancer(name string, parent *priorityBalancer, balancerName string, cc balancer.ClientConn) *childBalancer { return &childBalancer{ - name: name, - parent: parent, - bb: newIgnoreResolveNowBalancerBuilder(bb, false), - started: false, + name: name, + parent: parent, + parentCC: cc, + balancerName: balancerName, + cc: newIgnoreResolveNowClientConn(cc, false), + started: false, // Start with the connecting state and picker with re-pick error, so // that when a priority switch causes this child picked before it's // balancing policy is created, a re-pick will happen. @@ -69,9 +73,13 @@ func newChildBalancer(name string, parent *priorityBalancer, bb balancer.Builder } } -// updateBuilder updates builder for the child, but doesn't build. -func (cb *childBalancer) updateBuilder(bb balancer.Builder) { - cb.bb = newIgnoreResolveNowBalancerBuilder(bb, cb.ignoreReresolutionRequests) +// updateBalancerName updates balancer name for the child, but doesn't build a +// new one. The parent priority LB always closes the child policy before +// updating the balancer name, and the new balancer is built when it gets added +// to the balancergroup as part of start(). +func (cb *childBalancer) updateBalancerName(balancerName string) { + cb.balancerName = balancerName + cb.cc = newIgnoreResolveNowClientConn(cb.parentCC, cb.ignoreReresolutionRequests) } // updateConfig sets childBalancer's config and state, but doesn't send update to @@ -93,14 +101,14 @@ func (cb *childBalancer) start() { return } cb.started = true - cb.parent.bg.Add(cb.name, cb.bb) + cb.parent.bg.AddWithClientConn(cb.name, cb.balancerName, cb.cc) cb.startInitTimer() cb.sendUpdate() } // sendUpdate sends the addresses and config to the child balancer. func (cb *childBalancer) sendUpdate() { - cb.bb.updateIgnoreResolveNow(cb.ignoreReresolutionRequests) + cb.cc.updateIgnoreResolveNow(cb.ignoreReresolutionRequests) // TODO: return and aggregate the returned error in the parent. err := cb.parent.bg.UpdateClientConnState(cb.name, balancer.ClientConnState{ ResolverState: cb.rState, diff --git a/xds/internal/balancer/priority/balancer_priority.go b/xds/internal/balancer/priority/balancer_priority.go index c12dfe47ffe..ca6f118c5cc 100644 --- a/xds/internal/balancer/priority/balancer_priority.go +++ b/xds/internal/balancer/priority/balancer_priority.go @@ -162,6 +162,10 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S b.logger.Warningf("priority: child balancer not found for child %v", childName) return } + if !child.started { + b.logger.Warningf("priority: ignoring update from child %q which is not in started state: %+v", childName, s) + return + } child.state = s // We start/stop the init timer of this child based on the new connectivity diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index d119076d1af..5e29edf6698 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -37,7 +37,10 @@ import ( "google.golang.org/grpc/resolver" ) -const defaultTestTimeout = 5 * time.Second +const ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 100 * time.Millisecond +) type s struct { grpctest.Tester @@ -1525,11 +1528,21 @@ func (s) TestPriority_ChildPolicyUpdatePickerInline(t *testing.T) { } } -// When the child policy's configured to ignore reresolution requests, the -// ResolveNow() calls from this child should be all ignored. +// TestPriority_IgnoreReresolutionRequest tests the case where the priority +// policy has a single child policy. The test verifies that ResolveNow() calls +// from the child policy are ignored based on the value of the +// IgnoreReresolutionRequests field in the configuration. func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() + // Register a stub balancer to act the child policy of the priority policy. + // Provide an init function to the stub balancer to capture the ClientConn + // passed to the child policy. + ccCh := testutils.NewChannel() + childPolicyName := t.Name() + stub.Register(childPolicyName, stub.BalancerFuncs{ + Init: func(data *stub.BalancerData) { + ccCh.Send(data.ClientConn) + }, + }) cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) @@ -1547,7 +1560,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { BalancerConfig: &LBConfig{ Children: map[string]*Child{ "child-0": { - Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName}, + Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName}, IgnoreReresolutionRequests: true, }, }, @@ -1557,13 +1570,14 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // This is the balancer.ClientConn that the inner resolverNowBalancer is - // built with. - balancerCCI, err := resolveNowBalancerCCCh.Receive(ctx) + // Retrieve the ClientConn passed to the child policy. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + val, err := ccCh.Receive(ctx) if err != nil { - t.Fatalf("timeout waiting for ClientConn from balancer builder") + t.Fatalf("timeout waiting for ClientConn from the child policy") } - balancerCC := balancerCCI.(balancer.ClientConn) + balancerCC := val.(balancer.ClientConn) // Since IgnoreReresolutionRequests was set to true, all ResolveNow() calls // should be ignored. @@ -1573,7 +1587,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { select { case <-cc.ResolveNowCh: t.Fatalf("got unexpected ResolveNow() call") - case <-time.After(time.Millisecond * 100): + case <-time.After(defaultTestShortTimeout): } // Send another update to set IgnoreReresolutionRequests to false. @@ -1586,7 +1600,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { BalancerConfig: &LBConfig{ Children: map[string]*Child{ "child-0": { - Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName}, + Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName}, IgnoreReresolutionRequests: false, }, }, @@ -1606,12 +1620,38 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) { } -// When the child policy's configured to ignore reresolution requests, the -// ResolveNow() calls from this child should be all ignored, from the other -// children are forwarded. +type wrappedRoundRobinBalancerBuilder struct { + name string + ccCh *testutils.Channel +} + +func (w *wrappedRoundRobinBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + w.ccCh.Send(cc) + rrBuilder := balancer.Get(roundrobin.Name) + return &wrappedRoundRobinBalancer{Balancer: rrBuilder.Build(cc, opts)} +} + +func (w *wrappedRoundRobinBalancerBuilder) Name() string { + return w.name +} + +type wrappedRoundRobinBalancer struct { + balancer.Balancer +} + +// TestPriority_IgnoreReresolutionRequestTwoChildren tests the case where the +// priority policy has two child policies, one of them has the +// IgnoreReresolutionRequests field set to true while the other one has it set +// to false. The test verifies that ResolveNow() calls from the child which is +// set to ignore reresolution requests are ignored, while calls from the other +// child are processed. func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() + // Register a wrapping balancer to act the child policy of the priority + // policy. The wrapping balancer builder's Build() method pushes the + // balancer.ClientConn on a channel for this test to use. + ccCh := testutils.NewChannel() + childPolicyName := t.Name() + balancer.Register(&wrappedRoundRobinBalancerBuilder{name: childPolicyName, ccCh: ccCh}) cc := testutils.NewTestClientConn(t) bb := balancer.Get(Name) @@ -1630,11 +1670,11 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { BalancerConfig: &LBConfig{ Children: map[string]*Child{ "child-0": { - Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName}, + Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName}, IgnoreReresolutionRequests: true, }, "child-1": { - Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName}, + Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName}, }, }, Priorities: []string{"child-0", "child-1"}, @@ -1643,12 +1683,14 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - // This is the balancer.ClientConn from p0. - balancerCCI0, err := resolveNowBalancerCCCh.Receive(ctx) + // Retrieve the ClientConn passed to the child policy from p0. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + val, err := ccCh.Receive(ctx) if err != nil { - t.Fatalf("timeout waiting for ClientConn from balancer builder 0") + t.Fatalf("timeout waiting for ClientConn from the child policy") } - balancerCC0 := balancerCCI0.(balancer.ClientConn) + balancerCC0 := val.(balancer.ClientConn) // Set p0 to transient failure, p1 will be started. addrs0 := <-cc.NewSubConnAddrsCh @@ -1658,14 +1700,12 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { sc0 := <-cc.NewSubConnCh pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) - // This is the balancer.ClientConn from p1. - ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second) - defer cancel1() - balancerCCI1, err := resolveNowBalancerCCCh.Receive(ctx1) + // Retrieve the ClientConn passed to the child policy from p1. + val, err = ccCh.Receive(ctx) if err != nil { - t.Fatalf("timeout waiting for ClientConn from balancer builder 1") + t.Fatalf("timeout waiting for ClientConn from the child policy") } - balancerCC1 := balancerCCI1.(balancer.ClientConn) + balancerCC1 := val.(balancer.ClientConn) // Since IgnoreReresolutionRequests was set to true for p0, ResolveNow() // from p0 should all be ignored. @@ -1675,7 +1715,7 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { select { case <-cc.ResolveNowCh: t.Fatalf("got unexpected ResolveNow() call") - case <-time.After(time.Millisecond * 100): + case <-time.After(defaultTestShortTimeout): } // But IgnoreReresolutionRequests was false for p1, ResolveNow() from p1 @@ -1683,7 +1723,7 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) { balancerCC1.ResolveNow(resolver.ResolveNowOptions{}) select { case <-cc.ResolveNowCh: - case <-time.After(time.Second): + case <-time.After(defaultTestShortTimeout): t.Fatalf("timeout waiting for ResolveNow()") } } diff --git a/xds/internal/balancer/priority/ignore_resolve_now.go b/xds/internal/balancer/priority/ignore_resolve_now.go index 9a9f4777269..792ee4b3f24 100644 --- a/xds/internal/balancer/priority/ignore_resolve_now.go +++ b/xds/internal/balancer/priority/ignore_resolve_now.go @@ -25,46 +25,31 @@ import ( "google.golang.org/grpc/resolver" ) -type ignoreResolveNowBalancerBuilder struct { - balancer.Builder +// ignoreResolveNowClientConn wraps a balancer.ClientConn and overrides the +// ResolveNow() method to ignore those calls if the ignoreResolveNow bit is set. +type ignoreResolveNowClientConn struct { + balancer.ClientConn ignoreResolveNow *uint32 } -// If `ignore` is true, all `ResolveNow()` from the balancer built from this -// builder will be ignored. -// -// `ignore` can be updated later by `updateIgnoreResolveNow`, and the update -// will be propagated to all the old and new balancers built with this. -func newIgnoreResolveNowBalancerBuilder(bb balancer.Builder, ignore bool) *ignoreResolveNowBalancerBuilder { - ret := &ignoreResolveNowBalancerBuilder{ - Builder: bb, +func newIgnoreResolveNowClientConn(cc balancer.ClientConn, ignore bool) *ignoreResolveNowClientConn { + ret := &ignoreResolveNowClientConn{ + ClientConn: cc, ignoreResolveNow: new(uint32), } ret.updateIgnoreResolveNow(ignore) return ret } -func (irnbb *ignoreResolveNowBalancerBuilder) updateIgnoreResolveNow(b bool) { +func (i *ignoreResolveNowClientConn) updateIgnoreResolveNow(b bool) { if b { - atomic.StoreUint32(irnbb.ignoreResolveNow, 1) + atomic.StoreUint32(i.ignoreResolveNow, 1) return } - atomic.StoreUint32(irnbb.ignoreResolveNow, 0) + atomic.StoreUint32(i.ignoreResolveNow, 0) } -func (irnbb *ignoreResolveNowBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return irnbb.Builder.Build(&ignoreResolveNowClientConn{ - ClientConn: cc, - ignoreResolveNow: irnbb.ignoreResolveNow, - }, opts) -} - -type ignoreResolveNowClientConn struct { - balancer.ClientConn - ignoreResolveNow *uint32 -} - func (i ignoreResolveNowClientConn) ResolveNow(o resolver.ResolveNowOptions) { if atomic.LoadUint32(i.ignoreResolveNow) != 0 { return diff --git a/xds/internal/balancer/priority/ignore_resolve_now_test.go b/xds/internal/balancer/priority/ignore_resolve_now_test.go index 29b719d9e12..5a008314788 100644 --- a/xds/internal/balancer/priority/ignore_resolve_now_test.go +++ b/xds/internal/balancer/priority/ignore_resolve_now_test.go @@ -23,81 +23,44 @@ import ( "testing" "time" - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" ) -const resolveNowBalancerName = "test-resolve-now-balancer" - -var resolveNowBalancerCCCh = testutils.NewChannel() - -type resolveNowBalancerBuilder struct { - balancer.Builder -} - -func (r *resolveNowBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - resolveNowBalancerCCCh.Send(cc) - return r.Builder.Build(cc, opts) -} - -func (r *resolveNowBalancerBuilder) Name() string { - return resolveNowBalancerName -} - -func init() { - balancer.Register(&resolveNowBalancerBuilder{ - Builder: balancer.Get(roundrobin.Name), - }) -} - -func (s) TestIgnoreResolveNowBalancerBuilder(t *testing.T) { - resolveNowBB := balancer.Get(resolveNowBalancerName) - // Create a build wrapper, but will not ignore ResolveNow(). - ignoreResolveNowBB := newIgnoreResolveNowBalancerBuilder(resolveNowBB, false) - +func (s) TestIgnoreResolveNowClientConn(t *testing.T) { cc := testutils.NewTestClientConn(t) - tb := ignoreResolveNowBB.Build(cc, balancer.BuildOptions{}) - defer tb.Close() + ignoreCC := newIgnoreResolveNowClientConn(cc, false) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + // Call ResolveNow() on the CC, it should be forwarded. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - // This is the balancer.ClientConn that the inner resolverNowBalancer is - // built with. - balancerCCI, err := resolveNowBalancerCCCh.Receive(ctx) - if err != nil { - t.Fatalf("timeout waiting for ClientConn from balancer builder") - } - balancerCC := balancerCCI.(balancer.ClientConn) - // Call ResolveNow() on the CC, it should be forwarded. - balancerCC.ResolveNow(resolver.ResolveNowOptions{}) + ignoreCC.ResolveNow(resolver.ResolveNowOptions{}) select { case <-cc.ResolveNowCh: - case <-time.After(time.Second): - t.Fatalf("timeout waiting for ResolveNow()") + case <-ctx.Done(): + t.Fatalf("Timeout waiting for ResolveNow()") } // Update ignoreResolveNow to true, call ResolveNow() on the CC, they should // all be ignored. - ignoreResolveNowBB.updateIgnoreResolveNow(true) + ignoreCC.updateIgnoreResolveNow(true) for i := 0; i < 5; i++ { - balancerCC.ResolveNow(resolver.ResolveNowOptions{}) + ignoreCC.ResolveNow(resolver.ResolveNowOptions{}) } select { case <-cc.ResolveNowCh: t.Fatalf("got unexpected ResolveNow() call") - case <-time.After(time.Millisecond * 100): + case <-time.After(defaultTestShortTimeout): } // Update ignoreResolveNow to false, new ResolveNow() calls should be // forwarded. - ignoreResolveNowBB.updateIgnoreResolveNow(false) - balancerCC.ResolveNow(resolver.ResolveNowOptions{}) + ignoreCC.updateIgnoreResolveNow(false) + ignoreCC.ResolveNow(resolver.ResolveNowOptions{}) select { case <-cc.ResolveNowCh: - case <-time.After(time.Second): + case <-ctx.Done(): t.Fatalf("timeout waiting for ResolveNow()") } } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index 59ccd0127a2..eaa4d2638dd 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -441,7 +441,9 @@ func (b *ringhashBalancer) regeneratePicker() { b.picker = newPicker(b.ring, b.logger) } -func (b *ringhashBalancer) Close() {} +func (b *ringhashBalancer) Close() { + b.logger.Infof("Shutdown") +} func (b *ringhashBalancer) ExitIdle() { // ExitIdle implementation is a no-op because connections are either