Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: perform graceful switching of LB policies in the ClientConn by default #5285

Merged
merged 16 commits into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
327 changes: 214 additions & 113 deletions balancer_conn_wrappers.go

Large diffs are not rendered by default.

93 changes: 14 additions & 79 deletions clientconn.go
Expand Up @@ -278,15 +278,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
})

// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
Expand Down Expand Up @@ -465,12 +465,12 @@ type ClientConn struct {
cancel context.CancelFunc // Cancelled on close.

// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
balancerBuildOpts balancer.BuildOptions // TODO: delete once we move to the gracefulswitch balancer.
channelzID *channelz.Identifier // Channelz identifier for the channel.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand All @@ -491,8 +491,6 @@ type ClientConn struct {
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
curBalancerName string // TODO: delete as part of https://github.com/grpc/grpc-go/issues/5229.
balancerWrapper *ccBalancerWrapper // TODO: Use gracefulswitch balancer to be able to initialize this once and never rewrite.

lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
Expand Down Expand Up @@ -537,14 +535,7 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
cc.balancerWrapper.exitIdle()
}

func (cc *ClientConn) scWatcher() {
Expand Down Expand Up @@ -666,21 +657,9 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}

cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
var addrs []resolver.Address
for _, addr := range s.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
s.Addresses = addrs
}

uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
Expand Down Expand Up @@ -709,50 +688,8 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
cc.csMgr.updateState(connectivity.TransientFailure)
}

// switchBalancer starts the switching from current balancer to the balancer
// with the given name.
//
// It will NOT send the current address list to the new balancer. If needed,
// caller of this function should send address list to the new balancer after
// this function returns.
//
// Caller must hold cc.mu.
func (cc *ClientConn) switchBalancer(name string) {
if strings.EqualFold(cc.curBalancerName, name) {
return
}

channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)
// Don't hold cc.mu while closing the balancers. The balancers may call
// methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex
// would cause a deadlock in that case.
cc.mu.Unlock()
cc.balancerWrapper.close()
cc.mu.Lock()

builder := balancer.Get(name)
if builder == nil {
channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
builder = newPickfirstBuilder()
} else {
channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
}

cc.curBalancerName = builder.Name()
cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
}

func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return
}
// TODO(bar switching) send updates to all balancer wrappers when balancer
// gracefully switching is supported.
cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
cc.mu.Unlock()
cc.balancerWrapper.updateSubConnState(sc, s, err)
}

// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
Expand Down Expand Up @@ -1002,8 +939,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}

// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
Expand All @@ -1023,7 +958,7 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
cc.balancerWrapper.switchTo(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down Expand Up @@ -1074,11 +1009,11 @@ func (cc *ClientConn) Close() error {
rWrapper := cc.resolverWrapper
cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
cc.balancerWrapper = nil
cc.mu.Unlock()

// The order of closing matters here since the balancer wrapper assumes the
// picker is closed before it is closed.
cc.blockingpicker.close()

if bWrapper != nil {
bWrapper.close()
}
Expand Down
10 changes: 7 additions & 3 deletions clientconn_test.go
Expand Up @@ -845,9 +845,13 @@ func (s) TestBackoffCancel(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
<-dialStrCh
cc.Close()
// Should not leak. May need -count 5000 to exercise.
defer cc.Close()

select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial")
case <-dialStrCh:
}
}

// UpdateAddresses should cause the next reconnect to begin from the top of the
Expand Down
7 changes: 7 additions & 0 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Expand Up @@ -178,6 +178,9 @@ func (gsb *Balancer) ResolverError(err error) {
}

// ExitIdle forwards the call to the latest balancer created.
//
// If the latest balancer does not support ExitIdle, the subConns are
// re-connected to manually.
func (gsb *Balancer) ExitIdle() {
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
Expand All @@ -188,6 +191,10 @@ func (gsb *Balancer) ExitIdle() {
// called.
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
for sc := range balToUpdate.subconns {
sc.Connect()
}
}

Expand Down
16 changes: 15 additions & 1 deletion internal/balancer/stub/stub.go
Expand Up @@ -19,7 +19,12 @@
// Package stub implements a balancer for testing purposes.
package stub

import "google.golang.org/grpc/balancer"
import (
"encoding/json"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)

// BalancerFuncs contains all balancer.Balancer functions with a preceding
// *BalancerData parameter for passing additional instance information. Any
Expand All @@ -28,6 +33,8 @@ type BalancerFuncs struct {
// Init is called after ClientConn and BuildOptions are set in
// BalancerData. It may be used to initialize BalancerData.Data.
Init func(*BalancerData)
// ParseConfig is used for parsing LB configs, if specified.
ParseConfig func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error)

UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
Expand Down Expand Up @@ -97,6 +104,13 @@ func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.

func (bb bb) Name() string { return bb.name }

func (bb bb) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
if bb.bf.ParseConfig != nil {
return bb.bf.ParseConfig(lbCfg)
}
return nil, nil
}

// Register registers a stub balancer builder which will call the provided
// functions. The name used should be unique.
func Register(name string, bf BalancerFuncs) {
Expand Down