Skip to content

Commit

Permalink
priority: release references to child policies which are removed (#5682)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars committed Oct 6, 2022
1 parent 5fc798b commit c03925d
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 132 deletions.
30 changes: 25 additions & 5 deletions internal/balancergroup/balancergroup.go
Expand Up @@ -97,7 +97,7 @@ func (sbc *subBalancerWrapper) startBalancer() {
if sbc.balancer == nil {
sbc.balancer = gracefulswitch.NewBalancer(sbc, sbc.buildOpts)
}
sbc.group.logger.Infof("Creating child policy of type %v", sbc.builder.Name())
sbc.group.logger.Infof("Creating child policy of type %q for locality %q", sbc.builder.Name(), sbc.id)
sbc.balancer.SwitchTo(sbc.builder)
if sbc.ccState != nil {
sbc.balancer.UpdateClientConnState(*sbc.ccState)
Expand Down Expand Up @@ -298,10 +298,22 @@ func (bg *BalancerGroup) Start() {
bg.outgoingMu.Unlock()
}

// Add adds a balancer built by builder to the group, with given id.
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
// AddWithClientConn adds a balancer with the given id to the group. The
// balancer is built with a balancer builder registered with balancerName. The
// given ClientConn is passed to the newly built balancer instead of the
// one passed to balancergroup.New().
//
// TODO: Get rid of the existing Add() API and replace it with this.
func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.ClientConn) error {
bg.logger.Infof("Adding child policy of type %q for locality %q", balancerName, id)
builder := balancer.Get(balancerName)
if builder == nil {
return fmt.Errorf("unregistered balancer name %q", balancerName)
}

// Store data in static map, and then check to see if bg is started.
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
var sbc *subBalancerWrapper
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary.
Expand All @@ -326,7 +338,7 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
}
if sbc == nil {
sbc = &subBalancerWrapper{
ClientConn: bg.cc,
ClientConn: cc,
id: id,
group: bg,
builder: builder,
Expand All @@ -343,11 +355,18 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
sbc.updateBalancerStateWithCachedPicker()
}
bg.idToBalancerConfig[id] = sbc
bg.outgoingMu.Unlock()
return nil
}

// Add adds a balancer built by builder to the group, with given id.
//
// The work is delegated to AddWithClientConn using builder.Name() and the
// ClientConn held by the group (bg.cc). AddWithClientConn resolves the builder
// again by name and returns an error when no builder is registered under that
// name. Add has no error return of its own, so such a failure is logged here
// instead of being silently discarded.
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
	if err := bg.AddWithClientConn(id, builder.Name(), bg.cc); err != nil {
		bg.logger.Errorf("Failed to add child policy of type %q for locality %q: %v", builder.Name(), id, err)
	}
}

// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
//
// TODO: update this API to take the name of the new builder instead.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
bg.outgoingMu.Lock()
// This does not deal with the balancer cache because this call should come
Expand All @@ -369,6 +388,7 @@ func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
// closed after timeout. Cleanup work (closing sub-balancer and removing
// subconns) will be done after timeout.
func (bg *BalancerGroup) Remove(id string) {
bg.logger.Infof("Removing child policy for locality %q", id)
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {
Expand Down
10 changes: 9 additions & 1 deletion internal/balancergroup/balancergroup_test.go
Expand Up @@ -374,11 +374,19 @@ func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.
}
}

// Wrap the rr builder, so it behaves the same, but has a different pointer.
// Wrap the rr builder, so it behaves the same, but has a different name.
type noopBalancerBuilderWrapper struct {
// Embedded builder: its methods are promoted to the wrapper, except Name,
// which the wrapper defines itself.
balancer.Builder
}

// init registers a noopBalancerBuilderWrapper around rrBuilder with the
// balancer package, so the wrapper is known to the registry under the name
// returned by its Name method.
func init() {
balancer.Register(&noopBalancerBuilderWrapper{Builder: rrBuilder})
}

// Name returns the fixed name "noopBalancerBuilderWrapper". It takes the place
// of the Name method that would otherwise be promoted from the embedded
// balancer.Builder.
func (*noopBalancerBuilderWrapper) Name() string {
return "noopBalancerBuilderWrapper"
}

// After removing a sub-balancer, re-add with same ID, but different balancer
// builder. Old subconns should be removed, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Expand Up @@ -331,6 +331,7 @@ func (b *clusterImplBalancer) Close() {
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
b.childState = balancer.State{}
}
<-b.done.Done()
b.logger.Infof("Shutdown")
Expand Down
1 change: 0 additions & 1 deletion xds/internal/balancer/clusterresolver/clusterresolver.go
Expand Up @@ -194,7 +194,6 @@ func (b *clusterResolverBalancer) handleWatchUpdate(update *resourceUpdate) {
return
}

b.logger.Infof("resource update: %+v", pretty.ToJSON(update.priorities))
b.watchUpdateReceived = true
b.priorities = update.priorities

Expand Down
9 changes: 5 additions & 4 deletions xds/internal/balancer/priority/balancer.go
Expand Up @@ -127,7 +127,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
// This is a new child, add it to the children list. But note that
// the balancer isn't built, because this child can be a low
// priority. If necessary, it will be built when syncing priorities.
cb := newChildBalancer(name, b, bb)
cb := newChildBalancer(name, b, bb.Name(), b.cc)
cb.updateConfig(newSubConfig, resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
Expand All @@ -141,9 +141,9 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err

// The balancing policy name is changed, close the old child. But don't
// rebuild, rebuild will happen when syncing priorities.
if currentChild.bb.Name() != bb.Name() {
if currentChild.balancerName != bb.Name() {
currentChild.stop()
currentChild.updateBuilder(bb)
currentChild.updateBalancerName(bb.Name())
}

// Update config and address, but note that this doesn't send the
Expand All @@ -155,10 +155,11 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
Attributes: s.ResolverState.Attributes,
})
}
// Remove child from children if it's not in new config.
// Cleanup resources used by children removed from the config.
for name, oldChild := range b.children {
if _, ok := newConfig.Children[name]; !ok {
oldChild.stop()
delete(b.children, name)
}
}

Expand Down
34 changes: 21 additions & 13 deletions xds/internal/balancer/priority/balancer_child.go
Expand Up @@ -29,9 +29,11 @@ import (
)

type childBalancer struct {
name string
parent *priorityBalancer
bb *ignoreResolveNowBalancerBuilder
name string
parent *priorityBalancer
parentCC balancer.ClientConn
balancerName string
cc *ignoreResolveNowClientConn

ignoreReresolutionRequests bool
config serviceconfig.LoadBalancingConfig
Expand All @@ -53,12 +55,14 @@ type childBalancer struct {

// newChildBalancer creates a child balancer placeholder, but doesn't
// build/start the child balancer.
func newChildBalancer(name string, parent *priorityBalancer, bb balancer.Builder) *childBalancer {
func newChildBalancer(name string, parent *priorityBalancer, balancerName string, cc balancer.ClientConn) *childBalancer {
return &childBalancer{
name: name,
parent: parent,
bb: newIgnoreResolveNowBalancerBuilder(bb, false),
started: false,
name: name,
parent: parent,
parentCC: cc,
balancerName: balancerName,
cc: newIgnoreResolveNowClientConn(cc, false),
started: false,
// Start with the connecting state and picker with re-pick error, so
// that when a priority switch causes this child to be picked before its
// balancing policy is created, a re-pick will happen.
Expand All @@ -69,9 +73,13 @@ func newChildBalancer(name string, parent *priorityBalancer, bb balancer.Builder
}
}

// updateBuilder updates builder for the child, but doesn't build.
func (cb *childBalancer) updateBuilder(bb balancer.Builder) {
cb.bb = newIgnoreResolveNowBalancerBuilder(bb, cb.ignoreReresolutionRequests)
// updateBalancerName updates balancer name for the child, but doesn't build a
// new one. The parent priority LB always closes the child policy before
// updating the balancer name, and the new balancer is built when it gets added
// to the balancergroup as part of start().
func (cb *childBalancer) updateBalancerName(balancerName string) {
cb.balancerName = balancerName
// Replace the ClientConn wrapper with a new one around parentCC, seeded
// with the child's current ignoreReresolutionRequests value.
cb.cc = newIgnoreResolveNowClientConn(cb.parentCC, cb.ignoreReresolutionRequests)
}

// updateConfig sets childBalancer's config and state, but doesn't send update to
Expand All @@ -93,14 +101,14 @@ func (cb *childBalancer) start() {
return
}
cb.started = true
cb.parent.bg.Add(cb.name, cb.bb)
cb.parent.bg.AddWithClientConn(cb.name, cb.balancerName, cb.cc)
cb.startInitTimer()
cb.sendUpdate()
}

// sendUpdate sends the addresses and config to the child balancer.
func (cb *childBalancer) sendUpdate() {
cb.bb.updateIgnoreResolveNow(cb.ignoreReresolutionRequests)
cb.cc.updateIgnoreResolveNow(cb.ignoreReresolutionRequests)
// TODO: return and aggregate the returned error in the parent.
err := cb.parent.bg.UpdateClientConnState(cb.name, balancer.ClientConnState{
ResolverState: cb.rState,
Expand Down
4 changes: 4 additions & 0 deletions xds/internal/balancer/priority/balancer_priority.go
Expand Up @@ -162,6 +162,10 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
b.logger.Warningf("priority: child balancer not found for child %v", childName)
return
}
if !child.started {
b.logger.Warningf("priority: ignoring update from child %q which is not in started state: %+v", childName, s)
return
}
child.state = s

// We start/stop the init timer of this child based on the new connectivity
Expand Down

0 comments on commit c03925d

Please sign in to comment.