diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 5eb87a55203..cb4b6728c22 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -131,11 +131,17 @@ func (ccb *ccBalancerWrapper) watcher() { } func (ccb *ccBalancerWrapper) close() { + if ccb == nil { + return + } ccb.closed.Fire() <-ccb.done.Done() } func (ccb *ccBalancerWrapper) exitIdle() bool { + if ccb == nil { + return true + } if !ccb.hasExitIdle { return false } @@ -168,6 +174,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat } func (ccb *ccBalancerWrapper) resolverError(err error) { + if ccb == nil { + return + } ccb.balancerMu.Lock() defer ccb.balancerMu.Unlock() ccb.balancer.ResolverError(err) diff --git a/clientconn.go b/clientconn.go index dd12a14f09a..e4819ca76b4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -483,9 +483,6 @@ type ClientConn struct { // firstResolveEvent is used to track whether the name resolver sent us at // least one update. RPCs block on this event. firstResolveEvent *grpcsync.Event - // TODO: Add a goodResolveEvent to track whether the name resolver sent us a - // good update. This will be used to determine if a balancer is configured on - // the channel instead of checking for `cc.balancerWrapper != nil`. // mu protects the following fields. // TODO: split mu so the same mutex isn't used for everything. @@ -542,7 +539,7 @@ func (cc *ClientConn) GetState() connectivity.State { func (cc *ClientConn) Connect() { cc.mu.Lock() defer cc.mu.Unlock() - if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() { + if cc.balancerWrapper.exitIdle() { return } for ac := range cc.conns { @@ -627,9 +624,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { // with the new addresses. cc.maybeApplyDefaultServiceConfig(nil) - if cc.balancerWrapper != nil { - cc.balancerWrapper.resolverError(err) - } + cc.balancerWrapper.resolverError(err) // No addresses are valid with err set; return early. cc.mu.Unlock() @@ -657,18 +652,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses) } else { ret = balancer.ErrBadResolverState - if cc.balancerWrapper == nil { - var err error - if s.ServiceConfig.Err != nil { - err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err) + if cc.sc == nil { + if cc.dopts.defaultServiceConfig != nil { + cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, s.Addresses) } else { - err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config) + cc.applyFailingLB(s.ServiceConfig) + cc.mu.Unlock() + return ret } - cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc}) - cc.blockingpicker.updatePicker(base.NewErrPicker(err)) - cc.csMgr.updateState(connectivity.TransientFailure) - cc.mu.Unlock() - return ret } } } @@ -700,6 +691,26 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { return ret } +// applyFailingLB is akin to configuring an LB policy on the channel which +// always fails RPCs. Here, an actual LB policy is not configured, but an always +// erroring picker is configured, which returns errors with information about +// what was invalid in the received service config. A config selector with no +// service config is configured, and the connectivity state of the channel is +// set to TransientFailure. +// +// Caller must hold cc.mu. +func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { + var err error + if sc.Err != nil { + err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err) + } else { + err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config) + } + cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) + cc.blockingpicker.updatePicker(base.NewErrPicker(err)) + cc.csMgr.updateState(connectivity.TransientFailure) +} + // switchBalancer starts the switching from current balancer to the balancer // with the given name. // @@ -714,14 +725,12 @@ func (cc *ClientConn) switchBalancer(name string) { } channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name) - if cc.balancerWrapper != nil { - // Don't hold cc.mu while closing the balancers. The balancers may call - // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex - // would cause a deadlock in that case. - cc.mu.Unlock() - cc.balancerWrapper.close() - cc.mu.Lock() - } + // Don't hold cc.mu while closing the balancers. The balancers may call + // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex + // would cause a deadlock in that case. + cc.mu.Unlock() + cc.balancerWrapper.close() + cc.mu.Lock() builder := balancer.Get(name) if builder == nil {