Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/priority: bug fix and minor behavior change #5417

Merged
merged 2 commits into from Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions balancer/base/balancer.go
Expand Up @@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config,
state: connectivity.Connecting,
}
// Initialize picker to a picker that always returns
// ErrNoSubConnAvailable, because when state of a SubConn changes, we
Expand Down Expand Up @@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}

b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
return nil
}

Expand Down
48 changes: 35 additions & 13 deletions internal/testutils/balancer.go
Expand Up @@ -188,17 +188,16 @@ func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error {
}

// WaitForPickerWithErr waits until an error picker is pushed to this
// ClientConn with the error matching the wanted error. Also drains the
// matching entry from the state channel. Returns an error if the provided
// context expires, including the last received picker error (if any).
// ClientConn with the error matching the wanted error. Returns an error if
// the provided context expires, including the last received picker error (if
// any).
func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error {
lastErr := errors.New("received no picker")
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr)
case picker := <-tcc.NewPickerCh:
<-tcc.NewStateCh
for i := 0; i < 5; i++ {
if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr == nil || lastErr.Error() != want.Error() {
break
Expand All @@ -210,17 +209,15 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error)
}

// WaitForConnectivityState waits until the state pushed to this ClientConn
// matches the wanted state. Also drains the matching entry from the picker
// channel. Returns an error if the provided context expires, including the
// last received state (if any).
// matches the wanted state. Returns an error if the provided context expires,
// including the last received state (if any).
func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error {
var lastState connectivity.State = -1
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState)
case s := <-tcc.NewStateCh:
<-tcc.NewPickerCh
if s == want {
return nil
}
Expand All @@ -230,17 +227,22 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co
}

// WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also
// drains the matching state channel and requires it to be READY to be
// considered. Returns an error if the provided context expires, including the
// last received error from IsRoundRobin or the picker (if any).
// drains the matching state channel and requires it to be READY (if an entry
// is pending) to be considered. Returns an error if the provided context
// expires, including the last received error from IsRoundRobin or the picker
// (if any).
func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error {
lastErr := errors.New("received no picker")
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr)
case s := <-tcc.NewStateCh:
p := <-tcc.NewPickerCh
case p := <-tcc.NewPickerCh:
s := connectivity.Ready
select {
case s = <-tcc.NewStateCh:
default:
}
if s != connectivity.Ready {
lastErr = fmt.Errorf("received state %v instead of ready", s)
break
Expand All @@ -250,6 +252,8 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
sc, err := p.Pick(balancer.PickInfo{})
if err != nil {
pickerErr = err
} else if sc.Done != nil {
sc.Done(balancer.DoneInfo{})
}
return sc.SubConn
}); pickerErr != nil {
Expand All @@ -264,6 +268,24 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...
}
}

// WaitForPicker waits for a picker that results in f returning nil. If the
// context expires, returns the last error returned by f (if any).
func (tcc *TestClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error {
lastErr := errors.New("received no picker")
for {
select {
case <-ctx.Done():
return fmt.Errorf("timeout when waiting for picker; last error: %v", lastErr)
case p := <-tcc.NewPickerCh:
if err := f(p); err != nil {
lastErr = err
continue
}
return nil
}
}
}

// IsRoundRobin checks whether f's return value is roundrobin of elements from
// want. But it doesn't check for the order. Note that want can contain
// duplicate items, which makes it weight-round-robin.
Expand Down