Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: handle invalid service configs by applying the default, if applicable #5238

Merged
merged 1 commit into from Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions balancer_conn_wrappers.go
Expand Up @@ -131,11 +131,17 @@ func (ccb *ccBalancerWrapper) watcher() {
}

func (ccb *ccBalancerWrapper) close() {
if ccb == nil {
return
}
ccb.closed.Fire()
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) exitIdle() bool {
if ccb == nil {
return true
}
if !ccb.hasExitIdle {
return false
}
Expand Down Expand Up @@ -168,6 +174,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
if ccb == nil {
return
}
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
Expand Down
59 changes: 34 additions & 25 deletions clientconn.go
Expand Up @@ -483,9 +483,6 @@ type ClientConn struct {
// firstResolveEvent is used to track whether the name resolver sent us at
// least one update. RPCs block on this event.
firstResolveEvent *grpcsync.Event
// TODO: Add a goodResolveEvent to track whether the name resolver sent us a
// good update. This will be used to determine if a balancer is configured on
// the channel instead of checking for `cc.balancerWrapper != nil`.

// mu protects the following fields.
// TODO: split mu so the same mutex isn't used for everything.
Expand Down Expand Up @@ -542,7 +539,7 @@ func (cc *ClientConn) GetState() connectivity.State {
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
if cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
Expand Down Expand Up @@ -627,9 +624,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// with the new addresses.
cc.maybeApplyDefaultServiceConfig(nil)

if cc.balancerWrapper != nil {
cc.balancerWrapper.resolverError(err)
}
cc.balancerWrapper.resolverError(err)

// No addresses are valid with err set; return early.
cc.mu.Unlock()
Expand Down Expand Up @@ -657,18 +652,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
var err error
if s.ServiceConfig.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
if cc.sc == nil {
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, s.Addresses)
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
cc.applyFailingLB(s.ServiceConfig)
cc.mu.Unlock()
return ret
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
return ret
}
}
}
Expand Down Expand Up @@ -700,6 +691,26 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
return ret
}

// applyFailingLB is akin to configuring an LB policy on the channel which
// always fails RPCs. Here, an actual LB policy is not configured, but an always
// erroring picker is configured, which returns errors with information about
// what was invalid in the received service config. A config selector with no
// service config is configured, and the connectivity state of the channel is
// set to TransientFailure.
//
// Caller must hold cc.mu.
func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
var err error
if sc.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}

// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
Expand All @@ -714,14 +725,12 @@ func (cc *ClientConn) switchBalancer(name string) {
}

channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.balancerWrapper != nil {
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()
}
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()

builder := balancer.Get(name)
if builder == nil {
Expand Down