diff --git a/balancer_conn_wrappers_test.go b/balancer_conn_wrappers_test.go deleted file mode 100644 index 935d11d1d39..00000000000 --- a/balancer_conn_wrappers_test.go +++ /dev/null @@ -1,90 +0,0 @@ -/* - * - * Copyright 2019 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package grpc - -import ( - "fmt" - "net" - "testing" - - "google.golang.org/grpc/balancer" - "google.golang.org/grpc/balancer/roundrobin" - "google.golang.org/grpc/internal/balancer/stub" - "google.golang.org/grpc/resolver" - "google.golang.org/grpc/resolver/manual" -) - -// TestBalancerErrorResolverPolling injects balancer errors and verifies -// ResolveNow is called on the resolver with the appropriate backoff strategy -// being consulted between ResolveNow calls. -func (s) TestBalancerErrorResolverPolling(t *testing.T) { - // The test balancer will return ErrBadResolverState iff the - // ClientConnState contains no addresses. - bf := stub.BalancerFuncs{ - UpdateClientConnState: func(_ *stub.BalancerData, s balancer.ClientConnState) error { - if len(s.ResolverState.Addresses) == 0 { - return balancer.ErrBadResolverState - } - return nil - }, - } - const balName = "BalancerErrorResolverPolling" - stub.Register(balName, bf) - - testResolverErrorPolling(t, - func(r *manual.Resolver) { - // No addresses so the balancer will fail. - r.CC.UpdateState(resolver.State{}) - }, func(r *manual.Resolver) { - // UpdateState will block if ResolveNow is being called (which blocks on - // rn), so call it in a goroutine. Include some address so the balancer - // will be happy. - go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "x"}}}) - }, - WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, balName))) -} - -// TestRoundRobinZeroAddressesResolverPolling reports no addresses to the round -// robin balancer and verifies ResolveNow is called on the resolver with the -// appropriate backoff strategy being consulted between ResolveNow calls. -func (s) TestRoundRobinZeroAddressesResolverPolling(t *testing.T) { - // We need to start a real server or else the connecting loop will call - // ResolveNow after every iteration, even after a valid resolver result is - // returned. - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - t.Fatalf("Error while listening. Err: %v", err) - } - defer lis.Close() - s := NewServer() - defer s.Stop() - go s.Serve(lis) - - testResolverErrorPolling(t, - func(r *manual.Resolver) { - // No addresses so the balancer will fail. - r.CC.UpdateState(resolver.State{}) - }, func(r *manual.Resolver) { - // UpdateState will block if ResolveNow is being called (which - // blocks on rn), so call it in a goroutine. Include a valid - // address so the balancer will be happy. - go r.CC.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) - }, - WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, roundrobin.Name))) -} diff --git a/dialoptions.go b/dialoptions.go index e7f86e6d7c8..7a497237bbd 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -66,11 +66,7 @@ type dialOptions struct { minConnectTimeout func() time.Duration defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfigRawJSON *string - // This is used by ccResolverWrapper to backoff between successive calls to - // resolver.ResolveNow(). The user will have no need to configure this, but - // we need to be able to configure this in tests. - resolveNowBackoff func(int) time.Duration - resolvers []resolver.Builder + resolvers []resolver.Builder } // DialOption configures how we set up the connection. @@ -596,7 +592,6 @@ func defaultDialOptions() dialOptions { ReadBufferSize: defaultReadBufSize, UseProxy: true, }, - resolveNowBackoff: internalbackoff.DefaultExponential.Backoff, } } @@ -611,16 +606,6 @@ func withMinConnectDeadline(f func() time.Duration) DialOption { }) } -// withResolveNowBackoff specifies the function that clientconn uses to backoff -// between successive calls to resolver.ResolveNow(). -// -// For testing purpose only. -func withResolveNowBackoff(f func(int) time.Duration) DialOption { - return newFuncDialOption(func(o *dialOptions) { - o.resolveNowBackoff = f - }) -} - // WithResolvers allows a list of resolver implementations to be registered // locally with the ClientConn without needing to be globally registered via // resolver.Register. They will be matched against the scheme used for the diff --git a/internal/resolver/dns/dns_resolver.go b/internal/resolver/dns/dns_resolver.go index 30423556658..9d86460ab6f 100644 --- a/internal/resolver/dns/dns_resolver.go +++ b/internal/resolver/dns/dns_resolver.go @@ -34,6 +34,7 @@ import ( grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/resolver" @@ -46,6 +47,9 @@ var EnableSRVLookups = false var logger = grpclog.Component("dns") +// A global to stub out in tests. +var newTimer = time.NewTimer + func init() { resolver.Register(NewBuilder()) } @@ -143,7 +147,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts d.wg.Add(1) go d.watcher() - d.ResolveNow(resolver.ResolveNowOptions{}) return d, nil } @@ -201,28 +204,38 @@ func (d *dnsResolver) Close() { func (d *dnsResolver) watcher() { defer d.wg.Done() + backoffIndex := 1 for { - select { - case <-d.ctx.Done(): - return - case <-d.rn: - } - state, err := d.lookup() if err != nil { + // Report error to the underlying grpc.ClientConn. d.cc.ReportError(err) } else { - d.cc.UpdateState(*state) + err = d.cc.UpdateState(*state) } - // Sleep to prevent excessive re-resolutions. Incoming resolution requests - // will be queued in d.rn. - t := time.NewTimer(minDNSResRate) + var timer *time.Timer + if err == nil { + // Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least + // to prevent constantly re-resolving. + backoffIndex = 1 + timer = time.NewTimer(minDNSResRate) + select { + case <-d.ctx.Done(): + timer.Stop() + return + case <-d.rn: + } + } else { + // Poll on an error found in DNS Resolver or an error received from ClientConn. + timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex)) + backoffIndex++ + } select { - case <-t.C: case <-d.ctx.Done(): - t.Stop() + timer.Stop() return + case <-timer.C: } } } diff --git a/internal/resolver/dns/dns_resolver_test.go b/internal/resolver/dns/dns_resolver_test.go index 1c8469a275a..52067e39cc6 100644 --- a/internal/resolver/dns/dns_resolver_test.go +++ b/internal/resolver/dns/dns_resolver_test.go @@ -30,9 +30,11 @@ import ( "testing" "time" + "google.golang.org/grpc/balancer" grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/leakcheck" + "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -47,7 +49,8 @@ func TestMain(m *testing.M) { } const ( - txtBytesLimit = 255 + txtBytesLimit = 255 + defaultTestTimeout = 10 * time.Second ) type testClientConn struct { @@ -57,13 +60,17 @@ type testClientConn struct { state resolver.State updateStateCalls int errChan chan error + updateStateErr error } -func (t *testClientConn) UpdateState(s resolver.State) { +func (t *testClientConn) UpdateState(s resolver.State) error { t.m1.Lock() defer t.m1.Unlock() t.state = s t.updateStateCalls++ + // This error determines whether DNS Resolver actually decides to exponentially backoff or not. + // This can be any error. + return t.updateStateErr } func (t *testClientConn) getState() (resolver.State, int) { @@ -669,6 +676,13 @@ func TestResolve(t *testing.T) { func testDNSResolver(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(_ time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { target string addrWant []resolver.Address @@ -736,12 +750,151 @@ func testDNSResolver(t *testing.T) { } } +// DNS Resolver immediately starts polling on an error from grpc. This should continue until the ClientConn doesn't +// send back an error from updating the DNS Resolver's state. +func TestDNSResolverExponentialBackoff(t *testing.T) { + defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + timerChan := testutils.NewChannel() + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, allows this test to call timer immediately. + t := time.NewTimer(time.Hour) + timerChan.Send(t) + return t + } + tests := []struct { + name string + target string + addrWant []resolver.Address + scWant string + }{ + { + "happy case default port", + "foo.bar.com", + []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}, {Addr: "5.6.7.8" + colonDefaultPort}}, + generateSC("foo.bar.com"), + }, + { + "happy case specified port", + "foo.bar.com:1234", + []resolver.Address{{Addr: "1.2.3.4:1234"}, {Addr: "5.6.7.8:1234"}}, + generateSC("foo.bar.com"), + }, + { + "happy case another default port", + "srv.ipv4.single.fake", + []resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}}, + generateSC("srv.ipv4.single.fake"), + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + b := NewBuilder() + cc := &testClientConn{target: test.target} + // Cause ClientConn to return an error. + cc.updateStateErr = balancer.ErrBadResolverState + r, err := b.Build(resolver.Target{Endpoint: test.target}, cc, resolver.BuildOptions{}) + if err != nil { + t.Fatalf("Error building resolver for target %v: %v", test.target, err) + } + var state resolver.State + var cnt int + for i := 0; i < 2000; i++ { + state, cnt = cc.getState() + if cnt > 0 { + break + } + time.Sleep(time.Millisecond) + } + if cnt == 0 { + t.Fatalf("UpdateState not called after 2s; aborting") + } + if !reflect.DeepEqual(test.addrWant, state.Addresses) { + t.Errorf("Resolved addresses of target: %q = %+v, want %+v", test.target, state.Addresses, test.addrWant) + } + sc := scFromState(state) + if test.scWant != sc { + t.Errorf("Resolved service config of target: %q = %+v, want %+v", test.target, sc, test.scWant) + } + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + // Cause timer to go off 10 times, and see if it calls updateState() correctly. + for i := 0; i < 10; i++ { + timer, err := timerChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving timer from mock NewTimer call: %v", err) + } + timerPointer := timer.(*time.Timer) + timerPointer.Reset(0) + } + // Poll to see if DNS Resolver updated state the correct number of times, which allows time for the DNS Resolver to call + // ClientConn update state. + deadline := time.Now().Add(defaultTestTimeout) + for { + cc.m1.Lock() + got := cc.updateStateCalls + cc.m1.Unlock() + if got == 11 { + break + } + + if time.Now().After(deadline) { + t.Fatalf("Exponential backoff is not working as expected - should update state 11 times instead of %d", got) + } + + time.Sleep(time.Millisecond) + } + + // Update resolver.ClientConn to not return an error anymore - this should stop it from backing off. + cc.updateStateErr = nil + timer, err := timerChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving timer from mock NewTimer call: %v", err) + } + timerPointer := timer.(*time.Timer) + timerPointer.Reset(0) + // Poll to see if DNS Resolver updated state the correct number of times, which allows time for the DNS Resolver to call + // ClientConn update state the final time. The DNS Resolver should then stop polling. + deadline = time.Now().Add(defaultTestTimeout) + for { + cc.m1.Lock() + got := cc.updateStateCalls + cc.m1.Unlock() + if got == 12 { + break + } + + if time.Now().After(deadline) { + t.Fatalf("Exponential backoff is not working as expected - should stop backing off at 12 total UpdateState calls instead of %d", got) + } + + _, err := timerChan.ReceiveOrFail() + if err { + t.Fatalf("Should not poll again after Client Conn stops returning error.") + } + + time.Sleep(time.Millisecond) + } + r.Close() + }) + } +} + func testDNSResolverWithSRV(t *testing.T) { EnableSRVLookups = true defer func() { EnableSRVLookups = false }() defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(_ time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { target string addrWant []resolver.Address @@ -855,6 +1008,13 @@ func mutateTbl(target string) func() { func testDNSResolveNow(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(_ time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { target string addrWant []resolver.Address @@ -926,6 +1086,13 @@ const colonDefaultPort = ":" + defaultPort func testIPResolver(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(_ time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { target string want []resolver.Address @@ -975,6 +1142,13 @@ func testIPResolver(t *testing.T) { func TestResolveFunc(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { addr string want error @@ -1013,6 +1187,13 @@ func TestResolveFunc(t *testing.T) { func TestDisableServiceConfig(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { target string scWant string @@ -1059,6 +1240,13 @@ func TestDisableServiceConfig(t *testing.T) { func TestTXTError(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } defer func(v bool) { envconfig.TXTErrIgnore = v }(envconfig.TXTErrIgnore) for _, ignore := range []bool{false, true} { envconfig.TXTErrIgnore = ignore @@ -1090,6 +1278,13 @@ func TestTXTError(t *testing.T) { } func TestDNSResolverRetry(t *testing.T) { + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } b := NewBuilder() target := "ipv4.single.fake" cc := &testClientConn{target: target} @@ -1144,6 +1339,13 @@ func TestDNSResolverRetry(t *testing.T) { func TestCustomAuthority(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } tests := []struct { authority string @@ -1251,6 +1453,13 @@ func TestCustomAuthority(t *testing.T) { // requests are made. func TestRateLimitedResolve(t *testing.T) { defer leakcheck.Check(t) + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, will protect from triggering exponential backoff. + return time.NewTimer(time.Hour) + } const dnsResRate = 10 * time.Millisecond dc := replaceDNSResRate(dnsResRate) @@ -1347,21 +1556,66 @@ func TestRateLimitedResolve(t *testing.T) { } } +// DNS Resolver immediately starts polling on an error. This will cause the re-resolution to return another error. +// Thus, test that it constantly sends errors to the grpc.ClientConn. func TestReportError(t *testing.T) { const target = "notfoundaddress" + defer func(nt func(d time.Duration) *time.Timer) { + newTimer = nt + }(newTimer) + timerChan := testutils.NewChannel() + newTimer = func(d time.Duration) *time.Timer { + // Will never fire on its own, allows this test to call timer immediately. + t := time.NewTimer(time.Hour) + timerChan.Send(t) + return t + } cc := &testClientConn{target: target, errChan: make(chan error)} + totalTimesCalledError := 0 b := NewBuilder() r, err := b.Build(resolver.Target{Endpoint: target}, cc, resolver.BuildOptions{}) if err != nil { - t.Fatalf("%v\n", err) + t.Fatalf("Error building resolver for target %v: %v", target, err) + } + // Should receive first error. + err = <-cc.errChan + if !strings.Contains(err.Error(), "hostLookup error") { + t.Fatalf(`ReportError(err=%v) called; want err contains "hostLookupError"`, err) } + totalTimesCalledError++ + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + timer, err := timerChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving timer from mock NewTimer call: %v", err) + } + timerPointer := timer.(*time.Timer) + timerPointer.Reset(0) defer r.Close() - select { - case err := <-cc.errChan: + + // Cause timer to go off 10 times, and see if it matches DNS Resolver updating Error. + for i := 0; i < 10; i++ { + // Should call ReportError(). + err = <-cc.errChan if !strings.Contains(err.Error(), "hostLookup error") { t.Fatalf(`ReportError(err=%v) called; want err contains "hostLookupError"`, err) } - case <-time.After(time.Second): - t.Fatalf("did not receive error after 1s") + totalTimesCalledError++ + timer, err := timerChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving timer from mock NewTimer call: %v", err) + } + timerPointer := timer.(*time.Timer) + timerPointer.Reset(0) + } + + if totalTimesCalledError != 11 { + t.Errorf("ReportError() not called 11 times, instead called %d times.", totalTimesCalledError) + } + // Clean up final watcher iteration. + <-cc.errChan + _, err = timerChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving timer from mock NewTimer call: %v", err) } } diff --git a/resolver/resolver.go b/resolver/resolver.go index e9fa8e33d92..6a9d234a597 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -181,7 +181,7 @@ type State struct { // gRPC to add new methods to this interface. type ClientConn interface { // UpdateState updates the state of the ClientConn appropriately. - UpdateState(State) + UpdateState(State) error // ReportError notifies the ClientConn that the Resolver encountered an // error. The ClientConn will notify the load balancer and begin calling // ResolveNow on the Resolver with exponential backoff. diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index f2d81968f9e..4118de571ab 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -22,7 +22,6 @@ import ( "fmt" "strings" "sync" - "time" "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials" @@ -40,9 +39,6 @@ type ccResolverWrapper struct { resolver resolver.Resolver done *grpcsync.Event curState resolver.State - - pollingMu sync.Mutex - polling chan struct{} } // newCCResolverWrapper uses the resolver.Builder to build a Resolver and @@ -93,59 +89,19 @@ func (ccr *ccResolverWrapper) close() { ccr.resolverMu.Unlock() } -// poll begins or ends asynchronous polling of the resolver based on whether -// err is ErrBadResolverState. -func (ccr *ccResolverWrapper) poll(err error) { - ccr.pollingMu.Lock() - defer ccr.pollingMu.Unlock() - if err != balancer.ErrBadResolverState { - // stop polling - if ccr.polling != nil { - close(ccr.polling) - ccr.polling = nil - } - return - } - if ccr.polling != nil { - // already polling - return - } - p := make(chan struct{}) - ccr.polling = p - go func() { - for i := 0; ; i++ { - ccr.resolveNow(resolver.ResolveNowOptions{}) - t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i)) - select { - case <-p: - t.Stop() - return - case <-ccr.done.Done(): - // Resolver has been closed. - t.Stop() - return - case <-t.C: - select { - case <-p: - return - default: - } - // Timer expired; re-resolve. - } - } - }() -} - -func (ccr *ccResolverWrapper) UpdateState(s resolver.State) { +func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error { if ccr.done.HasFired() { - return + return nil } channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s) if channelz.IsOn() { ccr.addChannelzTraceEvent(s) } ccr.curState = s - ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) + if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState { + return balancer.ErrBadResolverState + } + return nil } func (ccr *ccResolverWrapper) ReportError(err error) { @@ -153,7 +109,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) { return } channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) - ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err)) + ccr.cc.updateResolverState(resolver.State{}, err) } // NewAddress is called by the resolver implementation to send addresses to gRPC. @@ -166,7 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) } ccr.curState.Addresses = addrs - ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) + ccr.cc.updateResolverState(ccr.curState, nil) } // NewServiceConfig is called by the resolver implementation to send service @@ -183,14 +139,13 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { scpr := parseServiceConfig(sc) if scpr.Err != nil { channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) - ccr.poll(balancer.ErrBadResolverState) return } if channelz.IsOn() { ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) } ccr.curState.ServiceConfig = scpr - ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) + ccr.cc.updateResolverState(ccr.curState, nil) } func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { diff --git a/resolver_conn_wrapper_test.go b/resolver_conn_wrapper_test.go index f13a408937b..81c5b9ea874 100644 --- a/resolver_conn_wrapper_test.go +++ b/resolver_conn_wrapper_test.go @@ -67,62 +67,6 @@ func (s) TestDialParseTargetUnknownScheme(t *testing.T) { } } -func testResolverErrorPolling(t *testing.T, badUpdate func(*manual.Resolver), goodUpdate func(*manual.Resolver), dopts ...DialOption) { - boIter := make(chan int) - resolverBackoff := func(v int) time.Duration { - boIter <- v - return 0 - } - - r := manual.NewBuilderWithScheme("whatever") - rn := make(chan struct{}) - defer func() { close(rn) }() - r.ResolveNowCallback = func(resolver.ResolveNowOptions) { rn <- struct{}{} } - - defaultDialOptions := []DialOption{ - WithInsecure(), - WithResolvers(r), - withResolveNowBackoff(resolverBackoff), - } - cc, err := Dial(r.Scheme()+":///test.server", append(defaultDialOptions, dopts...)...) - if err != nil { - t.Fatalf("Dial(_, _) = _, %v; want _, nil", err) - } - defer cc.Close() - badUpdate(r) - - panicAfter := time.AfterFunc(5*time.Second, func() { panic("timed out polling resolver") }) - defer panicAfter.Stop() - - // Ensure ResolveNow is called, then Backoff with the right parameter, several times - for i := 0; i < 7; i++ { - <-rn - if v := <-boIter; v != i { - t.Errorf("Backoff call %v uses value %v", i, v) - } - } - - // UpdateState will block if ResolveNow is being called (which blocks on - // rn), so call it in a goroutine. - goodUpdate(r) - - // Wait awhile to ensure ResolveNow and Backoff stop being called when the - // state is OK (i.e. polling was cancelled). - for { - t := time.NewTimer(50 * time.Millisecond) - select { - case <-rn: - // ClientConn is still calling ResolveNow - <-boIter - time.Sleep(5 * time.Millisecond) - continue - case <-t.C: - // ClientConn stopped calling ResolveNow; success - } - break - } -} - const happyBalancerName = "happy balancer" func init() { @@ -136,35 +80,6 @@ func init() { stub.Register(happyBalancerName, bf) } -// TestResolverErrorPolling injects resolver errors and verifies ResolveNow is -// called with the appropriate backoff strategy being consulted between -// ResolveNow calls. -func (s) TestResolverErrorPolling(t *testing.T) { - testResolverErrorPolling(t, func(r *manual.Resolver) { - r.CC.ReportError(errors.New("res err")) - }, func(r *manual.Resolver) { - // UpdateState will block if ResolveNow is being called (which blocks on - // rn), so call it in a goroutine. - go r.CC.UpdateState(resolver.State{}) - }, - WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName))) -} - -// TestServiceConfigErrorPolling injects a service config error and verifies -// ResolveNow is called with the appropriate backoff strategy being consulted -// between ResolveNow calls. -func (s) TestServiceConfigErrorPolling(t *testing.T) { - testResolverErrorPolling(t, func(r *manual.Resolver) { - badsc := r.CC.ParseServiceConfig("bad config") - r.UpdateState(resolver.State{ServiceConfig: badsc}) - }, func(r *manual.Resolver) { - // UpdateState will block if ResolveNow is being called (which blocks on - // rn), so call it in a goroutine. - go r.CC.UpdateState(resolver.State{}) - }, - WithDefaultServiceConfig(fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, happyBalancerName))) -} - // TestResolverErrorInBuild makes the resolver.Builder call into the ClientConn // during the Build call. We use two separate mutexes in the code which make // sure there is no data race in this code path, and also that there is no diff --git a/test/balancer_test.go b/test/balancer_test.go index bc22036dbac..a6a8f726afa 100644 --- a/test/balancer_test.go +++ b/test/balancer_test.go @@ -37,7 +37,6 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/balancerload" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpcutil" imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/internal/stubserver" @@ -698,10 +697,7 @@ func (s) TestEmptyAddrs(t *testing.T) { // Initialize pickfirst client pfr := manual.NewBuilderWithScheme("whatever") - pfrnCalled := grpcsync.NewEvent() - pfr.ResolveNowCallback = func(resolver.ResolveNowOptions) { - pfrnCalled.Fire() - } + pfr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) pfcc, err := grpc.DialContext(ctx, pfr.Scheme()+":///", grpc.WithInsecure(), grpc.WithResolvers(pfr)) @@ -718,16 +714,10 @@ func (s) TestEmptyAddrs(t *testing.T) { // Remove all addresses. pfr.UpdateState(resolver.State{}) - // Wait for a ResolveNow call on the pick first client's resolver. - <-pfrnCalled.Done() // Initialize roundrobin client rrr := manual.NewBuilderWithScheme("whatever") - rrrnCalled := grpcsync.NewEvent() - rrr.ResolveNowCallback = func(resolver.ResolveNowOptions) { - rrrnCalled.Fire() - } rrr.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: lis.Addr().String()}}}) rrcc, err := grpc.DialContext(ctx, rrr.Scheme()+":///", grpc.WithInsecure(), grpc.WithResolvers(rrr), @@ -745,8 +735,6 @@ func (s) TestEmptyAddrs(t *testing.T) { // Remove all addresses. rrr.UpdateState(resolver.State{}) - // Wait for a ResolveNow call on the round robin client's resolver. - <-rrrnCalled.Done() // Confirm several new RPCs succeed on pick first. for i := 0; i < 10; i++ { diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 82355eecc70..b3c2006b72b 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -88,8 +88,9 @@ type testClientConn struct { errorCh *testutils.Channel } -func (t *testClientConn) UpdateState(s resolver.State) { +func (t *testClientConn) UpdateState(s resolver.State) error { t.stateCh.Send(s) + return nil } func (t *testClientConn) ReportError(err error) {