Skip to content

Commit

Permalink
xds/priority: bug fix and minor behavior change (#5417)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Jun 17, 2022
1 parent 29d9970 commit 3e7b97f
Show file tree
Hide file tree
Showing 9 changed files with 463 additions and 321 deletions.
4 changes: 4 additions & 0 deletions balancer/base/balancer.go
Expand Up @@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
state: connectivity.Connecting,
}
// Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
Expand Down Expand Up @@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}

b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
return nil
}

Expand Down
48 changes: 35 additions & 13 deletions internal/testutils/balancer.go
Expand Up @@ -188,17 +188,16 @@ func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error {
}

// WaitForPickerWithErr waits until an error picker is pushed to this
// ClientConn with the error matching the wanted error. Also drains the
// matching entry from the state channel. Returns an error if the provided
// context expires, including the last received picker error (if any).
// ClientConn with the error matching the wanted error. Returns an error if
// the provided context expires, including the last received picker error (if
// any).
func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
lastErr := errors.New("received no picker")
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr)
case picker := <-tcc.NewPickerCh:
<-tcc.NewStateCh
for i := 0; i < 5; i++ {
if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr == nil || lastErr.Error() != want.Error() {
break
Expand All @@ -210,17 +209,15 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error)
}

// WaitForConnectivityState waits until the state pushed to this ClientConn
// matches the wanted state. Also drains the matching entry from the picker
// channel. Returns an error if the provided context expires, including the
// last received state (if any).
// matches the wanted state. Returns an error if the provided context expires,
// including the last received state (if any).
func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
var lastState connectivity.State = -1
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState)
case s := <-tcc.NewStateCh:
<-tcc.NewPickerCh
if s == want {
return nil
}
Expand All @@ -230,17 +227,22 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co
}

// WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also
// drains the matching state channel and requires it to be READY to be
// considered. Returns an error if the provided context expires, including the
// last received error from IsRoundRobin or the picker (if any).
// drains the matching state channel and requires it to be READY (if an entry
// is pending) to be considered. Returns an error if the provided context
// expires, including the last received error from IsRoundRobin or the picker
// (if any).
func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
lastErr := errors.New("received no picker")
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr)
case s := <-tcc.NewStateCh:
p := <-tcc.NewPickerCh
case p := <-tcc.NewPickerCh:
s := connectivity.Ready
select {
case s = <-tcc.NewStateCh:
default:
}
if s != connectivity.Ready {
lastErr = fmt.Errorf("received state %v instead of ready", s)
break
Expand All @@ -250,6 +252,8 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
sc, err := p.Pick(balancer.PickInfo{})
if err != nil {
pickerErr = err
} else if sc.Done != nil {
sc.Done(balancer.DoneInfo{})
}
return sc.SubConn
}); pickerErr != nil {
Expand All @@ -264,6 +268,24 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
}
}

// WaitForPicker waits for a picker that results in f returning nil. If the
// context expires, returns the last error returned by f (if any).
func (tcc *TestClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
lastErr := errors.New("received no picker")
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for picker; last error: %v", lastErr)
case p := <-tcc.NewPickerCh:
if err := f(p); err != nil {
lastErr = err
continue
}
return nil
}
}
}

// IsRoundRobin checks whether f's return value is roundrobin of elements from
// want. But it doesn't check for the order. Note that want can contain
// duplicate items, which makes it weight-round-robin.
Expand Down

0 comments on commit 3e7b97f

Please sign in to comment.