From 5ebfcc36cf40e1da09dbd37fb3694b80e731de06 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 9 Mar 2022 09:49:04 -0800 Subject: [PATCH] grpc: add a field to represent receipt of a good resolver update --- clientconn.go | 318 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 204 insertions(+), 114 deletions(-) diff --git a/clientconn.go b/clientconn.go index c084a9013e42..f24fbeaf09d4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -483,9 +483,6 @@ type ClientConn struct { // firstResolveEvent is used to track whether the name resolver sent us at // least one update. RPCs block on this event. firstResolveEvent *grpcsync.Event - // TODO: Add a goodResolveEvent to track whether the name resolver sent us a - // good update. This will be used to determine if a balancer is configured on - // the channel instead of checking for `cc.balancerWrapper != nil`. // mu protects the following fields. // TODO: split mu so the same mutex isn't used for everything. @@ -496,6 +493,9 @@ type ClientConn struct { mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. curBalancerName string // TODO: delete as part of https://github.com/grpc/grpc-go/issues/5229. balancerWrapper *ccBalancerWrapper // TODO: Use gracefulswitch balancer to be able to initialize this once and never rewrite. + // Whether the resolver got back with a good update. And when this set to + // true, we can safely assume that a balancer is configured on the channel. + rcvdGoodResolverUpdate bool lceMu sync.Mutex // protects lastConnectionError lastConnectionError error @@ -542,7 +542,7 @@ func (cc *ClientConn) GetState() connectivity.State { func (cc *ClientConn) Connect() { cc.mu.Lock() defer cc.mu.Unlock() - if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() { + if cc.rcvdGoodResolverUpdate && cc.balancerWrapper.exitIdle() { return } for ac := range cc.conns { @@ -598,8 +598,26 @@ func init() { emptyServiceConfig = cfg.Config.(*ServiceConfig) } +// maybeApplyDefaultServiceConfig attempts to apply the default service config, +// and in the process may also switch balancers. +// +// The default service config is determined as follows: +// - If the channel previously received valid service config from the name +// resolver (and WithDisableServiceConfig() was not set) or it received +// service config through the old API WithServiceConfig(), it is applied. +// - If a default service config was specified via the WithDefaultServiceConfig +// dial option, it is applied. +// - If none of the above conditions are met, empty service config is applied. +// +// Caller must hold cc.mu. func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { if cc.sc != nil { + // cc.sc is set when service configs are received through the old API, + // WithServiceConfig(), or from the name resolver and service configs from + // the name resolver are not disabled through the WithDisableServiceConfig() + // dial option. In the former case, config selector is updated, but the + // service configs are not applied. We wait for an update from the name + // resolver to apply those service configs. cc.applyServiceConfigAndBalancer(cc.sc, nil, addrs) return } @@ -610,79 +628,202 @@ func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) { } } +func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { + cc.applyServiceConfig(sc, configSelector) + cc.applyBalancer(addrs) +} + +// applyServiceConfig applies the provided service config. +// +// Caller must hold cc.mu. +func (cc *ClientConn) applyServiceConfig(sc *ServiceConfig, configSelector iresolver.ConfigSelector) { + if sc == nil { + // should never reach here. + return + } + + // Update the most recently received service config field in cc. + cc.sc = sc + if configSelector != nil { + cc.safeConfigSelector.UpdateConfigSelector(configSelector) + } + + if cc.sc.retryThrottling == nil { + cc.retryThrottler.Store((*retryThrottler)(nil)) + return + } + cc.retryThrottler.Store(&retryThrottler{ + tokens: cc.sc.retryThrottling.MaxTokens, + max: cc.sc.retryThrottling.MaxTokens, + thresh: cc.sc.retryThrottling.MaxTokens / 2, + ratio: cc.sc.retryThrottling.TokenRatio, + }) +} + +// applyBalancer applies LB policy and associated configuration from the +// provided service config. It also sets `cc.rcvdGoodResolverUpdate` to true, +// indicating that we have a valid LB policy configured on the channel. +// +// Caller must hold cc.mu. +func (cc *ClientConn) applyBalancer(addrs []resolver.Address) { + if cc.dopts.balancerBuilder == nil { + // Only look at balancer types and switch balancer if balancer dial + // option is not set. + var newBalancerName string + if cc.sc != nil && cc.sc.lbConfig != nil { + newBalancerName = cc.sc.lbConfig.name + } else { + var isGRPCLB bool + for _, a := range addrs { + if a.Type == resolver.GRPCLB { + isGRPCLB = true + break + } + } + if isGRPCLB { + newBalancerName = grpclbName + } else if cc.sc != nil && cc.sc.LB != nil { + newBalancerName = *cc.sc.LB + } else { + newBalancerName = PickFirstBalancerName + } + } + cc.switchBalancer(newBalancerName) + } else if cc.balancerWrapper == nil { + // Balancer dial option was set, and this is the first time handling + // resolved addresses. Build a balancer with dopts.balancerBuilder. + cc.curBalancerName = cc.dopts.balancerBuilder.Name() + cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) + } + cc.rcvdGoodResolverUpdate = true +} + +// onResolverError handles an update from the resolver which contains an error. +// +// If this is the first update from the resolver, the default service config is +// applied. If we have already heard from the resolver, applying the default +// service config will be a no-op and hence will be skipped. +// +// If an LB policy is configured on the channel, the error from the resolver is +// propagated to the balancer. +// +// Caller must hold cc.mu. +func (cc *ClientConn) onResolverError(err error) { + if !cc.firstResolveEvent.HasFired() { + cc.maybeApplyDefaultServiceConfig(nil) + } + if cc.rcvdGoodResolverUpdate { + cc.balancerWrapper.resolverError(err) + } +} + +// tryApplyServiceConfig attempts to apply the service config specified in the +// update from the name resolver. As part of applying the new service config, +// the balancer might be switched. +// +// The following logic is followed: +// - If the name resolver specifies an empty service config: +// - default service config is applied +// - If the name resolver specifies a valid service config: +// - the newly received service config is applied +// - If the name resolver specifies an invalid service config: +// - if there is no current valid service config +// - set a failing balancer on the channel. This is done by setting: +// - no balancer on the channel +// - an error picker returning service config parsing errors +// - connectivity state to TransientFailure +// - if there is current valid service config, nothing needs to be done +// +// Returns non-nil error if the resolver update contained a non-empty service +// config, and it was invalid. +// +// Caller must hold cc.mu. +func (cc *ClientConn) tryApplyServiceConfig(s resolver.State) error { + if s.ServiceConfig == nil { + cc.maybeApplyDefaultServiceConfig(s.Addresses) + return nil + } + + sc, ok := s.ServiceConfig.Config.(*ServiceConfig) + if s.ServiceConfig.Err == nil && ok { + if cc.dopts.disableServiceConfig { + channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig) + cc.maybeApplyDefaultServiceConfig(s.Addresses) + return nil + } + + var configSelector iresolver.ConfigSelector = &defaultConfigSelector{sc} + if cs := iresolver.GetConfigSelector(s); cs != nil { + configSelector = cs + if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { + channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector") + } + } + cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) + return nil + } + + if !cc.rcvdGoodResolverUpdate { + var err error + if s.ServiceConfig.Err != nil { + err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) + } else { + err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) + } + cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc}) + cc.blockingpicker.updatePicker(base.NewErrPicker(err)) + cc.csMgr.updateState(connectivity.TransientFailure) + } + return balancer.ErrBadResolverState +} + +// updateResolverState handles an update from the name resolver as follows: +// - If the update contains an error, the returned addresses are not used. Error +// handling logic is invoked, followed by an early return. +// - Else, an attempt is made to apply the service config: +// - If the update contains an invalid service config: +// - If the channel contains no previous valid service config: +// - an error picker is set on the channel, followed by an early return +// - If the channel contains previous valid service config: +// - existing service config and the new addresses are used +// - If the update contains no service config, or service configs from the +// resolver are disabled, the default service config is applied +// - If the update contains valid service config, they are applied +// +// As part of applying the service config, the LB policy might get switched. +// Finally, the update is pushed to the balancer. func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { defer cc.firstResolveEvent.Fire() + + // If the channel is closed, return early. cc.mu.Lock() - // Check if the ClientConn is already closed. Some fields (e.g. - // balancerWrapper) are set to nil when closing the ClientConn, and could - // cause nil pointer panic if we don't have this check. if cc.conns == nil { cc.mu.Unlock() return nil } if err != nil { - // May need to apply the initial service config in case the resolver - // doesn't support service configs, or doesn't provide a service config - // with the new addresses. - cc.maybeApplyDefaultServiceConfig(nil) - - if cc.balancerWrapper != nil { - cc.balancerWrapper.resolverError(err) - } - - // No addresses are valid with err set; return early. + cc.onResolverError(err) cc.mu.Unlock() return balancer.ErrBadResolverState } - var ret error - if cc.dopts.disableServiceConfig { - channelz.Infof(logger, cc.channelzID, "ignoring service config from resolver (%v) and applying the default because service config is disabled", s.ServiceConfig) - cc.maybeApplyDefaultServiceConfig(s.Addresses) - } else if s.ServiceConfig == nil { - cc.maybeApplyDefaultServiceConfig(s.Addresses) - // TODO: do we need to apply a failing LB policy if there is no - // default, per the error handling design? - } else { - if sc, ok := s.ServiceConfig.Config.(*ServiceConfig); s.ServiceConfig.Err == nil && ok { - configSelector := iresolver.GetConfigSelector(s) - if configSelector != nil { - if len(s.ServiceConfig.Config.(*ServiceConfig).Methods) != 0 { - channelz.Infof(logger, cc.channelzID, "method configs in service config will be ignored due to presence of config selector") - } - } else { - configSelector = &defaultConfigSelector{sc} - } - cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) - } else { - ret = balancer.ErrBadResolverState - if cc.balancerWrapper == nil { - var err error - if s.ServiceConfig.Err != nil { - err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) - } else { - err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) - } - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc}) - cc.blockingpicker.updatePicker(base.NewErrPicker(err)) - cc.csMgr.updateState(connectivity.TransientFailure) - cc.mu.Unlock() - return ret - } - } + retErr := cc.tryApplyServiceConfig(s) + if retErr != nil && !cc.rcvdGoodResolverUpdate { + cc.mu.Unlock() + return retErr } var balCfg serviceconfig.LoadBalancingConfig if cc.dopts.balancerBuilder == nil && cc.sc != nil && cc.sc.lbConfig != nil { balCfg = cc.sc.lbConfig.cfg } - cbn := cc.curBalancerName bw := cc.balancerWrapper cc.mu.Unlock() + + // Filter out grpclb addresses for non-grpclb balancers. if cbn != grpclbName { - // Filter any grpclb addresses since we don't have the grpclb balancer. for i := 0; i < len(s.Addresses); { if s.Addresses[i].Type == resolver.GRPCLB { copy(s.Addresses[i:], s.Addresses[i+1:]) @@ -692,12 +833,15 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { i++ } } - uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) - if ret == nil { - ret = uccsErr // prefer ErrBadResolver state since any other error is - // currently meaningless to the caller. + + // If `retErr != nil`, the only value it can contain is + // balancer.ErrBadResolverState. Prefer that error to any other error + // returned by the balancer since other error values are currently + // meaningless to the caller. + if uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}); retErr == nil { + retErr = uccsErr } - return ret + return retErr } // switchBalancer starts the switching from current balancer to the balancer @@ -718,7 +862,7 @@ func (cc *ClientConn) switchBalancer(name string) { channelz.Info(logger, cc.channelzID, "ignoring balancer switching: Balancer DialOption used instead") return } - if cc.balancerWrapper != nil { + if cc.rcvdGoodResolverUpdate { // Don't hold cc.mu while closing the balancers. The balancers may call // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex // would cause a deadlock in that case. @@ -977,59 +1121,6 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method st return t, done, nil } -func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) { - if sc == nil { - // should never reach here. - return - } - cc.sc = sc - if configSelector != nil { - cc.safeConfigSelector.UpdateConfigSelector(configSelector) - } - - if cc.sc.retryThrottling != nil { - newThrottler := &retryThrottler{ - tokens: cc.sc.retryThrottling.MaxTokens, - max: cc.sc.retryThrottling.MaxTokens, - thresh: cc.sc.retryThrottling.MaxTokens / 2, - ratio: cc.sc.retryThrottling.TokenRatio, - } - cc.retryThrottler.Store(newThrottler) - } else { - cc.retryThrottler.Store((*retryThrottler)(nil)) - } - - if cc.dopts.balancerBuilder == nil { - // Only look at balancer types and switch balancer if balancer dial - // option is not set. - var newBalancerName string - if cc.sc != nil && cc.sc.lbConfig != nil { - newBalancerName = cc.sc.lbConfig.name - } else { - var isGRPCLB bool - for _, a := range addrs { - if a.Type == resolver.GRPCLB { - isGRPCLB = true - break - } - } - if isGRPCLB { - newBalancerName = grpclbName - } else if cc.sc != nil && cc.sc.LB != nil { - newBalancerName = *cc.sc.LB - } else { - newBalancerName = PickFirstBalancerName - } - } - cc.switchBalancer(newBalancerName) - } else if cc.balancerWrapper == nil { - // Balancer dial option was set, and this is the first time handling - // resolved addresses. Build a balancer with dopts.balancerBuilder. - cc.curBalancerName = cc.dopts.balancerBuilder.Name() - cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) - } -} - func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { cc.mu.RLock() r := cc.resolverWrapper @@ -1078,7 +1169,6 @@ func (cc *ClientConn) Close() error { rWrapper := cc.resolverWrapper cc.resolverWrapper = nil bWrapper := cc.balancerWrapper - cc.balancerWrapper = nil cc.mu.Unlock() cc.blockingpicker.close()