Skip to content

Commit

Permalink
grpc: handle invalid service configs by applying the default if avail…
Browse files Browse the repository at this point in the history
…able (#5238)
  • Loading branch information
easwars committed Mar 16, 2022
1 parent 94ee386 commit c4cabf7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
9 changes: 9 additions & 0 deletions balancer_conn_wrappers.go
Expand Up @@ -131,11 +131,17 @@ func (ccb *ccBalancerWrapper) watcher() {
}

func (ccb *ccBalancerWrapper) close() {
if ccb == nil {
return
}
ccb.closed.Fire()
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) exitIdle() bool {
if ccb == nil {
return true
}
if !ccb.hasExitIdle {
return false
}
Expand Down Expand Up @@ -168,6 +174,9 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat
}

func (ccb *ccBalancerWrapper) resolverError(err error) {
if ccb == nil {
return
}
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
Expand Down
59 changes: 34 additions & 25 deletions clientconn.go
Expand Up @@ -483,9 +483,6 @@ type ClientConn struct {
// firstResolveEvent is used to track whether the name resolver sent us at
// least one update. RPCs block on this event.
firstResolveEvent *grpcsync.Event
// TODO: Add a goodResolveEvent to track whether the name resolver sent us a
// good update. This will be used to determine if a balancer is configured on
// the channel instead of checking for `cc.balancerWrapper != nil`.

// mu protects the following fields.
// TODO: split mu so the same mutex isn't used for everything.
Expand Down Expand Up @@ -542,7 +539,7 @@ func (cc *ClientConn) GetState() connectivity.State {
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
if cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
Expand Down Expand Up @@ -627,9 +624,7 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
// with the new addresses.
cc.maybeApplyDefaultServiceConfig(nil)

if cc.balancerWrapper != nil {
cc.balancerWrapper.resolverError(err)
}
cc.balancerWrapper.resolverError(err)

// No addresses are valid with err set; return early.
cc.mu.Unlock()
Expand Down Expand Up @@ -657,18 +652,14 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
cc.applyServiceConfigAndBalancer(sc, configSelector, s.Addresses)
} else {
ret = balancer.ErrBadResolverState
if cc.balancerWrapper == nil {
var err error
if s.ServiceConfig.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", s.ServiceConfig.Err)
if cc.sc == nil {
if cc.dopts.defaultServiceConfig != nil {
cc.applyServiceConfigAndBalancer(cc.dopts.defaultServiceConfig, &defaultConfigSelector{cc.dopts.defaultServiceConfig}, s.Addresses)
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", s.ServiceConfig.Config)
cc.applyFailingLB(s.ServiceConfig)
cc.mu.Unlock()
return ret
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{cc.sc})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
cc.mu.Unlock()
return ret
}
}
}
Expand Down Expand Up @@ -700,6 +691,26 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
return ret
}

// applyFailingLB is akin to configuring an LB policy on the channel which
// always fails RPCs. Here, an actual LB policy is not configured, but an always
// erroring picker is configured, which returns errors with information about
// what was invalid in the received service config. A config selector with no
// service config is configured, and the connectivity state of the channel is
// set to TransientFailure.
//
// Caller must hold cc.mu.
func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
var err error
if sc.Err != nil {
err = status.Errorf(codes.Unavailable, "error parsing service config: %v", sc.Err)
} else {
err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config)
}
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.blockingpicker.updatePicker(base.NewErrPicker(err))
cc.csMgr.updateState(connectivity.TransientFailure)
}

// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
Expand All @@ -714,14 +725,12 @@ func (cc *ClientConn) switchBalancer(name string) {
}

channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
if cc.balancerWrapper != nil {
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()
}
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()

builder := balancer.Get(name)
if builder == nil {
Expand Down

0 comments on commit c4cabf7

Please sign in to comment.