Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc: perform graceful switching of LB policies in the ClientConn by default #5285

Merged
merged 16 commits into from Apr 1, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
319 changes: 222 additions & 97 deletions balancer_conn_wrappers.go

Large diffs are not rendered by default.

86 changes: 12 additions & 74 deletions clientconn.go
Expand Up @@ -278,15 +278,15 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if creds := cc.dopts.copts.TransportCredentials; creds != nil {
credsClone = creds.Clone()
}
cc.balancerBuildOpts = balancer.BuildOptions{
cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
DialCreds: credsClone,
CredsBundle: cc.dopts.copts.CredsBundle,
Dialer: cc.dopts.copts.Dialer,
Authority: cc.authority,
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParentID: cc.channelzID,
Target: cc.parsedTarget,
}
})

// Build the resolver.
rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
Expand Down Expand Up @@ -465,12 +465,12 @@ type ClientConn struct {
cancel context.CancelFunc // Cancelled on close.

// The following are initialized at dial time, and are read-only after that.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
balancerBuildOpts balancer.BuildOptions // TODO: delete once we move to the gracefulswitch balancer.
channelzID *channelz.Identifier // Channelz identifier for the channel.
target string // User's dial target.
parsedTarget resolver.Target // See parseTargetAndFindResolver().
authority string // See determineAuthority().
dopts dialOptions // Default and user specified dial options.
channelzID *channelz.Identifier // Channelz identifier for the channel.
balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath.

// The following provide their own synchronization, and therefore don't
// require cc.mu to be held to access them.
Expand All @@ -491,8 +491,6 @@ type ClientConn struct {
sc *ServiceConfig // Latest service config received from the resolver.
conns map[*addrConn]struct{} // Set to nil on close.
mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway.
curBalancerName string // TODO: delete as part of https://github.com/grpc/grpc-go/issues/5229.
balancerWrapper *ccBalancerWrapper // TODO: Use gracefulswitch balancer to be able to initialize this once and never rewrite.

lceMu sync.Mutex // protects lastConnectionError
lastConnectionError error
Expand Down Expand Up @@ -539,12 +537,7 @@ func (cc *ClientConn) GetState() connectivity.State {
func (cc *ClientConn) Connect() {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
cc.balancerWrapper.exitIdle()
}

func (cc *ClientConn) scWatcher() {
Expand Down Expand Up @@ -666,21 +659,9 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
if cc.sc != nil && cc.sc.lbConfig != nil {
balCfg = cc.sc.lbConfig.cfg
}

cbn := cc.curBalancerName
bw := cc.balancerWrapper
cc.mu.Unlock()
if cbn != grpclbName {
// Filter any grpclb addresses since we don't have the grpclb balancer.
var addrs []resolver.Address
for _, addr := range s.Addresses {
if addr.Type == resolver.GRPCLB {
continue
}
addrs = append(addrs, addr)
}
s.Addresses = addrs
}

uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
if ret == nil {
ret = uccsErr // prefer ErrBadResolver state since any other error is
Expand Down Expand Up @@ -709,49 +690,9 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) {
cc.csMgr.updateState(connectivity.TransientFailure)
}

// switchBalancer replaces the currently active balancer with the one
// registered under name. It is a no-op when name matches the current balancer
// name (compared case-insensitively). When no builder is registered under
// name, pick_first is used instead.
//
// The current address list is NOT forwarded to the new balancer; callers that
// need the new balancer to see it must send it after this function returns.
//
// Caller must hold cc.mu. The mutex is released while the old balancer is
// being closed and re-acquired before the new one is installed.
func (cc *ClientConn) switchBalancer(name string) {
	if strings.EqualFold(cc.curBalancerName, name) {
		// Already running the requested policy.
		return
	}
	channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name)

	// Closing a balancer may call back into methods that take cc.mu (e.g.
	// cc.NewSubConn()), so the mutex must not be held across close() or we
	// would deadlock.
	cc.mu.Unlock()
	cc.balancerWrapper.close()
	cc.mu.Lock()

	bldr := balancer.Get(name)
	if bldr != nil {
		channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name)
	} else {
		// Unknown policy name: fall back to pick_first.
		channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName)
		channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name)
		bldr = newPickfirstBuilder()
	}

	cc.curBalancerName = bldr.Name()
	cc.balancerWrapper = newCCBalancerWrapper(cc, bldr, cc.balancerBuildOpts)
}

func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
return
}
// TODO(bar switching) send updates to all balancer wrappers when balancer
// gracefully switching is supported.
cc.balancerWrapper.handleSubConnStateChange(sc, s, err)
cc.balancerWrapper.updateSubConnState(sc, s, err)
cc.mu.Unlock()
}

Expand Down Expand Up @@ -1002,8 +943,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
cc.retryThrottler.Store((*retryThrottler)(nil))
}

// Only look at balancer types and switch balancer if balancer dial
// option is not set.
var newBalancerName string
if cc.sc != nil && cc.sc.lbConfig != nil {
newBalancerName = cc.sc.lbConfig.name
Expand All @@ -1023,7 +962,7 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel
newBalancerName = PickFirstBalancerName
}
}
cc.switchBalancer(newBalancerName)
cc.balancerWrapper.switchTo(newBalancerName)
}

func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) {
Expand Down Expand Up @@ -1074,7 +1013,6 @@ func (cc *ClientConn) Close() error {
rWrapper := cc.resolverWrapper
cc.resolverWrapper = nil
bWrapper := cc.balancerWrapper
cc.balancerWrapper = nil
cc.mu.Unlock()

cc.blockingpicker.close()
Expand Down
10 changes: 7 additions & 3 deletions clientconn_test.go
Expand Up @@ -845,9 +845,13 @@ func (s) TestBackoffCancel(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create ClientConn: %v", err)
}
<-dialStrCh
cc.Close()
// Should not leak. May need -count 5000 to exercise.
defer cc.Close()

select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial")
case <-dialStrCh:
}
}

// UpdateAddresses should cause the next reconnect to begin from the top of the
Expand Down
8 changes: 8 additions & 0 deletions internal/balancer/gracefulswitch/gracefulswitch.go
Expand Up @@ -178,6 +178,10 @@ func (gsb *Balancer) ResolverError(err error) {
}

// ExitIdle forwards the call to the latest balancer created.
//
// If the latest balancer does not support ExitIdle, the subConns need to be
// re-connected manually. This used to be done in the ClientConn earlier, but
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would delete the sentence about how it used to be. Or at least it doesn't belong here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// is done here now since the ClientConn uses gsb by default.
func (gsb *Balancer) ExitIdle() {
balToUpdate := gsb.latestBalancer()
if balToUpdate == nil {
Expand All @@ -188,6 +192,10 @@ func (gsb *Balancer) ExitIdle() {
// called.
if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
for sc := range balToUpdate.subconns {
sc.Connect()
}
}

Expand Down
16 changes: 15 additions & 1 deletion internal/balancer/stub/stub.go
Expand Up @@ -19,7 +19,12 @@
// Package stub implements a balancer for testing purposes.
package stub

import "google.golang.org/grpc/balancer"
import (
"encoding/json"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/serviceconfig"
)

// BalancerFuncs contains all balancer.Balancer functions with a preceding
// *BalancerData parameter for passing additional instance information. Any
Expand All @@ -28,6 +33,8 @@ type BalancerFuncs struct {
// Init is called after ClientConn and BuildOptions are set in
// BalancerData. It may be used to initialize BalancerData.Data.
Init func(*BalancerData)
// ParseConfig is used for parsing LB configs, if specified.
ParseConfig func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error)

UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error
ResolverError func(*BalancerData, error)
Expand Down Expand Up @@ -97,6 +104,13 @@ func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.

func (bb bb) Name() string { return bb.name }

// ParseConfig hands lbCfg to the ParseConfig function configured on this
// builder's balancer funcs and returns its result. When no such function is
// configured, it returns a nil config and a nil error.
func (bb bb) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
	parse := bb.bf.ParseConfig
	if parse == nil {
		return nil, nil
	}
	return parse(lbCfg)
}

// Register registers a stub balancer builder which will call the provided
// functions. The name used should be unique.
func Register(name string, bf BalancerFuncs) {
Expand Down
109 changes: 97 additions & 12 deletions test/balancer_switching_test.go
Expand Up @@ -269,9 +269,6 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) {
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}

// Push a resolver update containing a non-existent grpclb server address.
// This should not lead to a balancer switch.
Expand Down Expand Up @@ -354,9 +351,6 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) {
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}

// Switch back to "round_robin".
now = time.Now()
Expand Down Expand Up @@ -465,9 +459,6 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing
if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}

// Push a resolver update with a service config using the deprecated
// `loadBalancingPolicy` field pointing to round_robin. The addresses list
Expand All @@ -491,12 +482,12 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing
// Switch to "round_robin" by removing the address of type "grpclb".
now = time.Now()
r.UpdateState(resolver.State{Addresses: addrs})
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
}
if err := checkRoundRobin(ctx, cc, addrs); err != nil {
t.Fatal(err)
}
}

// TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the
Expand Down Expand Up @@ -634,3 +625,97 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) {
case <-done:
}
}

// TestBalancerSwitch_Graceful tests the graceful switching of LB policies. It
// starts off by configuring "round_robin" on the channel and ensures that RPCs
// are successful. Then, it switches to a stub balancer which does not report a
// picker until instructed by the test to do so. At this point, the test
// verifies that RPCs are still successful using the old balancer. Then the test
// asks the new balancer to report a healthy picker and the test verifies that
// the RPCs get routed using the picker reported by the new balancer.
func (s) TestBalancerSwitch_Graceful(t *testing.T) {
	backends, cleanup := startBackendsForBalancerSwitch(t)
	defer cleanup()
	addrs := stubBackendsToResolverAddrs(backends)

	r := manual.NewBuilderWithScheme("whatever")
	cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r))
	if err != nil {
		t.Fatalf("grpc.Dial() failed: %v", err)
	}
	defer cc.Close()

	// Push a resolver update with the service config specifying "round_robin".
	now := time.Now()
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	r.UpdateState(resolver.State{
		Addresses:     addrs,
		ServiceConfig: parseServiceConfig(t, r, rrServiceConfig),
	})
	if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil {
		t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err)
	}
	if err := checkRoundRobin(ctx, cc, addrs); err != nil {
		t.Fatal(err)
	}

	// Register a stub balancer which uses a "pick_first" balancer underneath and
	// signals on a channel when it receives ClientConn updates. But it does not
	// forward the ccUpdate to the underlying "pick_first" balancer until the test
	// asks it to do so. This allows us to test the graceful switch functionality.
	// Until the test asks the stub balancer to forward the ccUpdate, RPCs should
	// get routed to the old balancer. And once the test gives the go ahead, RPCs
	// should get routed to the new balancer.
	ccUpdateCh := make(chan struct{})
	waitToProceed := make(chan struct{})
	stub.Register(t.Name(), stub.BalancerFuncs{
		Init: func(bd *stub.BalancerData) {
			pf := balancer.Get(grpc.PickFirstBalancerName)
			bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions)
		},
		UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
			bal := bd.Data.(balancer.Balancer)
			// Signal the test that the new balancer has seen the update. The
			// channel is closed (not sent on), so this callback is expected to
			// run exactly once during the test.
			close(ccUpdateCh)
			// Hold the update back until the test closes waitToProceed; only
			// then does the underlying "pick_first" get to act on it.
			go func() {
				<-waitToProceed
				bal.UpdateClientConnState(ccs)
			}()
			return nil
		},
		UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) {
			bal := bd.Data.(balancer.Balancer)
			bal.UpdateSubConnState(sc, state)
		},
	})

	// Push a resolver update with the service config specifying our stub
	// balancer. We should see a trace event for this balancer switch. But RPCs
	// should still be routed to the old balancer since our stub balancer does not
	// report a ready picker until we ask it to do so.
	//
	// The config goes through the same parseServiceConfig helper used for the
	// "round_robin" update above, so both updates are parsed the same way.
	// NOTE(review): presumably the helper validates the parse result, which the
	// direct r.CC.ParseServiceConfig call did not — confirm against the helper.
	now = time.Now()
	r.UpdateState(resolver.State{
		Addresses:     addrs,
		ServiceConfig: parseServiceConfig(t, r, fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())),
	})
	select {
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer")
	case <-ccUpdateCh:
	}
	wantTraceDesc := fmt.Sprintf("Channel switches to new LB policy %q", t.Name())
	if err := checkForTraceEvent(ctx, wantTraceDesc, now); err != nil {
		t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantTraceDesc, err)
	}
	if err := checkRoundRobin(ctx, cc, addrs); err != nil {
		t.Fatal(err)
	}

	// Ask our stub balancer to forward the earlier received ccUpdate to the
	// underlying "pick_first" balancer which will result in a healthy picker
	// being reported to the channel. RPCs should start using the new balancer.
	close(waitToProceed)
	if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil {
		t.Fatal(err)
	}
}