From 250d03a7667b16160b79856a55a373a40d31147d Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 25 May 2022 12:53:36 -0700 Subject: [PATCH 1/2] priority: bug fix and minor behavior change - Bug: push config updates to all child policies, not just the active one. - Process all child picker updates before determining active priority. --- balancer/base/balancer.go | 4 + internal/testutils/balancer.go | 48 +++- .../balancer/clusterimpl/balancer_test.go | 250 ++++++++---------- .../balancer/clusterresolver/eds_impl_test.go | 139 +++++----- .../balancer/clusterresolver/priority_test.go | 43 ++- xds/internal/balancer/priority/balancer.go | 67 +++-- .../balancer/priority/balancer_child.go | 9 +- .../balancer/priority/balancer_priority.go | 95 +------ .../balancer/priority/balancer_test.go | 108 +++++++- 9 files changed, 429 insertions(+), 334 deletions(-) diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index a67074a3ad0..e8dfc828aaa 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -45,6 +45,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) scStates: make(map[balancer.SubConn]connectivity.State), csEvltr: &balancer.ConnectivityStateEvaluator{}, config: bb.config, + state: connectivity.Connecting, } // Initialize picker to a picker that always returns // ErrNoSubConnAvailable, because when state of a SubConn changes, we @@ -134,6 +135,9 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { b.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } + + b.regeneratePicker() + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) return nil } diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index e8c485e8556..f23b215a7b3 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -188,9 +188,9 @@ func (tcc *TestClientConn) WaitForErrPicker(ctx context.Context) error { } // WaitForPickerWithErr waits until an error picker is pushed to this -// ClientConn with the error matching the wanted error. Also drains the -// matching entry from the state channel. Returns an error if the provided -// context expires, including the last received picker error (if any). +// ClientConn with the error matching the wanted error. Returns an error if +// the provided context expires, including the last received picker error (if +// any). func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) error { lastErr := errors.New("received no picker") for { @@ -198,7 +198,6 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) case <-ctx.Done(): return fmt.Errorf("timeout when waiting for an error picker with %v; last picker error: %v", want, lastErr) case picker := <-tcc.NewPickerCh: - <-tcc.NewStateCh for i := 0; i < 5; i++ { if _, lastErr = picker.Pick(balancer.PickInfo{}); lastErr == nil || lastErr.Error() != want.Error() { break @@ -210,9 +209,8 @@ func (tcc *TestClientConn) WaitForPickerWithErr(ctx context.Context, want error) } // WaitForConnectivityState waits until the state pushed to this ClientConn -// matches the wanted state. Also drains the matching entry from the picker -// channel. Returns an error if the provided context expires, including the -// last received state (if any). +// matches the wanted state. Returns an error if the provided context expires, +// including the last received state (if any). 
func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want connectivity.State) error { var lastState connectivity.State = -1 for { @@ -220,7 +218,6 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co case <-ctx.Done(): return fmt.Errorf("timeout when waiting for state to be %s; last state: %s", want, lastState) case s := <-tcc.NewStateCh: - <-tcc.NewPickerCh if s == want { return nil } @@ -230,17 +227,22 @@ func (tcc *TestClientConn) WaitForConnectivityState(ctx context.Context, want co } // WaitForRoundRobinPicker waits for a picker that passes IsRoundRobin. Also -// drains the matching state channel and requires it to be READY to be -// considered. Returns an error if the provided context expires, including the -// last received error from IsRoundRobin or the picker (if any). +// drains the matching state channel and requires it to be READY (if an entry +// is pending) to be considered. Returns an error if the provided context +// expires, including the last received error from IsRoundRobin or the picker +// (if any). func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ...balancer.SubConn) error { lastErr := errors.New("received no picker") for { select { case <-ctx.Done(): return fmt.Errorf("timeout when waiting for round robin picker with %v; last error: %v", want, lastErr) - case s := <-tcc.NewStateCh: - p := <-tcc.NewPickerCh + case p := <-tcc.NewPickerCh: + s := connectivity.Ready + select { + case s = <-tcc.NewStateCh: + default: + } if s != connectivity.Ready { lastErr = fmt.Errorf("received state %v instead of ready", s) break @@ -250,6 +252,8 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ... sc, err := p.Pick(balancer.PickInfo{}) if err != nil { pickerErr = err + } else if sc.Done != nil { + sc.Done(balancer.DoneInfo{}) } return sc.SubConn }); pickerErr != nil { @@ -264,6 +268,24 @@ func (tcc *TestClientConn) WaitForRoundRobinPicker(ctx context.Context, want ... } } +// WaitForPicker waits for a picker that results in f returning nil. If the +// context expires, returns the last error returned by f (if any). +func (tcc *TestClientConn) WaitForPicker(ctx context.Context, f func(balancer.Picker) error) error { + lastErr := errors.New("received no picker") + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout when waiting for picker; last error: %v", lastErr) + case p := <-tcc.NewPickerCh: + if err := f(p); err != nil { + lastErr = err + continue + } + return nil + } + } +} + // IsRoundRobin checks whether f's return value is roundrobin of elements from // want. But it doesn't check for the order. Note that want can contain // duplicate items, which makes it weight-round-robin. diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index d444ecd4f4f..c8e6b55a588 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -46,7 +46,7 @@ import ( ) const ( - defaultTestTimeout = 1 * time.Second + defaultTestTimeout = 5 * time.Second defaultShortTestTimeout = 100 * time.Microsecond testClusterName = "test-cluster" @@ -90,6 +90,9 @@ func init() { // TestDropByCategory verifies that the balancer correctly drops the picks, and // that the drops are reported. 
func (s) TestDropByCategory(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) xdsC := fakeclient.NewClient() defer xdsC.Close() @@ -122,9 +125,6 @@ func (s) TestDropByCategory(t *testing.T) { t.Fatalf("unexpected error from UpdateClientConnState: %v", err) } - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - got, err := xdsC.WaitForReportLoad(ctx) if err != nil { t.Fatalf("xdsClient.ReportLoad failed with error: %v", err) @@ -136,34 +136,31 @@ func (s) TestDropByCategory(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - p0 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p0.Pick(balancer.PickInfo{}) - if err != balancer.ErrNoSubConnAvailable { - t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) - } - } + cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable) b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. - p1 := <-cc.NewPickerCh + const rpcCount = 20 - for i := 0; i < rpcCount; i++ { - gotSCSt, err := p1.Pick(balancer.PickInfo{}) - // Even RPCs are dropped. - if i%2 == 0 { - if err == nil || !strings.Contains(err.Error(), "dropped") { - t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err) + cc.WaitForPicker(ctx, func(p balancer.Picker) error { + for i := 0; i < rpcCount; i++ { + gotSCSt, err := p.Pick(balancer.PickInfo{}) + // Even RPCs are dropped. + if i%2 == 0 { + if err == nil || !strings.Contains(err.Error(), "dropped") { + return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err) + } + continue + } + if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + } + if gotSCSt.Done != nil { + gotSCSt.Done(balancer.DoneInfo{}) } - continue - } - if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) - } - if gotSCSt.Done != nil { - gotSCSt.Done(balancer.DoneInfo{}) } - } + return nil + }) // Dump load data from the store and compare with expected counts. loadStore := xdsC.LoadStore() @@ -210,23 +207,25 @@ func (s) TestDropByCategory(t *testing.T) { t.Fatalf("unexpected error from UpdateClientConnState: %v", err) } - p2 := <-cc.NewPickerCh - for i := 0; i < rpcCount; i++ { - gotSCSt, err := p2.Pick(balancer.PickInfo{}) - // Even RPCs are dropped. - if i%4 == 0 { - if err == nil || !strings.Contains(err.Error(), "dropped") { - t.Fatalf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err) + cc.WaitForPicker(ctx, func(p balancer.Picker) error { + for i := 0; i < rpcCount; i++ { + gotSCSt, err := p.Pick(balancer.PickInfo{}) + // Even RPCs are dropped. 
+ if i%4 == 0 { + if err == nil || !strings.Contains(err.Error(), "dropped") { + return fmt.Errorf("pick.Pick, got %v, %v, want error RPC dropped", gotSCSt, err) + } + continue + } + if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + } + if gotSCSt.Done != nil { + gotSCSt.Done(balancer.DoneInfo{}) } - continue - } - if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) - } - if gotSCSt.Done != nil { - gotSCSt.Done(balancer.DoneInfo{}) } - } + return nil + }) const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2 wantStatsData1 := []*load.Data{{ @@ -287,52 +286,51 @@ func (s) TestDropCircuitBreaking(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - p0 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p0.Pick(balancer.PickInfo{}) - if err != balancer.ErrNoSubConnAvailable { - t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. - dones := []func(){} - p1 := <-cc.NewPickerCh const rpcCount = 100 - for i := 0; i < rpcCount; i++ { - gotSCSt, err := p1.Pick(balancer.PickInfo{}) - if i < 50 && err != nil { - t.Errorf("The first 50%% picks should be non-drops, got error %v", err) - } else if i > 50 && err == nil { - t.Errorf("The second 50%% picks should be drops, got error ") - } - dones = append(dones, func() { - if gotSCSt.Done != nil { - gotSCSt.Done(balancer.DoneInfo{}) + cc.WaitForPicker(ctx, func(p balancer.Picker) error { + dones := []func(){} + for i := 0; i < rpcCount; i++ { + gotSCSt, err := p.Pick(balancer.PickInfo{}) + if i < 50 && err != nil { + return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err) + } else if i > 50 && err == nil { + return fmt.Errorf("The second 50%% picks should be drops, got error ") } - }) - } - for _, done := range dones { - done() - } - - dones = []func(){} - // Pick without drops. - for i := 0; i < 50; i++ { - gotSCSt, err := p1.Pick(balancer.PickInfo{}) - if err != nil { - t.Errorf("The third 50%% picks should be non-drops, got error %v", err) + dones = append(dones, func() { + if gotSCSt.Done != nil { + gotSCSt.Done(balancer.DoneInfo{}) + } + }) } - dones = append(dones, func() { - if gotSCSt.Done != nil { - gotSCSt.Done(balancer.DoneInfo{}) + for _, done := range dones { + done() + } + + dones = []func(){} + // Pick without drops. + for i := 0; i < 50; i++ { + gotSCSt, err := p.Pick(balancer.PickInfo{}) + if err != nil { + t.Errorf("The third 50%% picks should be non-drops, got error %v", err) } - }) - } - for _, done := range dones { - done() - } + dones = append(dones, func() { + if gotSCSt.Done != nil { + gotSCSt.Done(balancer.DoneInfo{}) + } + }) + } + for _, done := range dones { + done() + } + + return nil + }) // Dump load data from the store and compare with expected counts. 
loadStore := xdsC.LoadStore() @@ -426,6 +424,9 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) { // TestClusterNameInAddressAttributes covers the case that cluster name is // attached to the subconn address attributes. func (s) TestClusterNameInAddressAttributes(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) xdsC := fakeclient.NewClient() defer xdsC.Close() @@ -451,13 +452,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - p0 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p0.Pick(balancer.PickInfo{}) - if err != balancer.ErrNoSubConnAvailable { - t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) - } - } + cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable) addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want { @@ -470,17 +465,7 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. - p1 := <-cc.NewPickerCh - const rpcCount = 20 - for i := 0; i < rpcCount; i++ { - gotSCSt, err := p1.Pick(balancer.PickInfo{}) - if err != nil || !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) - } - if gotSCSt.Done != nil { - gotSCSt.Done(balancer.DoneInfo{}) - } - } + cc.WaitForRoundRobinPicker(ctx, sc1) const testClusterName2 = "test-cluster-2" var addr2 = resolver.Address{Addr: "2.2.2.2"} @@ -511,6 +496,9 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { // TestReResolution verifies that when a SubConn turns transient failure, // re-resolution is triggered. func (s) TestReResolution(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + defer xdsclient.ClearCounterForTesting(testClusterName, testServiceName) xdsC := fakeclient.NewClient() defer xdsC.Close() @@ -536,22 +524,14 @@ func (s) TestReResolution(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - p0 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p0.Pick(balancer.PickInfo{}) - if err != balancer.ErrNoSubConnAvailable { - t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // This should get the transient failure picker. - p1 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p1.Pick(balancer.PickInfo{}) - if err == nil { - t.Fatalf("picker.Pick, got _,%v, want not nil", err) - } + if err := cc.WaitForErrPicker(ctx); err != nil { + t.Fatal(err.Error()) } // The transient failure should trigger a re-resolution. @@ -563,20 +543,14 @@ func (s) TestReResolution(t *testing.T) { b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. 
- p2 := <-cc.NewPickerCh - want := []balancer.SubConn{sc1} - if err := testutils.IsRoundRobin(want, subConnFromPicker(p2)); err != nil { - t.Fatalf("want %v, got %v", want, err) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) } b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) // This should get the transient failure picker. - p3 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p3.Pick(balancer.PickInfo{}) - if err == nil { - t.Fatalf("picker.Pick, got _,%v, want not nil", err) - } + if err := cc.WaitForErrPicker(ctx); err != nil { + t.Fatal(err.Error()) } // The transient failure should trigger a re-resolution. @@ -635,33 +609,31 @@ func (s) TestLoadReporting(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - p0 := <-cc.NewPickerCh - for i := 0; i < 10; i++ { - _, err := p0.Pick(balancer.PickInfo{}) - if err != balancer.ErrNoSubConnAvailable { - t.Fatalf("picker.Pick, got _,%v, want Err=%v", err, balancer.ErrNoSubConnAvailable) - } + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) } b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. - p1 := <-cc.NewPickerCh const successCount = 5 - for i := 0; i < successCount; i++ { - gotSCSt, err := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) - } - gotSCSt.Done(balancer.DoneInfo{}) - } const errorCount = 5 - for i := 0; i < errorCount; i++ { - gotSCSt, err := p1.Pick(balancer.PickInfo{}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { - t.Fatalf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + cc.WaitForPicker(ctx, func(p balancer.Picker) error { + for i := 0; i < successCount; i++ { + gotSCSt, err := p.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + } + gotSCSt.Done(balancer.DoneInfo{}) } - gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) - } + for i := 0; i < errorCount; i++ { + gotSCSt, err := p.Pick(balancer.PickInfo{}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + return fmt.Errorf("picker.Pick, got %v, %v, want SubConn=%v", gotSCSt, err, sc1) + } + gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) + } + return nil + }) // Dump load data from the store and compare with expected counts. 
loadStore := xdsC.LoadStore() diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index 7f2bfa8a75d..1dab8c9f8f5 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -456,6 +456,9 @@ func (s) TestEDS_EmptyUpdate(t *testing.T) { } func (s) TestEDS_CircuitBreaking(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + edsb, cc, xdsC, cleanup := setupTestEDS(t, nil) defer cleanup() @@ -481,44 +484,46 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Picks with drops. - dones := []func(){} - p := <-cc.NewPickerCh - for i := 0; i < 100; i++ { - pr, err := p.Pick(balancer.PickInfo{}) - if i < 50 && err != nil { - t.Errorf("The first 50%% picks should be non-drops, got error %v", err) - } else if i > 50 && err == nil { - t.Errorf("The second 50%% picks should be drops, got error ") - } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) + cc.WaitForPicker(ctx, func(p balancer.Picker) error { + dones := []func(){} + for i := 0; i < 100; i++ { + pr, err := p.Pick(balancer.PickInfo{}) + if i < 50 && err != nil { + return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err) + } else if i > 50 && err == nil { + return fmt.Errorf("The second 50%% picks should be drops, got error ") } - }) - } - - for _, done := range dones { - done() - } - dones = []func(){} + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } - // Pick without drops. - for i := 0; i < 50; i++ { - pr, err := p.Pick(balancer.PickInfo{}) - if err != nil { - t.Errorf("The third 50%% picks should be non-drops, got error %v", err) + for _, done := range dones { + done() } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) + dones = []func(){} + + // Pick without drops. + for i := 0; i < 50; i++ { + pr, err := p.Pick(balancer.PickInfo{}) + if err != nil { + return fmt.Errorf("The third 50%% picks should be non-drops, got error %v", err) } - }) - } + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } - // Without this, future tests with the same service name will fail. - for _, done := range dones { - done() - } + // Without this, future tests with the same service name will fail. + for _, done := range dones { + done() + } + return nil + }) // Send another update, with only circuit breaking update (and no picker // update afterwards). Make sure the new picker uses the new configs. @@ -536,42 +541,44 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { } // Picks with drops. 
- dones = []func(){} - p2 := <-cc.NewPickerCh - for i := 0; i < 100; i++ { - pr, err := p2.Pick(balancer.PickInfo{}) - if i < 10 && err != nil { - t.Errorf("The first 10%% picks should be non-drops, got error %v", err) - } else if i > 10 && err == nil { - t.Errorf("The next 90%% picks should be drops, got error ") - } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) + cc.WaitForPicker(ctx, func(p balancer.Picker) error { + dones := []func(){} + for i := 0; i < 100; i++ { + pr, err := p.Pick(balancer.PickInfo{}) + if i < 10 && err != nil { + return fmt.Errorf("The first 10%% picks should be non-drops, got error %v", err) + } else if i > 10 && err == nil { + return fmt.Errorf("The next 90%% picks should be drops, got error ") } - }) - } - - for _, done := range dones { - done() - } - dones = []func(){} + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } - // Pick without drops. - for i := 0; i < 10; i++ { - pr, err := p2.Pick(balancer.PickInfo{}) - if err != nil { - t.Errorf("The next 10%% picks should be non-drops, got error %v", err) + for _, done := range dones { + done() } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) + dones = []func(){} + + // Pick without drops. + for i := 0; i < 10; i++ { + pr, err := p.Pick(balancer.PickInfo{}) + if err != nil { + return fmt.Errorf("The next 10%% picks should be non-drops, got error %v", err) } - }) - } + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } - // Without this, future tests with the same service name will fail. - for _, done := range dones { - done() - } + // Without this, future tests with the same service name will fail. + for _, done := range dones { + done() + } + return nil + }) } diff --git a/xds/internal/balancer/clusterresolver/priority_test.go b/xds/internal/balancer/clusterresolver/priority_test.go index ba497bb7f10..4c8f0b57c02 100644 --- a/xds/internal/balancer/clusterresolver/priority_test.go +++ b/xds/internal/balancer/clusterresolver/priority_test.go @@ -69,8 +69,6 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) select { - case <-cc.NewPickerCh: - t.Fatalf("got unexpected new picker") case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: @@ -78,6 +76,18 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { case <-time.After(defaultTestShortTimeout): } + select { + case p := <-cc.NewPickerCh: + // If we do get a new picker, ensure it is still a p1 picker. + if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, subConnFromPicker(p)); err != nil { + t.Fatal(err.Error()) + } + default: + // No new picker; we were previously using p1 and should still be using + // p1, so this is okay. No need to wait for defaultTestShortTimeout + // since we just waited immediately above. + } + // Remove p2, no updates. 
clab3 := xdstestutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) clab3.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) @@ -85,14 +95,25 @@ func (s) TestEDSPriority_HighPriorityReady(t *testing.T) { xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab3.Build()), nil) select { - case <-cc.NewPickerCh: - t.Fatalf("got unexpected new picker") case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: t.Fatalf("got unexpected remove SubConn") case <-time.After(defaultTestShortTimeout): } + + select { + case p := <-cc.NewPickerCh: + // If we do get a new picker, ensure it is still a p1 picker. + if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, subConnFromPicker(p)); err != nil { + t.Fatal(err.Error()) + } + default: + // No new picker; we were previously using p1 and should still be using + // p1, so this is okay. No need to wait for defaultTestShortTimeout + // since we just waited immediately above. + } + } // Lower priority is used when higher priority is not ready. @@ -147,8 +168,6 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { xdsC.InvokeWatchEDSCallback("", parseEDSRespProtoForTesting(clab2.Build()), nil) select { - case <-cc.NewPickerCh: - t.Fatalf("got unexpected new picker") case <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn") case <-cc.RemoveSubConnCh: @@ -156,6 +175,18 @@ func (s) TestEDSPriority_SwitchPriority(t *testing.T) { case <-time.After(defaultTestShortTimeout): } + select { + case p := <-cc.NewPickerCh: + // If we do get a new picker, ensure it is still a p1 picker. + if err := testutils.IsRoundRobin([]balancer.SubConn{sc1}, subConnFromPicker(p)); err != nil { + t.Fatal(err.Error()) + } + default: + // No new picker; we were previously using p1 and should still be using + // p1, so this is okay. No need to wait for defaultTestShortTimeout + // since we just waited immediately above. + } + // Turn down 1, use 2 edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) addrs2 := <-cc.NewSubConnAddrsCh diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index 672f10122ff..e15f7838993 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -30,6 +30,8 @@ import ( "time" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/balancergroup" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/grpclog" @@ -53,7 +55,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba b := &priorityBalancer{ cc: cc, done: grpcsync.NewEvent(), - childToPriority: make(map[string]int), children: make(map[string]*childBalancer), childBalancerStateUpdate: buffer.NewUnbounded(), } @@ -90,16 +91,17 @@ type priorityBalancer struct { mu sync.Mutex childInUse string - // priority of the child that's current in use. Int starting from 0, and 0 - // is the higher priority. - priorityInUse int // priorities is a list of child names from higher to lower priority. priorities []string - // childToPriority is a map from the child name to it's priority. Priority - // is an int start from 0, and 0 is the higher priority. - childToPriority map[string]int // children is a map from child name to sub-balancers. children map[string]*childBalancer + + // Set during UpdateClientConnState when calling into sub-balancers. 
+ // Prevents child updates from recomputing the active priority or sending + // an update of the aggregated picker to the parent. Cleared after all + // sub-balancers have finished UpdateClientConnState, after which + // syncPriority is called manually. + inhibitChildUpdates bool } func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -111,7 +113,6 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err addressesSplit := hierarchy.Group(s.ResolverState.Addresses) b.mu.Lock() - defer b.mu.Unlock() // Create and remove children, since we know all children from the config // are used by some priority. for name, newSubConfig := range newConfig.Children { @@ -146,15 +147,14 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err } // Update config and address, but note that this doesn't send the - // updates to child balancer (the child balancer might not be built, if - // it's a low priority). + // updates to non-started child balancers (the child balancer might not + // be built, if it's a low priority). currentChild.updateConfig(newSubConfig, resolver.State{ Addresses: addressesSplit[name], ServiceConfig: s.ResolverState.ServiceConfig, Attributes: s.ResolverState.Attributes, }) } - // Remove child from children if it's not in new config. for name, oldChild := range b.children { if _, ok := newConfig.Children[name]; !ok { @@ -164,13 +164,25 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err // Update priorities and handle priority changes. b.priorities = newConfig.Priorities - b.childToPriority = make(map[string]int, len(newConfig.Priorities)) - for pi, pName := range newConfig.Priorities { - b.childToPriority[pName] = pi + + // Everything was removed by the update. + if len(b.priorities) == 0 { + b.childInUse = "" + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: base.NewErrPicker(ErrAllPrioritiesRemoved), + }) + b.mu.Unlock() + return nil } - // Sync the states of all children to the new updated priorities. This - // include starting/stopping child balancers when necessary. - b.syncPriority(true) + + b.inhibitChildUpdates = true + b.mu.Unlock() + // This will sync the states of all children to the new updated + // priorities. Includes starting/stopping child balancers when necessary. + done := make(chan struct{}) + b.childBalancerStateUpdate.Put(clearInhibitChildUpdates{done: done}) + <-done return nil } @@ -206,7 +218,7 @@ func (b *priorityBalancer) ExitIdle() { // UpdateState implements balancergroup.BalancerStateAggregator interface. The // balancer group sends new connectivity state and picker here. func (b *priorityBalancer) UpdateState(childName string, state balancer.State) { - b.childBalancerStateUpdate.Put(&childBalancerState{ + b.childBalancerStateUpdate.Put(childBalancerState{ name: childName, s: state, }) @@ -217,6 +229,10 @@ type childBalancerState struct { s balancer.State } +type clearInhibitChildUpdates struct { + done chan struct{} +} + // run handles child update in a separate goroutine, so if the child sends // updates inline (when called by parent), it won't cause deadlocks (by trying // to hold the same mutex). 
@@ -225,11 +241,22 @@ func (b *priorityBalancer) run() { select { case u := <-b.childBalancerStateUpdate.Get(): b.childBalancerStateUpdate.Load() - s := u.(*childBalancerState) // Needs to handle state update in a goroutine, because each state // update needs to start/close child policy, could result in // deadlock. - b.handleChildStateUpdate(s.name, s.s) + b.mu.Lock() + if b.done.HasFired() { + return + } + switch s := u.(type) { + case childBalancerState: + b.handleChildStateUpdate(s.name, s.s) + case clearInhibitChildUpdates: + b.inhibitChildUpdates = false + b.syncPriority("") + close(s.done) + } + b.mu.Unlock() case <-b.done.Done(): return } diff --git a/xds/internal/balancer/priority/balancer_child.go b/xds/internal/balancer/priority/balancer_child.go index c00a56b8f9e..ba50404138e 100644 --- a/xds/internal/balancer/priority/balancer_child.go +++ b/xds/internal/balancer/priority/balancer_child.go @@ -44,7 +44,8 @@ type childBalancer struct { // will be restarted if the child has not reported TF more recently than it // reported Ready or Idle. reportedTF bool - state balancer.State + // The latest state the child balancer provided. + state balancer.State // The timer to give a priority some time to connect. And if the priority // doesn't go into Ready/Failure, the next priority will be started. initTimer *timerWrapper @@ -79,6 +80,9 @@ func (cb *childBalancer) updateConfig(child *Child, rState resolver.State) { cb.ignoreReresolutionRequests = child.IgnoreReresolutionRequests cb.config = child.Config.Config cb.rState = rState + if cb.started { + cb.sendUpdate() + } } // start builds the child balancer if it's not already started. @@ -91,6 +95,7 @@ func (cb *childBalancer) start() { cb.started = true cb.parent.bg.Add(cb.name, cb.bb) cb.startInitTimer() + cb.sendUpdate() } // sendUpdate sends the addresses and config to the child balancer. @@ -145,7 +150,7 @@ func (cb *childBalancer) startInitTimer() { // Re-sync the priority. This will switch to the next priority if // there's any. Note that it's important sync() is called after setting // initTimer to nil. - cb.parent.syncPriority(false) + cb.parent.syncPriority("") }) } diff --git a/xds/internal/balancer/priority/balancer_priority.go b/xds/internal/balancer/priority/balancer_priority.go index 2487c262604..ec52fd3cf7e 100644 --- a/xds/internal/balancer/priority/balancer_priority.go +++ b/xds/internal/balancer/priority/balancer_priority.go @@ -23,7 +23,6 @@ import ( "time" "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/base" "google.golang.org/grpc/connectivity" ) @@ -59,7 +58,7 @@ var ( // - If balancer is Connecting and has non-nil initTimer (meaning it // transitioned from Ready or Idle to connecting, not from TF, so we // should give it init-time to connect). -// - If balancer is READY +// - If balancer is READY or IDLE // - If this is the lowest priority // - do the following: // - if this is not the old childInUse, override picker so old picker is no @@ -68,18 +67,10 @@ var ( // - forward the new addresses and config // // Caller must hold b.mu. -func (b *priorityBalancer) syncPriority(forceUpdate bool) { - // Everything was removed by the update. 
- if len(b.priorities) == 0 { - b.childInUse = "" - b.priorityInUse = 0 - b.cc.UpdateState(balancer.State{ - ConnectivityState: connectivity.TransientFailure, - Picker: base.NewErrPicker(ErrAllPrioritiesRemoved), - }) +func (b *priorityBalancer) syncPriority(childUpdating string) { + if b.inhibitChildUpdates { return } - for p, name := range b.priorities { child, ok := b.children[name] if !ok { @@ -92,23 +83,14 @@ func (b *priorityBalancer) syncPriority(forceUpdate bool) { child.state.ConnectivityState == connectivity.Idle || (child.state.ConnectivityState == connectivity.Connecting && child.initTimer != nil) || p == len(b.priorities)-1 { - if b.childInUse != "" && b.childInUse != child.name { - // childInUse was set and is different from this child, will - // change childInUse later. We need to update picker here - // immediately so parent stops using the old picker. + if b.childInUse != child.name || child.name == childUpdating { + logger.Warningf("ciu, cn, cu: %v, %v, %v", b.childInUse, child.name, childUpdating) + // If we switch children or the child in use just updated its + // picker, push the child's picker to the parent. b.cc.UpdateState(child.state) } b.logger.Infof("switching to (%q, %v) in syncPriority", child.name, p) - oldChildInUse := b.childInUse b.switchToChild(child, p) - if b.childInUse != oldChildInUse || forceUpdate { - // If child is switched, send the update to the new child. - // - // Or if forceUpdate is true (when this is triggered by a - // ClientConn update), because the ClientConn update might - // contain changes for this child. - child.sendUpdate() - } break } } @@ -163,7 +145,6 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) { return } b.childInUse = child.name - b.priorityInUse = priority if !child.started { child.start() @@ -173,40 +154,13 @@ func (b *priorityBalancer) switchToChild(child *childBalancer, priority int) { // handleChildStateUpdate start/close priorities based on the connectivity // state. func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.State) { - b.mu.Lock() - defer b.mu.Unlock() - if b.done.HasFired() { - return - } - - priority, ok := b.childToPriority[childName] - if !ok { - b.logger.Warningf("priority: received picker update with unknown child %v", childName) - return - } - - if b.childInUse == "" { - b.logger.Warningf("priority: no child is in use when picker update is received") - return - } - - // priorityInUse is higher than this priority. - if b.priorityInUse < priority { - // Lower priorities should all be closed, this is an unexpected update. - // Can happen if the child policy sends an update after we tell it to - // close. - b.logger.Warningf("priority: received picker update from priority %v, lower than priority in use %v", priority, b.priorityInUse) - return - } - // Update state in child. The updated picker will be sent to parent later if // necessary. child, ok := b.children[childName] if !ok { - b.logger.Warningf("priority: child balancer not found for child %v, priority %v", childName, priority) + b.logger.Warningf("priority: child balancer not found for child %v", childName) return } - oldChildState := child.state child.state = s // We start/stop the init timer of this child based on the new connectivity @@ -227,36 +181,5 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S // New state is Shutdown, should never happen. Don't forward. 
} - oldPriorityInUse := b.priorityInUse - child.parent.syncPriority(false) - // If child is switched by syncPriority(), it also sends the update from the - // new child to overwrite the old picker used by the parent. - // - // But no update is sent if the child is not switches. That means if this - // update is from childInUse, and this child is still childInUse after - // syncing, the update being handled here is not sent to the parent. In that - // case, we need to do an explicit check here to forward the update. - if b.priorityInUse == oldPriorityInUse && b.priorityInUse == priority { - // Special handling for Connecting. If child was not switched, and this - // is a Connecting->Connecting transition, do not send the redundant - // update, since all Connecting pickers are the same (they tell the RPCs - // to repick). - // - // This can happen because the initial state of a child (before any - // update is received) is Connecting. When the child is started, it's - // picker is sent to the parent by syncPriority (to overwrite the old - // picker if there's any). When it reports Connecting after being - // started, it will send a Connecting update (handled here), causing a - // Connecting->Connecting transition. - if oldChildState.ConnectivityState == connectivity.Connecting && s.ConnectivityState == connectivity.Connecting { - return - } - // Only forward this update if sync() didn't switch child, and this - // child is in use. - // - // sync() forwards the update if the child was switched, so there's no - // need to forward again. - b.cc.UpdateState(child.state) - } - + child.parent.syncPriority(childName) } diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index 5b96d7101f3..891f5fcb394 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -149,8 +149,6 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) { } select { - case <-cc.NewPickerCh: - t.Fatalf("got unexpected new picker") case sc := <-cc.NewSubConnCh: t.Fatalf("got unexpected new SubConn: %s", sc) case sc := <-cc.RemoveSubConnCh: @@ -1886,3 +1884,109 @@ func (s) TestPriority_AddLowPriorityWhenHighIsInIdle(t *testing.T) { t.Fatalf("got unexpected call to NewSubConn with addr: %v, want %v", addrsNew, want) } } + +// Lower priority is used when higher priority is not ready; higher priority +// still gets updates. 
+// +// Init 0 and 1; 0 is down, 1 is up, use 1; update 0; 0 is up, use 0 +func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + cc := testutils.NewTestClientConn(t) + bb := balancer.Get(Name) + pb := bb.Build(cc, balancer.BuildOptions{}) + defer pb.Close() + + t.Log("Two localities, with priorities [0, 1], each with one backend.") + if err := pb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[0]}, []string{"child-0"}), + hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}), + }, + }, + BalancerConfig: &LBConfig{ + Children: map[string]*Child{ + "child-0": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}}, + "child-1": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}}, + }, + Priorities: []string{"child-0", "child-1"}, + }, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + addrs0 := <-cc.NewSubConnAddrsCh + if got, want := addrs0[0].Addr, testBackendAddrStrs[0]; got != want { + t.Fatalf("sc is created with addr %v, want %v", got, want) + } + sc0 := <-cc.NewSubConnCh + + t.Log("Make p0 fail.") + pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}) + + // Before 1 gets READY, picker should return NoSubConnAvailable, so RPCs + // will retry. + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) + } + + t.Log("Make p1 ready.") + addrs1 := <-cc.NewSubConnAddrsCh + if got, want := addrs1[0].Addr, testBackendAddrStrs[1]; got != want { + t.Fatalf("sc is created with addr %v, want %v", got, want) + } + sc1 := <-cc.NewSubConnCh + pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // Test pick with 1. + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) + } + + pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + // Does not change the aggregate state, because round robin does not leave + // TRANSIENT_FAILURE if a subconn goes CONNECTING.
+ pb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) + } + + t.Log("Change p0 to use new address.") + if err := pb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-0"}), + hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}), + }, + }, + BalancerConfig: &LBConfig{ + Children: map[string]*Child{ + "child-0": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}}, + "child-1": {Config: &internalserviceconfig.BalancerConfig{Name: roundrobin.Name}}, + }, + Priorities: []string{"child-0", "child-1"}, + }, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + addrs2 := <-cc.NewSubConnAddrsCh + if got, want := addrs2[0].Addr, testBackendAddrStrs[2]; got != want { + t.Fatalf("sc is created with addr %v, want %v", got, want) + } + sc2 := <-cc.NewSubConnCh + + t.Log("Make p0 ready.") + pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // Test pick with 0. + if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil { + t.Fatal(err.Error()) + } +} From 21dccebefaaaddedeb4ca7e6d9e7da4b759e0455 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 16 Jun 2022 17:45:19 +0000 Subject: [PATCH 2/2] updates --- .../balancer/clusterimpl/balancer_test.go | 36 ++++++--- .../balancer/clusterresolver/eds_impl_test.go | 76 +++++++++++-------- xds/internal/balancer/priority/balancer.go | 21 +++-- .../balancer/priority/balancer_child.go | 2 +- .../balancer/priority/balancer_priority.go | 2 +- .../balancer/priority/balancer_test.go | 34 ++++++--- 6 files changed, 109 insertions(+), 62 deletions(-) diff --git a/xds/internal/balancer/clusterimpl/balancer_test.go b/xds/internal/balancer/clusterimpl/balancer_test.go index c8e6b55a588..a0085872a41 100644 --- a/xds/internal/balancer/clusterimpl/balancer_test.go +++ b/xds/internal/balancer/clusterimpl/balancer_test.go @@ -136,13 +136,15 @@ func (s) TestDropByCategory(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable) + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) + } b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. const rpcCount = 20 - cc.WaitForPicker(ctx, func(p balancer.Picker) error { + if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { for i := 0; i < rpcCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) // Even RPCs are dropped. @@ -160,7 +162,9 @@ func (s) TestDropByCategory(t *testing.T) { } } return nil - }) + }); err != nil { + t.Fatal(err.Error()) + } // Dump load data from the store and compare with expected counts. 
loadStore := xdsC.LoadStore() @@ -207,7 +211,7 @@ func (s) TestDropByCategory(t *testing.T) { t.Fatalf("unexpected error from UpdateClientConnState: %v", err) } - cc.WaitForPicker(ctx, func(p balancer.Picker) error { + if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { for i := 0; i < rpcCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) // Even RPCs are dropped. @@ -225,7 +229,9 @@ func (s) TestDropByCategory(t *testing.T) { } } return nil - }) + }); err != nil { + t.Fatal(err.Error()) + } const dropCount2 = rpcCount * dropNumerator2 / dropDenominator2 wantStatsData1 := []*load.Data{{ @@ -293,7 +299,7 @@ func (s) TestDropCircuitBreaking(t *testing.T) { b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. const rpcCount = 100 - cc.WaitForPicker(ctx, func(p balancer.Picker) error { + if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { dones := []func(){} for i := 0; i < rpcCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) @@ -330,7 +336,9 @@ func (s) TestDropCircuitBreaking(t *testing.T) { } return nil - }) + }); err != nil { + t.Fatal(err.Error()) + } // Dump load data from the store and compare with expected counts. loadStore := xdsC.LoadStore() @@ -452,7 +460,9 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { sc1 := <-cc.NewSubConnCh b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) // This should get the connecting picker. - cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable) + if err := cc.WaitForPickerWithErr(ctx, balancer.ErrNoSubConnAvailable); err != nil { + t.Fatal(err.Error()) + } addrs1 := <-cc.NewSubConnAddrsCh if got, want := addrs1[0].Addr, testBackendAddrs[0].Addr; got != want { @@ -465,7 +475,9 @@ func (s) TestClusterNameInAddressAttributes(t *testing.T) { b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with one backend. - cc.WaitForRoundRobinPicker(ctx, sc1) + if err := cc.WaitForRoundRobinPicker(ctx, sc1); err != nil { + t.Fatal(err.Error()) + } const testClusterName2 = "test-cluster-2" var addr2 = resolver.Address{Addr: "2.2.2.2"} @@ -617,7 +629,7 @@ func (s) TestLoadReporting(t *testing.T) { // Test pick with one backend. const successCount = 5 const errorCount = 5 - cc.WaitForPicker(ctx, func(p balancer.Picker) error { + if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { for i := 0; i < successCount; i++ { gotSCSt, err := p.Pick(balancer.PickInfo{}) if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { @@ -633,7 +645,9 @@ func (s) TestLoadReporting(t *testing.T) { gotSCSt.Done(balancer.DoneInfo{Err: fmt.Errorf("error")}) } return nil - }) + }); err != nil { + t.Fatal(err.Error()) + } // Dump load data from the store and compare with expected counts. loadStore := xdsC.LoadStore() diff --git a/xds/internal/balancer/clusterresolver/eds_impl_test.go b/xds/internal/balancer/clusterresolver/eds_impl_test.go index 1dab8c9f8f5..ddafa18a610 100644 --- a/xds/internal/balancer/clusterresolver/eds_impl_test.go +++ b/xds/internal/balancer/clusterresolver/eds_impl_test.go @@ -484,20 +484,27 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { edsb.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Picks with drops. 
- cc.WaitForPicker(ctx, func(p balancer.Picker) error { + if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { dones := []func(){} + defer func() { + for _, f := range dones { + f() + } + }() + for i := 0; i < 100; i++ { pr, err := p.Pick(balancer.PickInfo{}) + if pr.Done != nil { + dones = append(dones, func() { + pr.Done(balancer.DoneInfo{}) + }) + } + if i < 50 && err != nil { return fmt.Errorf("The first 50%% picks should be non-drops, got error %v", err) } else if i > 50 && err == nil { return fmt.Errorf("The second 50%% picks should be drops, got error ") } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) - } - }) } for _, done := range dones { @@ -508,22 +515,21 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { // Pick without drops. for i := 0; i < 50; i++ { pr, err := p.Pick(balancer.PickInfo{}) + if pr.Done != nil { + dones = append(dones, func() { + pr.Done(balancer.DoneInfo{}) + }) + } + if err != nil { return fmt.Errorf("The third 50%% picks should be non-drops, got error %v", err) } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) - } - }) } - // Without this, future tests with the same service name will fail. - for _, done := range dones { - done() - } return nil - }) + }); err != nil { + t.Fatal(err.Error()) + } // Send another update, with only circuit breaking update (and no picker // update afterwards). Make sure the new picker uses the new configs. @@ -541,20 +547,26 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { } // Picks with drops. - cc.WaitForPicker(ctx, func(p balancer.Picker) error { + if err := cc.WaitForPicker(ctx, func(p balancer.Picker) error { dones := []func(){} + defer func() { + for _, f := range dones { + f() + } + }() + for i := 0; i < 100; i++ { pr, err := p.Pick(balancer.PickInfo{}) + if pr.Done != nil { + dones = append(dones, func() { + pr.Done(balancer.DoneInfo{}) + }) + } if i < 10 && err != nil { return fmt.Errorf("The first 10%% picks should be non-drops, got error %v", err) } else if i > 10 && err == nil { return fmt.Errorf("The next 90%% picks should be drops, got error ") } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) - } - }) } for _, done := range dones { @@ -565,20 +577,18 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { // Pick without drops. for i := 0; i < 10; i++ { pr, err := p.Pick(balancer.PickInfo{}) + if pr.Done != nil { + dones = append(dones, func() { + pr.Done(balancer.DoneInfo{}) + }) + } + if err != nil { return fmt.Errorf("The next 10%% picks should be non-drops, got error %v", err) } - dones = append(dones, func() { - if pr.Done != nil { - pr.Done(balancer.DoneInfo{}) - } - }) - } - - // Without this, future tests with the same service name will fail. - for _, done := range dones { - done() } return nil - }) + }); err != nil { + t.Fatal(err.Error()) + } } diff --git a/xds/internal/balancer/priority/balancer.go b/xds/internal/balancer/priority/balancer.go index e15f7838993..d05ef18c287 100644 --- a/xds/internal/balancer/priority/balancer.go +++ b/xds/internal/balancer/priority/balancer.go @@ -101,7 +101,7 @@ type priorityBalancer struct { // an update of the aggregated picker to the parent. Cleared after all // sub-balancers have finished UpdateClientConnState, after which // syncPriority is called manually. 
- inhibitChildUpdates bool + inhibitPickerUpdates bool } func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -176,12 +176,19 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err return nil } - b.inhibitChildUpdates = true - b.mu.Unlock() // This will sync the states of all children to the new updated // priorities. Includes starting/stopping child balancers when necessary. + // Block picker updates until all children have had a chance to call + // UpdateState to prevent races where, e.g., the active priority reports + // transient failure but a higher priority may have reported something that + // made it active, and if the transient failure update is handled first, + // RPCs could fail. + b.inhibitPickerUpdates = true + // Add an item to queue to notify us when the current items in the queue + // are done and syncPriority has been called. done := make(chan struct{}) - b.childBalancerStateUpdate.Put(clearInhibitChildUpdates{done: done}) + b.childBalancerStateUpdate.Put(resumePickerUpdates{done: done}) + b.mu.Unlock() <-done return nil @@ -229,7 +236,7 @@ type childBalancerState struct { s balancer.State } -type clearInhibitChildUpdates struct { +type resumePickerUpdates struct { done chan struct{} } @@ -251,8 +258,8 @@ func (b *priorityBalancer) run() { switch s := u.(type) { case childBalancerState: b.handleChildStateUpdate(s.name, s.s) - case clearInhibitChildUpdates: - b.inhibitChildUpdates = false + case resumePickerUpdates: + b.inhibitPickerUpdates = false b.syncPriority("") close(s.done) } diff --git a/xds/internal/balancer/priority/balancer_child.go b/xds/internal/balancer/priority/balancer_child.go index ba50404138e..34bab34c915 100644 --- a/xds/internal/balancer/priority/balancer_child.go +++ b/xds/internal/balancer/priority/balancer_child.go @@ -75,7 +75,7 @@ func (cb *childBalancer) updateBuilder(bb balancer.Builder) { } // updateConfig sets childBalancer's config and state, but doesn't send update to -// the child balancer. +// the child balancer unless it is started. func (cb *childBalancer) updateConfig(child *Child, rState resolver.State) { cb.ignoreReresolutionRequests = child.IgnoreReresolutionRequests cb.config = child.Config.Config diff --git a/xds/internal/balancer/priority/balancer_priority.go b/xds/internal/balancer/priority/balancer_priority.go index ec52fd3cf7e..33068709e29 100644 --- a/xds/internal/balancer/priority/balancer_priority.go +++ b/xds/internal/balancer/priority/balancer_priority.go @@ -68,7 +68,7 @@ var ( // // Caller must hold b.mu. 
func (b *priorityBalancer) syncPriority(childUpdating string) { - if b.inhibitChildUpdates { + if b.inhibitPickerUpdates { return } for p, name := range b.priorities { diff --git a/xds/internal/balancer/priority/balancer_test.go b/xds/internal/balancer/priority/balancer_test.go index 891f5fcb394..ccf3a5edfc2 100644 --- a/xds/internal/balancer/priority/balancer_test.go +++ b/xds/internal/balancer/priority/balancer_test.go @@ -1961,7 +1961,7 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) { ResolverState: resolver.State{ Addresses: []resolver.Address{ hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[2]}, []string{"child-0"}), - hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[1]}, []string{"child-1"}), + hierarchy.Set(resolver.Address{Addr: testBackendAddrStrs[3]}, []string{"child-1"}), }, }, BalancerConfig: &LBConfig{ @@ -1975,15 +1975,31 @@ func (s) TestPriority_HighPriorityUpdatesWhenLowInUse(t *testing.T) { t.Fatalf("failed to update ClientConn state: %v", err) } - addrs2 := <-cc.NewSubConnAddrsCh - if got, want := addrs2[0].Addr, testBackendAddrStrs[2]; got != want { - t.Fatalf("sc is created with addr %v, want %v", got, want) + // Two new subconns are created by the previous update; one by p0 and one + // by p1. They don't happen concurrently, but they could happen in any + // order. + t.Log("Make p0 and p1 both ready; p0 should be used.") + var sc2, sc3 balancer.SubConn + for i := 0; i < 2; i++ { + addr := <-cc.NewSubConnAddrsCh + sc := <-cc.NewSubConnCh + switch addr[0].Addr { + case testBackendAddrStrs[2]: + sc2 = sc + case testBackendAddrStrs[3]: + sc3 = sc + default: + t.Fatalf("sc is created with addr %v, want %v or %v", addr[0].Addr, testBackendAddrStrs[2], testBackendAddrStrs[3]) + } + pb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + pb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + } + if sc2 == nil { + t.Fatalf("sc not created with addr %v", testBackendAddrStrs[2]) + } + if sc3 == nil { + t.Fatalf("sc not created with addr %v", testBackendAddrStrs[3]) } - sc2 := <-cc.NewSubConnCh - - t.Log("Make p0 ready.") - pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) - pb.UpdateSubConnState(sc2, balancer.SubConnState{ConnectivityState: connectivity.Ready}) // Test pick with 0. if err := cc.WaitForRoundRobinPicker(ctx, sc2); err != nil {
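
Note on usage (not part of the patch series above; a minimal sketch only): the new TestClientConn.WaitForPicker helper is the pattern the rewritten tests rely on instead of reading cc.NewPickerCh once and asserting on that single picker, since a child policy may push several intermediate pickers while an update is being processed. The snippet assumes it is compiled inside the grpc-go module (internal/testutils cannot be imported from outside it); the package name and the verifyPicksSubConn helper are invented for illustration and do not appear in the patch.

package priority_test

import (
	"context"
	"fmt"

	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/internal/testutils"
)

// verifyPicksSubConn waits, via the WaitForPicker helper added in this patch,
// for a picker whose picks all land on the expected SubConn. Pickers that do
// not yet satisfy the closure are discarded and the next one is inspected,
// which avoids racing on how many intermediate pickers a child policy pushes
// while handling an update.
func verifyPicksSubConn(ctx context.Context, cc *testutils.TestClientConn, want balancer.SubConn) error {
	return cc.WaitForPicker(ctx, func(p balancer.Picker) error {
		for i := 0; i < 10; i++ {
			res, err := p.Pick(balancer.PickInfo{})
			if err != nil {
				return fmt.Errorf("pick %d returned error %v; want SubConn %v", i, err, want)
			}
			// Assumes SubConns are comparable by identity, as the pointer
			// SubConns produced by TestClientConn are.
			if res.SubConn != want {
				return fmt.Errorf("pick %d returned SubConn %v; want %v", i, res.SubConn, want)
			}
			if res.Done != nil {
				// Finish the simulated RPC so drop/circuit-breaker counters
				// do not stay inflated for later assertions.
				res.Done(balancer.DoneInfo{})
			}
		}
		return nil
	})
}

The closure is retried against every picker the balancer publishes until it returns nil or the context expires, which is what makes the rewritten tests in this patch insensitive to the exact number and ordering of picker updates.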