Skip to content

Commit

Permalink
priority: forward the first IDLE state and picker (#4731)
Browse files Browse the repository at this point in the history
  • Loading branch information
menghanl committed Sep 7, 2021
1 parent 0ca7dca commit c99a9c1
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 0 deletions.
24 changes: 24 additions & 0 deletions xds/internal/balancer/priority/balancer_priority.go
Expand Up @@ -231,6 +231,8 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
b.handlePriorityWithNewStateTransientFailure(child, priority)
case connectivity.Connecting:
b.handlePriorityWithNewStateConnecting(child, priority, oldState)
case connectivity.Idle:
b.handlePriorityWithNewStateIdle(child, priority)
default:
// New state is Idle, should never happen. Don't forward.
}
Expand Down Expand Up @@ -356,3 +358,25 @@ func (b *priorityBalancer) handlePriorityWithNewStateConnecting(child *childBala
// Old state is Connecting, TransientFailure or Shutdown. Don't forward.
}
}

// handlePriorityWithNewStateIdle handles state Idle from a higher or equal
// priority.
//
// An update with state Idle:
// - If it's from higher priority:
// - Do nothing
// - It actually shouldn't happen, no balancer switches back to Idle.
// - If it's from priorityInUse:
// - Forward only
//
// Caller must make sure priorityInUse is not higher than priority.
//
// Caller must hold mu.
func (b *priorityBalancer) handlePriorityWithNewStateIdle(child *childBalancer, priority int) {
// priorityInUse is lower than this priority, do nothing.
if b.priorityInUse > priority {
return
}
// Forward the update.
b.cc.UpdateState(child.state)
}
96 changes: 96 additions & 0 deletions xds/internal/balancer/priority/balancer_test.go
Expand Up @@ -1778,3 +1778,99 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
t.Fatalf("timeout waiting for ResolveNow()")
}
}

const initIdleBalancerName = "test-init-Idle-balancer"

var errsTestInitIdle = []error{
fmt.Errorf("init Idle balancer error 0"),
fmt.Errorf("init Idle balancer error 1"),
}

func init() {
for i := 0; i < 2; i++ {
ii := i
stub.Register(fmt.Sprintf("%s-%d", initIdleBalancerName, ii), stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error {
bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{})
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
err := fmt.Errorf("wrong picker error")
if state.ConnectivityState == connectivity.Idle {
err = errsTestInitIdle[ii]
}
bd.ClientConn.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &testutils.TestConstPicker{Err: err},
})
},
})
}
}

// If the high priorities send initial pickers with Idle state, their pickers
// should get picks, because policies like ringhash starts in Idle, and doesn't
// connect.
//
// Init 0, 1; 0 is Idle, use 0; 0 is down, start 1; 1 is Idle, use 1.
func (s) TestPriority_HighPriorityInitIdle(t *testing.T) {
cc := testutils.NewTestClientConn(t)
bb := balancer.Get(Name)
pb := bb.Build(cc, balancer.BuildOptions{})
defer pb.Close()

// Two children, with priorities [0, 1], each with one backend.
if err := pb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{
Addresses: []resolver.Address{
hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}),
hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}),
},
},
BalancerConfig: &LBConfig{
Children: map[string]*Child{
"child-0": {Config: &internalserviceconfig.BalancerConfig{Name: fmt.Sprintf("%s-%d", initIdleBalancerName, 0)}},
"child-1": {Config: &internalserviceconfig.BalancerConfig{Name: fmt.Sprintf("%s-%d", initIdleBalancerName, 1)}},
},
Priorities: []string{"child-0", "child-1"},
},
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

addrs0 := <-cc.NewSubConnAddrsCh
if got, want := addrs0[0].Addr, testBackendAddrStrs[0]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc0 := <-cc.NewSubConnCh

// Send an Idle state update to trigger an Idle picker update.
pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
p0 := <-cc.NewPickerCh
if pr, err := p0.Pick(balancer.PickInfo{}); err != errsTestInitIdle[0] {
t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[0])
}

// Turn p0 down, to start p1.
pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs
// will retry.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
if _, err := p1.Pick(balancer.PickInfo{}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("want pick error %v, got %v", balancer.ErrNoSubConnAvailable, err)
}
}

addrs1 := <-cc.NewSubConnAddrsCh
if got, want := addrs1[0].Addr, testBackendAddrStrs[1]; got != want {
t.Fatalf("sc is created with addr %v, want %v", got, want)
}
sc1 := <-cc.NewSubConnCh
// Idle picker from p1 should also be forwarded.
pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})
p2 := <-cc.NewPickerCh
if pr, err := p2.Pick(balancer.PickInfo{}); err != errsTestInitIdle[1] {
t.Fatalf("pick returned %v, %v, want _, %v", pr, err, errsTestInitIdle[1])
}
}

0 comments on commit c99a9c1

Please sign in to comment.