Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/priority: avoid sending duplicate updates to children #5374

Merged
merged 1 commit into from May 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion xds/internal/balancer/priority/balancer.go
Expand Up @@ -170,7 +170,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
}
// Sync the states of all children to the new updated priorities. This
// include starting/stopping child balancers when necessary.
b.syncPriority()
b.syncPriority(true)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion xds/internal/balancer/priority/balancer_child.go
Expand Up @@ -145,7 +145,7 @@ func (cb *childBalancer) startInitTimer() {
// Re-sync the priority. This will switch to the next priority if
// there's any. Note that it's important sync() is called after setting
// initTimer to nil.
cb.parent.syncPriority()
cb.parent.syncPriority(false)
})
}

Expand Down
14 changes: 11 additions & 3 deletions xds/internal/balancer/priority/balancer_priority.go
Expand Up @@ -68,7 +68,7 @@ var (
// - forward the new addresses and config
//
// Caller must hold b.mu.
func (b *priorityBalancer) syncPriority() {
func (b *priorityBalancer) syncPriority(forceUpdate bool) {
// Everything was removed by the update.
if len(b.priorities) == 0 {
b.childInUse = ""
Expand Down Expand Up @@ -99,8 +99,16 @@ func (b *priorityBalancer) syncPriority() {
b.cc.UpdateState(child.state)
}
b.logger.Infof("switching to (%q, %v) in syncPriority", child.name, p)
oldChildInUse := b.childInUse
b.switchToChild(child, p)
child.sendUpdate()
if b.childInUse != oldChildInUse || forceUpdate {
// If child is switched, send the update to the new child.
//
// Or if forceUpdate is true (when this is triggered by a
// ClientConn update), because the ClientConn update might
// contain changes for this child.
child.sendUpdate()
}
break
}
}
Expand Down Expand Up @@ -220,7 +228,7 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
}

oldPriorityInUse := b.priorityInUse
child.parent.syncPriority()
child.parent.syncPriority(false)
// If child is switched by syncPriority(), it also sends the update from the
// new child to overwrite the old picker used by the parent.
//
Expand Down
16 changes: 0 additions & 16 deletions xds/internal/balancer/priority/balancer_test.go
Expand Up @@ -1854,22 +1854,6 @@ func (s) TestPriority_HighPriorityInitIdle(t *testing.T) {
t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[0])
}

// The ClientConn state update triggers a priority switch, from p0 -> p0
// (since p0 is still in use). Along with this the update, p0 also gets a
// ClientConn state update, with the addresses, which didn't change in this
// test (this update to the child is necessary in case the addresses are
// different).
//
// The test child policy, initIdleBalancer, blindly calls NewSubConn with
// all the addresses it receives, so this will trigger a NewSubConn with the
// old p0 addresses. (Note that in a real balancer, like roundrobin, no new
// SubConn will be created because the addresses didn't change).
//
// Clear those from the channel so the rest of the test can get the expected
// behavior.
<-cc.NewSubConnAddrsCh
<-cc.NewSubConnCh

// Turn p0 down, to start p1.
pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs
Expand Down