diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 42f0ab68291..7dd437e7f5c 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -87,7 +87,7 @@ func (b *scStateUpdateBuffer) get() <-chan *scStateUpdate { // It implements balancer.ClientConn interface. type ccBalancerWrapper struct { cc *ClientConn - balancerMu sync.Mutex + balancerMu sync.Mutex // synchronizes calls to the balancer balancer balancer.Balancer stateChangeQueue *scStateUpdateBuffer done *grpcsync.Event @@ -166,22 +166,6 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co } func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { - cbn := ccb.cc.curBalancerName - ccb.cc.mu.Unlock() - defer ccb.cc.mu.Lock() - if cbn != grpclbName { - // Filter any grpclb addresses since we don't have the grpclb balancer. - s := &ccs.ResolverState - for i := 0; i < len(s.Addresses); { - if s.Addresses[i].Type == resolver.GRPCLB { - copy(s.Addresses[i:], s.Addresses[i+1:]) - s.Addresses = s.Addresses[:len(s.Addresses)-1] - continue - } - i++ - } - } - ccb.balancerMu.Lock() defer ccb.balancerMu.Unlock() if ub, ok := ccb.balancer.(balancer.V2Balancer); ok { diff --git a/clientconn.go b/clientconn.go index ec8e60026f9..e131307c5db 100644 --- a/clientconn.go +++ b/clientconn.go @@ -556,16 +556,18 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { cc.mu.Lock() - defer cc.mu.Unlock() // Check if the ClientConn is already closed. Some fields (e.g. // balancerWrapper) are set to nil when closing the ClientConn, and could // cause nil pointer panic if we don't have this check. if cc.conns == nil { + cc.mu.Unlock() return nil } if err != nil { - // May need to apply the initial service config + // May need to apply the initial service config in case the resolver + // doesn't support service configs, or doesn't provide a service config + // with the new addresses. cc.maybeApplyDefaultServiceConfig(nil) if cc.balancerWrapper != nil { @@ -573,6 +575,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { } // No addresses are valid with err set; return early. + cc.mu.Unlock() return balancer.ErrBadResolverState } @@ -594,7 +597,22 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { balCfg = cc.sc.lbConfig.cfg } - uccsErr := cc.balancerWrapper.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) + + cbn := cc.curBalancerName + bw := cc.balancerWrapper + cc.mu.Unlock() + if cbn != grpclbName { + // Filter any grpclb addresses since we don't have the grpclb balancer. + for i := 0; i < len(s.Addresses); { + if s.Addresses[i].Type == resolver.GRPCLB { + copy(s.Addresses[i:], s.Addresses[i+1:]) + s.Addresses = s.Addresses[:len(s.Addresses)-1] + continue + } + i++ + } + } + uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) if ret == nil { ret = uccsErr // prefer ErrBadResolver state since any other error is // currently meaningless to the caller. diff --git a/resolver_conn_wrapper.go b/resolver_conn_wrapper.go index 93b4428c423..36ae2ca8d47 100644 --- a/resolver_conn_wrapper.go +++ b/resolver_conn_wrapper.go @@ -22,13 +22,13 @@ import ( "fmt" "strings" "sync" - "sync/atomic" "time" "google.golang.org/grpc/balancer" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/backoff" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) @@ -40,7 +40,7 @@ type ccResolverWrapper struct { resolver resolver.Resolver addrCh chan []resolver.Address scCh chan string - done uint32 // accessed atomically; set to 1 when closed. + done *grpcsync.Event curState resolver.State mu sync.Mutex // protects polling @@ -91,6 +91,7 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { cc: cc, addrCh: make(chan []resolver.Address, 1), scCh: make(chan string, 1), + done: grpcsync.NewEvent(), } var err error @@ -102,21 +103,28 @@ func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) { } func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOption) { - ccr.resolver.ResolveNow(o) + ccr.mu.Lock() + if !ccr.done.HasFired() { + ccr.resolver.ResolveNow(o) + } + ccr.mu.Unlock() } func (ccr *ccResolverWrapper) close() { + ccr.mu.Lock() ccr.resolver.Close() - atomic.StoreUint32(&ccr.done, 1) + ccr.done.Fire() + ccr.mu.Unlock() } var resolverBackoff = backoff.Exponential{MaxDelay: 2 * time.Minute} -// poll begins or ends asynchronous polling of the resolver. -func (ccr *ccResolverWrapper) poll(run bool) { +// poll begins or ends asynchronous polling of the resolver based on whether +// err is ErrBadResolverState. +func (ccr *ccResolverWrapper) poll(err error) { ccr.mu.Lock() defer ccr.mu.Unlock() - if !run { + if err != balancer.ErrBadResolverState { // stop polling if ccr.polling != nil { close(ccr.polling) @@ -131,25 +139,26 @@ func (ccr *ccResolverWrapper) poll(run bool) { p := make(chan struct{}) ccr.polling = p go func() { - for i := 0; true; i++ { + for i := 0; ; i++ { ccr.resolveNow(resolver.ResolveNowOption{}) t := time.NewTimer(resolverBackoff.Backoff(i)) select { case <-p: t.Stop() return + case <-ccr.done.Done(): + // Resolver has been closed. + t.Stop() + return case <-t.C: + // Timer expired; re-resolve. } } }() } -func (ccr *ccResolverWrapper) isDone() bool { - return atomic.LoadUint32(&ccr.done) == 1 -} - func (ccr *ccResolverWrapper) UpdateState(s resolver.State) { - if ccr.isDone() { + if ccr.done.HasFired() { return } grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s) @@ -157,11 +166,11 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) { ccr.addChannelzTraceEvent(s) } ccr.curState = s - ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil) == balancer.ErrBadResolverState) + ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) } func (ccr *ccResolverWrapper) ReportError(err error) { - if ccr.isDone() { + if ccr.done.HasFired() { return } grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err) @@ -171,12 +180,12 @@ func (ccr *ccResolverWrapper) ReportError(err error) { Severity: channelz.CtWarning, }) } - ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err) == balancer.ErrBadResolverState) + ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err)) } // NewAddress is called by the resolver implementation to send addresses to gRPC. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { - if ccr.isDone() { + if ccr.done.HasFired() { return } grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs) @@ -184,13 +193,13 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) { ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfigGetter: ccr.curState.ServiceConfigGetter}) } ccr.curState.Addresses = addrs - ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil) == balancer.ErrBadResolverState) + ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) } // NewServiceConfig is called by the resolver implementation to send service // configs to gRPC. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { - if ccr.isDone() { + if ccr.done.HasFired() { return } grpclog.Infof("ccResolverWrapper: got new service config: %v", sc) @@ -203,14 +212,14 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) { Severity: channelz.CtWarning, }) } - ccr.poll(true) + ccr.poll(balancer.ErrBadResolverState) return } if channelz.IsOn() { ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfigGetter: scg}) } ccr.curState.ServiceConfigGetter = scg - ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil) == balancer.ErrBadResolverState) + ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) } func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.Getter {