Skip to content

Commit

Permalink
xds/clustermanager: pause picker updates during UpdateClientConnState (
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jul 21, 2022
1 parent 86117db commit fdc5d2f
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 0 deletions.
38 changes: 38 additions & 0 deletions xds/internal/balancer/clustermanager/balancerstateaggregator.go
Expand Up @@ -57,6 +57,11 @@ type balancerStateAggregator struct {
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[string]*subBalancerState
// Set when UpdateState call propagation is paused.
pauseUpdateState bool
// Set when UpdateState call propagation is paused and an UpdateState call
// is suppressed.
needUpdateStateOnResume bool
}

func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLogger) *balancerStateAggregator {
Expand Down Expand Up @@ -118,6 +123,27 @@ func (bsa *balancerStateAggregator) remove(id string) {
delete(bsa.idToPickerState, id)
}

// pauseStateUpdates causes UpdateState calls to not propagate to the parent
// ClientConn. The last state will be remembered and propagated when
// ResumeStateUpdates is called.
func (bsa *balancerStateAggregator) pauseStateUpdates() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.pauseUpdateState = true
bsa.needUpdateStateOnResume = false
}

// resumeStateUpdates will resume propagating UpdateState calls to the parent,
// and call UpdateState on the parent if any UpdateState call was suppressed.
func (bsa *balancerStateAggregator) resumeStateUpdates() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.pauseUpdateState = false
if bsa.needUpdateStateOnResume {
bsa.cc.UpdateState(bsa.build())
}
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
Expand All @@ -143,6 +169,12 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State)
if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}

Expand All @@ -168,6 +200,12 @@ func (bsa *balancerStateAggregator) buildAndUpdate() {
if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}

Expand Down
2 changes: 2 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager.go
Expand Up @@ -123,6 +123,8 @@ func (b *bal) UpdateClientConnState(s balancer.ClientConnState) error {
}
b.logger.Infof("update with config %+v, resolver state %+v", pretty.ToJSON(s.BalancerConfig), s.ResolverState)

b.stateAggregator.pauseStateUpdates()
defer b.stateAggregator.resumeStateUpdates()
b.updateChildren(s, newConfig)
return nil
}
Expand Down
56 changes: 56 additions & 0 deletions xds/internal/balancer/clustermanager/clustermanager_test.go
Expand Up @@ -763,3 +763,59 @@ func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
}
wb.ClientConn.UpdateState(state)
}

// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
states []balancer.State
}

func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
}

func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}

balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
return nil
},
}
stub.Register("update_state_balancer", balFuncs)

rtb := rtBuilder.Build(cc, balancer.BuildOptions{})

configJSON1 := `{
"children": {
"cds:cluster_1":{ "childPolicy": [{"update_state_balancer":""}] }
}
}`

config1, err := rtParser.ParseConfig([]byte(configJSON1))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

// Send the config, and an address with hierarchy path ["cluster_1"].
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0], BalancerAttributes: nil},
}
if err := rtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}),
}},
BalancerConfig: config1,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Verify that the only state update is the second one called by the child.
if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
}
}

0 comments on commit fdc5d2f

Please sign in to comment.