From 07737b6fd5cfe52b7a51cc6ca7ed64536e5d021d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 30 Mar 2022 10:12:02 -0700 Subject: [PATCH 01/16] use gracefulswitch.Balancer in ccBalancerWrapper --- balancer_conn_wrappers.go | 332 +++++++++++++----- clientconn.go | 81 +---- clientconn_test.go | 10 +- .../balancer/gracefulswitch/gracefulswitch.go | 8 + internal/balancer/stub/stub.go | 16 +- test/balancer_switching_test.go | 94 +++++ test/resolver_update_test.go | 90 ++--- 7 files changed, 407 insertions(+), 224 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 84934bc0e67..006e32e2e50 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -20,136 +20,171 @@ package grpc import ( "fmt" + "strings" "sync" "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/balancer/gracefulswitch" "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/resolver" ) -// scStateUpdate contains the subConn and the new state it changed to. -type scStateUpdate struct { - sc balancer.SubConn - state connectivity.State - err error -} - -// exitIdle contains no data and is just a signal sent on the updateCh in -// ccBalancerWrapper to instruct the balancer to exit idle. -type exitIdle struct{} - -// ccBalancerWrapper is a wrapper on top of cc for balancers. +// ccBalancerWrapper is a wrapper on top of cc for balancers. It ensures that +// method invocations on the underlying balancer happen synchronously and in the +// same order in which they were received from grpc. +// +// It uses the gracefulswitch.Balancer internally to ensure that balancer +// switches happen in a graceful manner. +// // It implements balancer.ClientConn interface. type ccBalancerWrapper struct { - cc *ClientConn - balancerMu sync.Mutex // synchronizes calls to the balancer - balancer balancer.Balancer - hasExitIdle bool - updateCh *buffer.Unbounded - closed *grpcsync.Event - done *grpcsync.Event + cc *ClientConn + + // balancerMu protects access to the following fields. Any calls on the + // underlying balancer must be made with the mutex held. This ensures that we + // never call the underlying balancer methods concurrently. + balancerMu sync.Mutex + balancer *gracefulswitch.Balancer + curBalancerName string + + updateCh *buffer.Unbounded // Updates written on this channel are processed by watcher(). + resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here. + closed *grpcsync.Event // Indicates if close has been called. + done *grpcsync.Event // Indicates if close has completed its work. mu sync.Mutex subConns map[*acBalancerWrapper]struct{} } -func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) *ccBalancerWrapper { +// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer +// is not created until the switchTo() method is invoked. +func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { ccb := &ccBalancerWrapper{ cc: cc, updateCh: buffer.NewUnbounded(), + resultCh: buffer.NewUnbounded(), closed: grpcsync.NewEvent(), done: grpcsync.NewEvent(), subConns: make(map[*acBalancerWrapper]struct{}), } go ccb.watcher() - ccb.balancer = b.Build(ccb, bopts) - _, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler) + ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) return ccb } -// watcher balancer functions sequentially, so the balancer can be implemented -// lock-free. +// updateType indicates the type of update pushed to the watcher goroutine. +type updateType int + +const ( + updateTypeClientConnState updateType = iota // clientConn state change from grpc + updateTypeSubConnState // subConn state change from grpc + updateTypeExitIdle // exitIdle from grpc + updateTypeResolverError // resolver error from grpc + updateTypeSwitchTo // balancer switch update from grpc + updateTypeSubConn // removeSubConn from the balancer +) + +// watcherUpdate wraps the actual update to be passed to the watcher goroutine +// with a type indicating the kind of update being wrapped. +type watcherUpdate struct { + typ updateType + update interface{} +} + +// scStateUpdate contains the subConn and the new state it changed to. +type scStateUpdate struct { + sc balancer.SubConn + state connectivity.State + err error +} + +// watcher is a long-running goroutine which reads updates from a channel and +// invokes corresponding methods on the underlying balancer. It ensures that +// these methods are invoked in a synchronous fashion. It also ensures that +// these methods are invoked in the order in which the updates were received. func (ccb *ccBalancerWrapper) watcher() { for { select { - case t := <-ccb.updateCh.Get(): + case u := <-ccb.updateCh.Get(): ccb.updateCh.Load() if ccb.closed.HasFired() { break } - switch u := t.(type) { - case *scStateUpdate: - ccb.balancerMu.Lock() - ccb.balancer.UpdateSubConnState(u.sc, balancer.SubConnState{ConnectivityState: u.state, ConnectionError: u.err}) - ccb.balancerMu.Unlock() - case *acBalancerWrapper: - ccb.mu.Lock() - if ccb.subConns != nil { - delete(ccb.subConns, u) - ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain) - } - ccb.mu.Unlock() - case exitIdle: - if ccb.cc.GetState() == connectivity.Idle { - if ei, ok := ccb.balancer.(balancer.ExitIdler); ok { - // We already checked that the balancer implements - // ExitIdle before pushing the event to updateCh, but - // check conditionally again as defensive programming. - ccb.balancerMu.Lock() - ei.ExitIdle() - ccb.balancerMu.Unlock() - } - } + update := u.(watcherUpdate) + switch update.typ { + case updateTypeClientConnState: + ccb.handleClientConnStateChange(update.update.(*balancer.ClientConnState)) + case updateTypeSubConnState: + ccb.handleSubConnStateChange(update.update.(*scStateUpdate)) + case updateTypeExitIdle: + ccb.handleExitIdle() + case updateTypeResolverError: + ccb.handleResolverError(update.update.(error)) + case updateTypeSwitchTo: + ccb.handleSwitchTo(update.update.(string)) + case updateTypeSubConn: + ccb.handleRemoveSubConn(update.update.(*acBalancerWrapper)) default: - logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t) + logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update) } case <-ccb.closed.Done(): } if ccb.closed.HasFired() { - ccb.balancerMu.Lock() - ccb.balancer.Close() - ccb.balancerMu.Unlock() - ccb.mu.Lock() - scs := ccb.subConns - ccb.subConns = nil - ccb.mu.Unlock() - ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) - ccb.done.Fire() - // Fire done before removing the addr conns. We can safely unblock - // ccb.close and allow the removeAddrConns to happen - // asynchronously. - for acbw := range scs { - ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) - } + ccb.handleClose() return } } } -func (ccb *ccBalancerWrapper) close() { +// updateClientConnState is invoked by grpc to push a ClientConnState update to +// the underlying balancer. +// +// Unlike other methods invoked by grpc to push updates to the underlying +// balancer, this method cannot simply push the update onto the update channel +// and return. It needs to return the error returned by the underlying balancer +// back to grpc which propagates that to the resolver. +func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { if ccb == nil { - return + return nil } - ccb.closed.Fire() - <-ccb.done.Done() + ccb.updateCh.Put(watcherUpdate{typ: updateTypeClientConnState, update: ccs}) + res := <-ccb.resultCh.Get() + ccb.resultCh.Load() + // If the returned error is nil, attempting to type assert to error leads to + // panic. So, this needs to handled separately. + if res == nil { + return nil + } + return res.(error) } -func (ccb *ccBalancerWrapper) exitIdle() bool { - if ccb == nil { - return true - } - if !ccb.hasExitIdle { - return false +// handleClientConnStateChange handles a ClientConnState update from the update +// channel and invokes the appropriate method on the underlying balancer. +func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) { + ccb.balancerMu.Lock() + defer ccb.balancerMu.Unlock() + + if ccb.curBalancerName != grpclbName { + // Filter any grpclb addresses since we don't have the grpclb balancer. + var addrs []resolver.Address + for _, addr := range ccs.ResolverState.Addresses { + if addr.Type == resolver.GRPCLB { + continue + } + addrs = append(addrs, addr) + } + ccs.ResolverState.Addresses = addrs } - ccb.updateCh.Put(exitIdle{}) - return true + ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs)) } -func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { +// updateSubConnState is invoked by grpc to push a subConn state update to the +// underlying balancer. +func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) { // When updating addresses for a SubConn, if the address in use is not in // the new addresses, the old ac will be tearDown() and a new ac will be // created. tearDown() generates a state change with Shutdown state, we @@ -160,35 +195,133 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s co if sc == nil { return } - ccb.updateCh.Put(&scStateUpdate{ + ccb.updateCh.Put(watcherUpdate{typ: updateTypeSubConnState, update: &scStateUpdate{ sc: sc, state: s, err: err, - }) + }}) } -// updateClientConnState forwards the clientConn update to the wrapped balancer -// synchronously. -// -// Other calls from the channel like exitIdle() and handleSubConnStateChange() -// are handled asynchronously by pushing the update onto a channel, which is -// picked up by the watcher() goroutine and forwarded to the wrapped balancer. -// That approach cannot be taken here because the corresponding API on the -// balancer returns an error which needs to be sent back to the channel to be -// forward to the resolver. -func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { +// handleSubConnStateChange handles a SubConnState update from the update +// channel and invokes the appropriate method on the underlying balancer. +func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) { ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() - return ccb.balancer.UpdateClientConnState(*ccs) + ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err}) + ccb.balancerMu.Unlock() +} + +func (ccb *ccBalancerWrapper) exitIdle() { + if ccb == nil || ccb.cc.GetState() != connectivity.Idle { + return + } + ccb.updateCh.Put(watcherUpdate{typ: updateTypeExitIdle}) +} + +func (ccb *ccBalancerWrapper) handleExitIdle() { + ccb.balancerMu.Lock() + ccb.balancer.ExitIdle() + ccb.balancerMu.Unlock() } func (ccb *ccBalancerWrapper) resolverError(err error) { if ccb == nil { return } + ccb.updateCh.Put(watcherUpdate{typ: updateTypeResolverError, update: err}) +} + +func (ccb *ccBalancerWrapper) handleResolverError(err error) { ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() ccb.balancer.ResolverError(err) + ccb.balancerMu.Unlock() +} + +// switchTo is invoked by grpc to instruct the balancer wrapper to switch to the +// LB policy identified by name. +// +// ClientConn calls newCCBalancerWrapper() at creation time. Upon receipt of the +// first good update from the name resolver, it determines the LB policy to use +// and invokes the switchTo() method. Upon receipt of every subsequent update +// from the name resolver, it invokes this method. +// +// the ccBalancerWrapper keeps track of the current LB policy name, and skips +// the graceful balancer switching process if the name does not change. +func (ccb *ccBalancerWrapper) switchTo(name string) { + if ccb == nil { + return + } + ccb.updateCh.Put(watcherUpdate{typ: updateTypeSwitchTo, update: name}) +} + +// handleSwitchTo handles a balancer switch update from the update channel. It +// calls the SwitchTo() method on the gracefulswitch.Balancer with a +// balancer.Builder corresponding to name. If no balancer.Builder is registered +// for the given name, it uses the default LB policy which is "pick_first". +func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { + ccb.balancerMu.Lock() + defer ccb.balancerMu.Unlock() + + if strings.EqualFold(ccb.curBalancerName, name) { + return + } + + channelz.Infof(logger, ccb.cc.channelzID, "ClientConn switching balancer to %q", name) + builder := balancer.Get(name) + if builder == nil { + channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName) + channelz.Infof(logger, ccb.cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name) + builder = newPickfirstBuilder() + } else { + channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) + } + + if err := ccb.balancer.SwitchTo(builder); err != nil { + channelz.Errorf(logger, ccb.cc.channelzID, "Channel failed to build new LB policy %q: %v", name, err) + return + } + ccb.curBalancerName = builder.Name() +} + +// handleRemoveSucConn handles a request from the underlying balancer to remove +// a subConn. +// +// See comments in RemoveSubConn() for more details. +func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) { + ccb.mu.Lock() + if ccb.subConns != nil { + delete(ccb.subConns, acbw) + ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) + } + ccb.mu.Unlock() +} + +func (ccb *ccBalancerWrapper) close() { + if ccb == nil { + return + } + ccb.closed.Fire() + <-ccb.done.Done() +} + +func (ccb *ccBalancerWrapper) handleClose() { + ccb.balancerMu.Lock() + ccb.balancer.Close() + ccb.balancerMu.Unlock() + + ccb.mu.Lock() + scs := ccb.subConns + ccb.subConns = nil + ccb.mu.Unlock() + + ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) + ccb.done.Fire() + // Fire done before removing the addr conns. We can safely unblock + // ccb.close and allow the removeAddrConns to happen + // asynchronously. + for acbw := range scs { + ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) + } + return } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { @@ -214,10 +347,17 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer } func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { - // The RemoveSubConn() is handled in the run() goroutine, to avoid deadlock - // during switchBalancer() if the old balancer calls RemoveSubConn() in its - // Close(). - ccb.updateCh.Put(sc) + // Before we switched the ccBalancerWrapper to use gracefulswitch.Balancer, it + // was required to handle the RemoveSubConn() method asynchronously by pushing + // the update onto the update channel. This was done to avoid a deadlock as + // switchBalancer() was holding cc.mu when calling Close() on the old + // balancer, which would in turn call RemoveSubConn(). + // + // With the use of gracefulswitch.Balancer in ccBalancerWrapper, handling this + // asynchronously is probably not required anymore since the switchTo() method + // handles the balancer switch by pushing the update onto the channel. + // TODO(easwars): Handle this inline. + ccb.updateCh.Put(watcherUpdate{typ: updateTypeSubConn, update: sc}) } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { diff --git a/clientconn.go b/clientconn.go index 86275dca4de..f9896dfa2b5 100644 --- a/clientconn.go +++ b/clientconn.go @@ -278,7 +278,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * if creds := cc.dopts.copts.TransportCredentials; creds != nil { credsClone = creds.Clone() } - cc.balancerBuildOpts = balancer.BuildOptions{ + cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ DialCreds: credsClone, CredsBundle: cc.dopts.copts.CredsBundle, Dialer: cc.dopts.copts.Dialer, @@ -286,7 +286,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * CustomUserAgent: cc.dopts.copts.UserAgent, ChannelzParentID: cc.channelzID, Target: cc.parsedTarget, - } + }) // Build the resolver. rWrapper, err := newCCResolverWrapper(cc, resolverBuilder) @@ -465,12 +465,12 @@ type ClientConn struct { cancel context.CancelFunc // Cancelled on close. // The following are initialized at dial time, and are read-only after that. - target string // User's dial target. - parsedTarget resolver.Target // See parseTargetAndFindResolver(). - authority string // See determineAuthority(). - dopts dialOptions // Default and user specified dial options. - balancerBuildOpts balancer.BuildOptions // TODO: delete once we move to the gracefulswitch balancer. - channelzID *channelz.Identifier // Channelz identifier for the channel. + target string // User's dial target. + parsedTarget resolver.Target // See parseTargetAndFindResolver(). + authority string // See determineAuthority(). + dopts dialOptions // Default and user specified dial options. + channelzID *channelz.Identifier // Channelz identifier for the channel. + balancerWrapper *ccBalancerWrapper // Uses gracefulswitch.balancer underneath. // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. @@ -491,8 +491,6 @@ type ClientConn struct { sc *ServiceConfig // Latest service config received from the resolver. conns map[*addrConn]struct{} // Set to nil on close. mkp keepalive.ClientParameters // May be updated upon receipt of a GoAway. - curBalancerName string // TODO: delete as part of https://github.com/grpc/grpc-go/issues/5229. - balancerWrapper *ccBalancerWrapper // TODO: Use gracefulswitch balancer to be able to initialize this once and never rewrite. lceMu sync.Mutex // protects lastConnectionError lastConnectionError error @@ -539,12 +537,7 @@ func (cc *ClientConn) GetState() connectivity.State { func (cc *ClientConn) Connect() { cc.mu.Lock() defer cc.mu.Unlock() - if cc.balancerWrapper.exitIdle() { - return - } - for ac := range cc.conns { - go ac.connect() - } + cc.balancerWrapper.exitIdle() } func (cc *ClientConn) scWatcher() { @@ -666,21 +659,9 @@ func (cc *ClientConn) updateResolverState(s resolver.State, err error) error { if cc.sc != nil && cc.sc.lbConfig != nil { balCfg = cc.sc.lbConfig.cfg } - - cbn := cc.curBalancerName bw := cc.balancerWrapper cc.mu.Unlock() - if cbn != grpclbName { - // Filter any grpclb addresses since we don't have the grpclb balancer. - var addrs []resolver.Address - for _, addr := range s.Addresses { - if addr.Type == resolver.GRPCLB { - continue - } - addrs = append(addrs, addr) - } - s.Addresses = addrs - } + uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg}) if ret == nil { ret = uccsErr // prefer ErrBadResolver state since any other error is @@ -709,49 +690,13 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { cc.csMgr.updateState(connectivity.TransientFailure) } -// switchBalancer starts the switching from current balancer to the balancer -// with the given name. -// -// It will NOT send the current address list to the new balancer. If needed, -// caller of this function should send address list to the new balancer after -// this function returns. -// -// Caller must hold cc.mu. -func (cc *ClientConn) switchBalancer(name string) { - if strings.EqualFold(cc.curBalancerName, name) { - return - } - - channelz.Infof(logger, cc.channelzID, "ClientConn switching balancer to %q", name) - // Don't hold cc.mu while closing the balancers. The balancers may call - // methods that require cc.mu (e.g. cc.NewSubConn()). Holding the mutex - // would cause a deadlock in that case. - cc.mu.Unlock() - cc.balancerWrapper.close() - cc.mu.Lock() - - builder := balancer.Get(name) - if builder == nil { - channelz.Warningf(logger, cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName) - channelz.Infof(logger, cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name) - builder = newPickfirstBuilder() - } else { - channelz.Infof(logger, cc.channelzID, "Channel switches to new LB policy %q", name) - } - - cc.curBalancerName = builder.Name() - cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) -} - func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { cc.mu.Lock() if cc.conns == nil { cc.mu.Unlock() return } - // TODO(bar switching) send updates to all balancer wrappers when balancer - // gracefully switching is supported. - cc.balancerWrapper.handleSubConnStateChange(sc, s, err) + cc.balancerWrapper.updateSubConnState(sc, s, err) cc.mu.Unlock() } @@ -1002,8 +947,6 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel cc.retryThrottler.Store((*retryThrottler)(nil)) } - // Only look at balancer types and switch balancer if balancer dial - // option is not set. var newBalancerName string if cc.sc != nil && cc.sc.lbConfig != nil { newBalancerName = cc.sc.lbConfig.name @@ -1023,7 +966,7 @@ func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSel newBalancerName = PickFirstBalancerName } } - cc.switchBalancer(newBalancerName) + cc.balancerWrapper.switchTo(newBalancerName) } func (cc *ClientConn) resolveNow(o resolver.ResolveNowOptions) { diff --git a/clientconn_test.go b/clientconn_test.go index 353d8fb325d..21f70c8f251 100644 --- a/clientconn_test.go +++ b/clientconn_test.go @@ -845,9 +845,13 @@ func (s) TestBackoffCancel(t *testing.T) { if err != nil { t.Fatalf("Failed to create ClientConn: %v", err) } - <-dialStrCh - cc.Close() - // Should not leak. May need -count 5000 to exercise. + defer cc.Close() + + select { + case <-time.After(defaultTestTimeout): + t.Fatal("Timeout when waiting for custom dialer to be invoked during Dial") + case <-dialStrCh: + } } // UpdateAddresses should cause the next reconnect to begin from the top of the diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index af6cc46189f..bc39d310350 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -178,6 +178,10 @@ func (gsb *Balancer) ResolverError(err error) { } // ExitIdle forwards the call to the latest balancer created. +// +// If the latest balancer does not support ExitIdle, the subConns need to be +// re-connected manually. This used to be done in the ClientConn earlier, but +// is done here now since the ClientConn uses gsb by default. func (gsb *Balancer) ExitIdle() { balToUpdate := gsb.latestBalancer() if balToUpdate == nil { @@ -188,6 +192,10 @@ func (gsb *Balancer) ExitIdle() { // called. if ei, ok := balToUpdate.Balancer.(balancer.ExitIdler); ok { ei.ExitIdle() + return + } + for sc := range balToUpdate.subconns { + sc.Connect() } } diff --git a/internal/balancer/stub/stub.go b/internal/balancer/stub/stub.go index 950eaaa0278..9fe6d93c9db 100644 --- a/internal/balancer/stub/stub.go +++ b/internal/balancer/stub/stub.go @@ -19,7 +19,12 @@ // Package stub implements a balancer for testing purposes. package stub -import "google.golang.org/grpc/balancer" +import ( + "encoding/json" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/serviceconfig" +) // BalancerFuncs contains all balancer.Balancer functions with a preceding // *BalancerData parameter for passing additional instance information. Any @@ -28,6 +33,8 @@ type BalancerFuncs struct { // Init is called after ClientConn and BuildOptions are set in // BalancerData. It may be used to initialize BalancerData.Data. Init func(*BalancerData) + // ParseConfig is used for parsing LB configs, if specified. + ParseConfig func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) UpdateClientConnState func(*BalancerData, balancer.ClientConnState) error ResolverError func(*BalancerData, error) @@ -97,6 +104,13 @@ func (bb bb) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer. func (bb bb) Name() string { return bb.name } +func (bb bb) ParseConfig(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + if bb.bf.ParseConfig != nil { + return bb.bf.ParseConfig(lbCfg) + } + return nil, nil +} + // Register registers a stub balancer builder which will call the provided // functions. The name used should be unique. func Register(name string, bf BalancerFuncs) { diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index 2453738bfa4..b5d793b72e4 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -634,3 +634,97 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) { case <-done: } } + +// TestBalancerSwitch_Graceful tests the graceful switching of LB policies. It +// starts off by configuring "round_robin" on the channel and ensures that RPCs +// are successful. Then, it switches to a stub balancer which does not report a +// picker until instructed by the test do to so. At this point, the test +// verifies that RPCs are still successful using the old balancer. Then the test +// asks the new balancer to report a healthy picker and the test verifies that +// the RPCs get routed using the picker reported by the new balancer. +func (s) TestBalancerSwitch_Graceful(t *testing.T) { + backends, cleanup := startBackendsForBalancerSwitch(t) + defer cleanup() + addrs := stubBackendsToResolverAddrs(backends) + + r := manual.NewBuilderWithScheme("whatever") + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r)) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + + // Push a resolver update with the service config specifying "round_robin". + now := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), + }) + if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Register a stub balancer which uses a "pick_first" balancer underneath and + // signals on a channel when it receives ClientConn updates. But it does not + // forward the ccUpdate to the underlying "pick_first" balancer until the test + // asks it to do so. This allows us to test the graceful switch functionality. + // Until the test asks the stub balancer to forward the ccUpdate, RPCs should + // get routed to the old balancer. And once the test gives the go ahead, RPCs + // should get routed to the new balancer. + ccUpdateCh := make(chan struct{}) + waitToProceed := make(chan struct{}) + stub.Register(t.Name(), stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + pf := balancer.Get(grpc.PickFirstBalancerName) + bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + bal := bd.Data.(balancer.Balancer) + close(ccUpdateCh) + go func() { + <-waitToProceed + bal.UpdateClientConnState(ccs) + }() + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + bal := bd.Data.(balancer.Balancer) + bal.UpdateSubConnState(sc, state) + }, + }) + + // Push a resolver update with the service config specifying our stub + // balancer. We should see a trace event for this balancer switch. But RPCs + // should still be routed to the old balancer since our stub balancer does not + // report a ready picker until we ask it to do so. + now = time.Now() + r.UpdateState(resolver.State{ + Addresses: addrs, + ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())), + }) + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer") + case <-ccUpdateCh: + } + wantTraceDesc := fmt.Sprintf("Channel switches to new LB policy %q", t.Name()) + if err := checkForTraceEvent(ctx, wantTraceDesc, now); err != nil { + t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantTraceDesc, err) + } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } + + // Ask our stub balancer to forward the earlier received ccUpdate to the + // underlying "pick_first" balancer which will result in a healthy picker + // being reported to the channel. RPCs should start using the new balancer. + close(waitToProceed) + if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + t.Fatal(err) + } +} diff --git a/test/resolver_update_test.go b/test/resolver_update_test.go index bf7f0d2c2ed..6b568a227aa 100644 --- a/test/resolver_update_test.go +++ b/test/resolver_update_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/internal/testutils" "google.golang.org/grpc/resolver" @@ -127,54 +128,6 @@ func (s) TestResolverUpdate_InvalidServiceConfigAsFirstUpdate(t *testing.T) { } } -// The wrappingBalancer wraps a pick_first balancer and writes to a channel when -// it receives a ClientConn update. This is different to a stub balancer which -// only notifies of updates from grpc, but does not contain a real balancer. -// -// The wrappingBalancer allows us to write tests with a real backend and make -// real RPCs. -type wrappingBalancerBuilder struct { - name string - updateCh *testutils.Channel -} - -func (bb wrappingBalancerBuilder) Name() string { return bb.name } - -func (bb wrappingBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - pf := balancer.Get(grpc.PickFirstBalancerName) - b := &wrappingBalancer{ - Balancer: pf.Build(cc, opts), - updateCh: bb.updateCh, - } - return b -} - -func (bb wrappingBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { - cfg := &wrappingBalancerConfig{} - if err := json.Unmarshal(c, cfg); err != nil { - return nil, err - } - return cfg, nil -} - -type wrappingBalancer struct { - balancer.Balancer - updateCh *testutils.Channel -} - -func (b *wrappingBalancer) UpdateClientConnState(c balancer.ClientConnState) error { - if _, ok := c.BalancerConfig.(*wrappingBalancerConfig); !ok { - return fmt.Errorf("received balancer config of unsupported type %T", c.BalancerConfig) - } - b.updateCh.Send(c) - return b.Balancer.UpdateClientConnState(c) -} - -type wrappingBalancerConfig struct { - serviceconfig.LoadBalancingConfig - Config string `json:"config,omitempty"` -} - func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error { if got, want := got.ResolverState.Addresses, want.ResolverState.Addresses; !cmp.Equal(got, want) { return fmt.Errorf("update got unexpected addresses: %v, want %v", got, want) @@ -193,11 +146,38 @@ func verifyClientConnStateUpdate(got, want balancer.ClientConnState) error { // having sent a good update. This should result in the ClientConn discarding // the new invalid service config, and continuing to use the old good config. func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) { - // Register a wrapper balancer to get notified of ClientConn updates. - ccsCh := testutils.NewChannel() - balancer.Register(wrappingBalancerBuilder{ - name: t.Name(), - updateCh: ccsCh, + type wrappingBalancerConfig struct { + serviceconfig.LoadBalancingConfig + Config string `json:"config,omitempty"` + } + + // Register a stub balancer which uses a "pick_first" balancer underneath and + // signals on a channel when it receives ClientConn updates. + ccUpdateCh := testutils.NewChannel() + stub.Register(t.Name(), stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + pf := balancer.Get(grpc.PickFirstBalancerName) + bd.Data = pf.Build(bd.ClientConn, bd.BuildOptions) + }, + ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + cfg := &wrappingBalancerConfig{} + if err := json.Unmarshal(lbCfg, cfg); err != nil { + return nil, err + } + return cfg, nil + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + if _, ok := ccs.BalancerConfig.(*wrappingBalancerConfig); !ok { + return fmt.Errorf("received balancer config of unsupported type %T", ccs.BalancerConfig) + } + bal := bd.Data.(balancer.Balancer) + ccUpdateCh.Send(ccs) + return bal.UpdateClientConnState(ccs) + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + bal := bd.Data.(balancer.Balancer) + bal.UpdateSubConnState(sc, state) + }, }) // Start a backend exposing the test service. @@ -242,7 +222,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) { }, BalancerConfig: &wrappingBalancerConfig{Config: lbCfg}, } - ccs, err := ccsCh.Receive(ctx) + ccs, err := ccUpdateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for ClientConnState update from grpc") } @@ -263,7 +243,7 @@ func (s) TestResolverUpdate_InvalidServiceConfigAfterGoodUpdate(t *testing.T) { badSC := r.CC.ParseServiceConfig("bad json service config") wantCCS.ResolverState.ServiceConfig = badSC r.UpdateState(resolver.State{Addresses: addrs, ServiceConfig: badSC}) - ccs, err = ccsCh.Receive(ctx) + ccs, err = ccUpdateCh.Receive(ctx) if err != nil { t.Fatalf("Timeout when waiting for ClientConnState update from grpc") } From b745f67dc920716f62217b18fbe1d755ccb88d64 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 12:21:43 -0700 Subject: [PATCH 02/16] first round of review comments --- balancer_conn_wrappers.go | 49 ++++++++++++++------------------------- clientconn.go | 5 ---- test/roundrobin_test.go | 5 ++-- 3 files changed, 20 insertions(+), 39 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 006e32e2e50..d3fec9d8d22 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -148,9 +148,6 @@ func (ccb *ccBalancerWrapper) watcher() { // and return. It needs to return the error returned by the underlying balancer // back to grpc which propagates that to the resolver. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { - if ccb == nil { - return nil - } ccb.updateCh.Put(watcherUpdate{typ: updateTypeClientConnState, update: ccs}) res := <-ccb.resultCh.Get() ccb.resultCh.Load() @@ -164,6 +161,10 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat // handleClientConnStateChange handles a ClientConnState update from the update // channel and invokes the appropriate method on the underlying balancer. +// +// If the addresses specified in the update contain addresses of type "grpclb" +// and the selected LB policy is not "grpclb", these addresses will be filtered +// out and ccs will be modified with the updated address list. func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) { ccb.balancerMu.Lock() defer ccb.balancerMu.Unlock() @@ -206,34 +207,31 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti // channel and invokes the appropriate method on the underlying balancer. func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) { ccb.balancerMu.Lock() + defer ccb.balancerMu.Unlock() ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err}) - ccb.balancerMu.Unlock() } func (ccb *ccBalancerWrapper) exitIdle() { - if ccb == nil || ccb.cc.GetState() != connectivity.Idle { - return - } ccb.updateCh.Put(watcherUpdate{typ: updateTypeExitIdle}) } func (ccb *ccBalancerWrapper) handleExitIdle() { ccb.balancerMu.Lock() + defer ccb.balancerMu.Unlock() + if ccb.cc.GetState() != connectivity.Idle { + return + } ccb.balancer.ExitIdle() - ccb.balancerMu.Unlock() } func (ccb *ccBalancerWrapper) resolverError(err error) { - if ccb == nil { - return - } ccb.updateCh.Put(watcherUpdate{typ: updateTypeResolverError, update: err}) } func (ccb *ccBalancerWrapper) handleResolverError(err error) { ccb.balancerMu.Lock() + defer ccb.balancerMu.Unlock() ccb.balancer.ResolverError(err) - ccb.balancerMu.Unlock() } // switchTo is invoked by grpc to instruct the balancer wrapper to switch to the @@ -247,9 +245,6 @@ func (ccb *ccBalancerWrapper) handleResolverError(err error) { // the ccBalancerWrapper keeps track of the current LB policy name, and skips // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { - if ccb == nil { - return - } ccb.updateCh.Put(watcherUpdate{typ: updateTypeSwitchTo, update: name}) } @@ -261,15 +256,18 @@ func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { ccb.balancerMu.Lock() defer ccb.balancerMu.Unlock() + // TODO: Ensure other languages using case-insensitive balancer registries. if strings.EqualFold(ccb.curBalancerName, name) { return } - channelz.Infof(logger, ccb.cc.channelzID, "ClientConn switching balancer to %q", name) + // TODO: Ensure that name is a registered LB policy when we get here. + // We currently only validate the `loadBalancingConfig` field. We need to do + // the same for the `loadBalancingPolicy` field and reject the service config + // if the specified policy is not registered. builder := balancer.Get(name) if builder == nil { - channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName) - channelz.Infof(logger, ccb.cc.channelzID, "failed to get balancer builder for: %v, using pick_first instead", name) + channelz.Warningf(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q, since the specified LB policy %q was not registered", PickFirstBalancerName, name) builder = newPickfirstBuilder() } else { channelz.Infof(logger, ccb.cc.channelzID, "Channel switches to new LB policy %q", name) @@ -288,17 +286,14 @@ func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { // See comments in RemoveSubConn() for more details. func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) { ccb.mu.Lock() + defer ccb.mu.Unlock() if ccb.subConns != nil { delete(ccb.subConns, acbw) ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } - ccb.mu.Unlock() } func (ccb *ccBalancerWrapper) close() { - if ccb == nil { - return - } ccb.closed.Fire() <-ccb.done.Done() } @@ -309,19 +304,9 @@ func (ccb *ccBalancerWrapper) handleClose() { ccb.balancerMu.Unlock() ccb.mu.Lock() - scs := ccb.subConns ccb.subConns = nil ccb.mu.Unlock() - - ccb.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: nil}) ccb.done.Fire() - // Fire done before removing the addr conns. We can safely unblock - // ccb.close and allow the removeAddrConns to happen - // asynchronously. - for acbw := range scs { - ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) - } - return } func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { diff --git a/clientconn.go b/clientconn.go index f9896dfa2b5..9473e3fb172 100644 --- a/clientconn.go +++ b/clientconn.go @@ -692,10 +692,6 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { cc.mu.Lock() - if cc.conns == nil { - cc.mu.Unlock() - return - } cc.balancerWrapper.updateSubConnState(sc, s, err) cc.mu.Unlock() } @@ -1017,7 +1013,6 @@ func (cc *ClientConn) Close() error { rWrapper := cc.resolverWrapper cc.resolverWrapper = nil bWrapper := cc.balancerWrapper - cc.balancerWrapper = nil cc.mu.Unlock() cc.blockingpicker.close() diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go index 7f16aa2cb3c..3724743ec7c 100644 --- a/test/roundrobin_test.go +++ b/test/roundrobin_test.go @@ -45,7 +45,6 @@ const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}` func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver.Address) error { client := testgrpc.NewTestServiceClient(cc) - var peer peer.Peer // Make sure connections to all backends are up. backendCount := len(addrs) for i := 0; i < backendCount; i++ { @@ -54,6 +53,7 @@ func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver. if ctx.Err() != nil { return fmt.Errorf("timeout waiting for connection to %q to be up", addrs[i].Addr) } + var peer peer.Peer if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { // Some tests remove backends and check if round robin is happening // across the remaining backends. In such cases, RPCs can initially fail @@ -69,10 +69,11 @@ func checkRoundRobin(ctx context.Context, cc *grpc.ClientConn, addrs []resolver. } // Make sure RPCs are sent to all backends. for i := 0; i < 3*backendCount; i++ { + var peer peer.Peer if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer)); err != nil { return fmt.Errorf("EmptyCall() = %v, want ", err) } - if gotPeer, wantPeer := addrs[i%backendCount].Addr, peer.Addr.String(); gotPeer != wantPeer { + if gotPeer, wantPeer := peer.Addr.String(), addrs[i%backendCount].Addr; gotPeer != wantPeer { return fmt.Errorf("rpc sent to peer %q, want peer %q", gotPeer, wantPeer) } } From d6fc583e64a0423eb7b7f683a11455178887e0de Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 14:56:12 -0700 Subject: [PATCH 03/16] dont check for round_robin after checking for grpclb trace event --- test/balancer_switching_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index b5d793b72e4..331193323dc 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -269,9 +269,6 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) { if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { - t.Fatal(err) - } // Push a resolver update containing a non-existent grpclb server address. // This should not lead to a balancer switch. @@ -354,9 +351,6 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) { if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { - t.Fatal(err) - } // Switch back to "round_robin". now = time.Now() @@ -465,9 +459,6 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { - t.Fatal(err) - } // Push a resolver update with a service config using the deprecated // `loadBalancingPolicy` field pointing to round_robin. The addresses list From 9af61d99b041c6790ff0d65894d152b3af2c9a2f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 15:14:10 -0700 Subject: [PATCH 04/16] another test fix --- test/balancer_switching_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index 331193323dc..d1c0bcdf517 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -482,12 +482,12 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing // Switch to "round_robin" by removing the address of type "grpclb". now = time.Now() r.UpdateState(resolver.State{Addresses: addrs}) - if err := checkRoundRobin(ctx, cc, addrs); err != nil { - t.Fatal(err) - } if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) } + if err := checkRoundRobin(ctx, cc, addrs); err != nil { + t.Fatal(err) + } } // TestBalancerSwitch_LoadBalancingConfigTrumps verifies that the From aaa96fe61764a25c7e6bbfb631c5e3d1f23d6a33 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 15:53:16 -0700 Subject: [PATCH 05/16] don't keep of subConns in the balancer wrapper anymore --- balancer_conn_wrappers.go | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index d3fec9d8d22..46f3d8fa7a4 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -54,9 +54,6 @@ type ccBalancerWrapper struct { resultCh *buffer.Unbounded // Results of calls to UpdateClientConnState() are pushed here. closed *grpcsync.Event // Indicates if close has been called. done *grpcsync.Event // Indicates if close has completed its work. - - mu sync.Mutex - subConns map[*acBalancerWrapper]struct{} } // newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer @@ -68,7 +65,6 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc resultCh: buffer.NewUnbounded(), closed: grpcsync.NewEvent(), done: grpcsync.NewEvent(), - subConns: make(map[*acBalancerWrapper]struct{}), } go ccb.watcher() ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) @@ -285,12 +281,7 @@ func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { // // See comments in RemoveSubConn() for more details. func (ccb *ccBalancerWrapper) handleRemoveSubConn(acbw *acBalancerWrapper) { - ccb.mu.Lock() - defer ccb.mu.Unlock() - if ccb.subConns != nil { - delete(ccb.subConns, acbw) - ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) - } + ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) } func (ccb *ccBalancerWrapper) close() { @@ -302,10 +293,6 @@ func (ccb *ccBalancerWrapper) handleClose() { ccb.balancerMu.Lock() ccb.balancer.Close() ccb.balancerMu.Unlock() - - ccb.mu.Lock() - ccb.subConns = nil - ccb.mu.Unlock() ccb.done.Fire() } @@ -313,9 +300,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer if len(addrs) <= 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } - ccb.mu.Lock() - defer ccb.mu.Unlock() - if ccb.subConns == nil { + if ccb.done.HasFired() { return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") } ac, err := ccb.cc.newAddrConn(addrs, opts) @@ -327,7 +312,6 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer acbw.ac.mu.Lock() ac.acbw = acbw acbw.ac.mu.Unlock() - ccb.subConns[acbw] = struct{}{} return acbw, nil } @@ -354,9 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { - ccb.mu.Lock() - defer ccb.mu.Unlock() - if ccb.subConns == nil { + if ccb.done.HasFired() { return } // Update picker before updating state. Even though the ordering here does From 9504370cbeae5057e82292e3f010d6b65c52646f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 16:00:49 -0700 Subject: [PATCH 06/16] improve comment on the ccBalancerWrapper type --- balancer_conn_wrappers.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 46f3d8fa7a4..e8597b5651d 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -32,14 +32,19 @@ import ( "google.golang.org/grpc/resolver" ) -// ccBalancerWrapper is a wrapper on top of cc for balancers. It ensures that -// method invocations on the underlying balancer happen synchronously and in the -// same order in which they were received from grpc. +// ccBalancerWrapper sits between the ClientConn and the Balancer. +// +// ccBalancerWrapper implements methods corresponding to the ones on the +// balancer.Balancer interface. The ClientConn is free to call these methods +// concurrently and the ccBalancerWrapper ensures that calls from the ClientConn +// to the Balancer happen synchronously and in order. +// +// ccBalancerWrapper also implements the balancer.ClientConn interface and is +// passed to the Balancer implementations. It invokes unexported methods on the +// ClientConn to handle these calls from the Balancer. // // It uses the gracefulswitch.Balancer internally to ensure that balancer // switches happen in a graceful manner. -// -// It implements balancer.ClientConn interface. type ccBalancerWrapper struct { cc *ClientConn From e963f8020a8480ef25bb2a182718845619b3854d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 16:13:47 -0700 Subject: [PATCH 07/16] exit early while waiting for clientconn update result if close is called --- balancer_conn_wrappers.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index e8597b5651d..0c02411ce04 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -150,8 +150,16 @@ func (ccb *ccBalancerWrapper) watcher() { // back to grpc which propagates that to the resolver. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { ccb.updateCh.Put(watcherUpdate{typ: updateTypeClientConnState, update: ccs}) - res := <-ccb.resultCh.Get() - ccb.resultCh.Load() + + var res interface{} + select { + case res = <-ccb.resultCh.Get(): + ccb.resultCh.Load() + case <-ccb.closed.Done(): + // Return early if the balancer wrapper is closed while we are waiting for + // the underlying balancer to process a ClientConnState update. + return nil + } // If the returned error is nil, attempting to type assert to error leads to // panic. So, this needs to handled separately. if res == nil { From 1409a6051b6b3352d42a6b3287ceb5d340a50e8e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 16:22:39 -0700 Subject: [PATCH 08/16] get rid of balancerMu --- balancer_conn_wrappers.go | 21 +++------------------ 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 0c02411ce04..d9778ff3cd5 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -48,10 +48,9 @@ import ( type ccBalancerWrapper struct { cc *ClientConn - // balancerMu protects access to the following fields. Any calls on the - // underlying balancer must be made with the mutex held. This ensures that we - // never call the underlying balancer methods concurrently. - balancerMu sync.Mutex + // Since these fields are accessed only from handleXxx() methods which are + // synchronized by the watcher goroutine, we do not need a mutex to protect + // these fields. balancer *gracefulswitch.Balancer curBalancerName string @@ -175,9 +174,6 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat // and the selected LB policy is not "grpclb", these addresses will be filtered // out and ccs will be modified with the updated address list. func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) { - ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() - if ccb.curBalancerName != grpclbName { // Filter any grpclb addresses since we don't have the grpclb balancer. var addrs []resolver.Address @@ -215,8 +211,6 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti // handleSubConnStateChange handles a SubConnState update from the update // channel and invokes the appropriate method on the underlying balancer. func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) { - ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err}) } @@ -225,8 +219,6 @@ func (ccb *ccBalancerWrapper) exitIdle() { } func (ccb *ccBalancerWrapper) handleExitIdle() { - ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() if ccb.cc.GetState() != connectivity.Idle { return } @@ -238,8 +230,6 @@ func (ccb *ccBalancerWrapper) resolverError(err error) { } func (ccb *ccBalancerWrapper) handleResolverError(err error) { - ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() ccb.balancer.ResolverError(err) } @@ -262,9 +252,6 @@ func (ccb *ccBalancerWrapper) switchTo(name string) { // balancer.Builder corresponding to name. If no balancer.Builder is registered // for the given name, it uses the default LB policy which is "pick_first". func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { - ccb.balancerMu.Lock() - defer ccb.balancerMu.Unlock() - // TODO: Ensure other languages using case-insensitive balancer registries. if strings.EqualFold(ccb.curBalancerName, name) { return @@ -303,9 +290,7 @@ func (ccb *ccBalancerWrapper) close() { } func (ccb *ccBalancerWrapper) handleClose() { - ccb.balancerMu.Lock() ccb.balancer.Close() - ccb.balancerMu.Unlock() ccb.done.Fire() } From 6f91640878b389715b86cfa7fc051b7b2f6c2af9 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 16:29:01 -0700 Subject: [PATCH 09/16] fix a comment in gsb --- internal/balancer/gracefulswitch/gracefulswitch.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/internal/balancer/gracefulswitch/gracefulswitch.go b/internal/balancer/gracefulswitch/gracefulswitch.go index bc39d310350..7ba8f4d1831 100644 --- a/internal/balancer/gracefulswitch/gracefulswitch.go +++ b/internal/balancer/gracefulswitch/gracefulswitch.go @@ -179,9 +179,8 @@ func (gsb *Balancer) ResolverError(err error) { // ExitIdle forwards the call to the latest balancer created. // -// If the latest balancer does not support ExitIdle, the subConns need to be -// re-connected manually. This used to be done in the ClientConn earlier, but -// is done here now since the ClientConn uses gsb by default. +// If the latest balancer does not support ExitIdle, the subConns are +// re-connected to manually. func (gsb *Balancer) ExitIdle() { balToUpdate := gsb.latestBalancer() if balToUpdate == nil { From a1624bed4d5939a8dadaaa1101ae82c0ff517b9f Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 16:46:32 -0700 Subject: [PATCH 10/16] fix a few tests to not rely on the channelz trace event --- test/balancer_switching_test.go | 125 ++++++++++---------------------- 1 file changed, 39 insertions(+), 86 deletions(-) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index d1c0bcdf517..574a3780157 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -44,20 +44,28 @@ const ( wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"` wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"` wantPickFirstTraceDesc = `Channel switches to new LB policy "pick_first"` + + // This is the number of stub backends set up at the start of each test. + backendCount = 3 ) -// setupBackendsAndFakeGRPCLB sets up the stub server backends and a fake grpclb -// server for tests which exercise balancer switch scenarios involving grpclb. +// setupBackendsAndFakeGRPCLB sets up backendCount number of stub server +// backends and a fake grpclb server for tests which exercise balancer switch +// scenarios involving grpclb. +// +// The fake grpclb server always returns the first of the configured stub +// backends as backend addresses. So, the tests are free to use the other +// backends with other LB policies to verify balancer switching scenarios. +// // Returns a cleanup function to be invoked by the caller. func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegrpclb.Server, func()) { czCleanup := channelz.NewChannelzStorageForTesting() backends, backendsCleanup := startBackendsForBalancerSwitch(t) - rawAddrs := stubBackendsToRawAddrs(backends) lbServer, err := fakegrpclb.NewServer(fakegrpclb.ServerParams{ LoadBalancedServiceName: loadBalancedServiceName, LoadBalancedServicePort: loadBalancedServicePort, - BackendAddresses: rawAddrs, + BackendAddresses: []string{backends[0].Address}, }) if err != nil { t.Fatalf("failed to create fake grpclb server: %v", err) @@ -81,7 +89,6 @@ func setupBackendsAndFakeGRPCLB(t *testing.T) ([]*stubserver.StubServer, *fakegr func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, func()) { t.Helper() - const backendCount = 3 backends := make([]*stubserver.StubServer, backendCount) for i := 0; i < backendCount; i++ { backend := &stubserver.StubServer{ @@ -100,16 +107,6 @@ func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, fun } } -// stubBackendsToRawAddrs converts from a set of stub server backends to raw -// address strings. Useful when pushing addresses to the fake grpclb server. -func stubBackendsToRawAddrs(backends []*stubserver.StubServer) []string { - addrs := make([]string, len(backends)) - for i, backend := range backends { - addrs[i] = backend.Address - } - return addrs -} - // checkForTraceEvent looks for a trace event in the top level channel matching // the given description. Events before since are ignored. Returns nil error if // such an event is found. @@ -193,42 +190,31 @@ func (s) TestBalancerSwitch_grpclbToPickFirst(t *testing.T) { // Push a resolver update with no service config and a single address pointing // to the grpclb server we created above. This will cause the channel to - // switch to the "grpclb" balancer, and will equally distribute RPCs across - // the backends as the fake grpclb server does not support load reporting from - // the clients. - now := time.Now() + // switch to the "grpclb" balancer, which returns a single backend address. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[0:1]); err != nil { t.Fatal(err) } - if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) - } // Push a resolver update containing a non-existent grpclb server address. // This should not lead to a balancer switch. - now = time.Now() const nonExistentServer = "non-existent-grpclb-server-address" r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}}) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer) - if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil { - t.Fatal("channel switched balancers when expected not to") + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + t.Fatal(err) } // Push a resolver update containing no grpclb server address. This should - // lead to the channel using the default LB policy which is pick_first. - now = time.Now() - r.UpdateState(resolver.State{Addresses: addrs}) - if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + // lead to the channel using the default LB policy which is pick_first. The + // list of addresses pushed as part of this update is different from the one + // returned by the "grpclb" balancer. So, we should see RPCs going to the + // newly configured backends, as part of the balancer switch. + r.UpdateState(resolver.State{Addresses: addrs[1:]}) + if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil { t.Fatal(err) } - if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) - } } // TestBalancerSwitch_pickFirstToGRPCLB tests the scenario where the channel @@ -248,47 +234,31 @@ func (s) TestBalancerSwitch_pickFirstToGRPCLB(t *testing.T) { // Push a resolver update containing no grpclb server address. This should // lead to the channel using the default LB policy which is pick_first. - now := time.Now() - r.UpdateState(resolver.State{Addresses: addrs}) + r.UpdateState(resolver.State{Addresses: addrs[1:]}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) - } - if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil { t.Fatal(err) } // Push a resolver update with no service config and a single address pointing // to the grpclb server we created above. This will cause the channel to - // switch to the "grpclb" balancer, and will equally distribute RPCs across - // the backends as the fake grpclb server does not support load reporting from - // the clients. - now = time.Now() + // switch to the "grpclb" balancer, which returns a single backend address. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) - if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + t.Fatal(err) } // Push a resolver update containing a non-existent grpclb server address. // This should not lead to a balancer switch. - now = time.Now() - const nonExistentServer = "non-existent-grpclb-server-address" - r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: nonExistentServer, Type: resolver.GRPCLB}}}) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - wantDesc := fmt.Sprintf("Channel switches to new LB policy %q", nonExistentServer) - if err := checkForTraceEvent(sCtx, wantDesc, now); err == nil { - t.Fatal("channel switched balancers when expected not to") + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: "nonExistentServer", Type: resolver.GRPCLB}}}) + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + t.Fatal(err) } // Switch to "pick_first" again by sending no grpclb server addresses. - now = time.Now() - r.UpdateState(resolver.State{Addresses: addrs}) - if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) - } - if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + r.UpdateState(resolver.State{Addresses: addrs[1:]}) + if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil { t.Fatal(err) } } @@ -324,44 +294,27 @@ func (s) TestBalancerSwitch_RoundRobinToGRPCLB(t *testing.T) { scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`) // Push a resolver update with the service config specifying "round_robin". - now := time.Now() - r.UpdateState(resolver.State{ - Addresses: addrs, - ServiceConfig: scpr, - }) + r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } // Push a resolver update with no service config and a single address pointing // to the grpclb server we created above. This will cause the channel to - // switch to the "grpclb" balancer, and will equally distribute RPCs across - // the backends as the fake grpclb server does not support load reporting from - // the clients. - now = time.Now() + // switch to the "grpclb" balancer, which returns a single backend address. r.UpdateState(resolver.State{ Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}, ServiceConfig: scpr, }) - if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + t.Fatal(err) } // Switch back to "round_robin". - now = time.Now() - r.UpdateState(resolver.State{ - Addresses: addrs, - ServiceConfig: scpr, - }) - if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + r.UpdateState(resolver.State{Addresses: addrs[1:], ServiceConfig: scpr}) + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } } From 7d43c4518479b3ea0dc6c86383139c5e73338736 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 16:57:15 -0700 Subject: [PATCH 11/16] fix more tests --- test/balancer_switching_test.go | 39 +++++++++------------------------ 1 file changed, 10 insertions(+), 29 deletions(-) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index 574a3780157..2815906f313 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -348,16 +348,12 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) { // list fo pick_first. grpclbAddr := []resolver.Address{{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB}} addrs = append(grpclbAddr, addrs...) - now := time.Now() r.UpdateState(resolver.State{Addresses: addrs}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil { t.Fatal(err) } - if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) - } // Push a resolver update with the same addresses, but with a service config // specifying "round_robin". The ClientConn is expected to filter out the @@ -390,27 +386,22 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing // Push a resolver update containing no grpclb server address. This should // lead to the channel using the default LB policy which is pick_first. - now := time.Now() - r.UpdateState(resolver.State{Addresses: addrs}) + r.UpdateState(resolver.State{Addresses: addrs[1:]}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkForTraceEvent(ctx, wantPickFirstTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantPickFirstTraceDesc, err) - } - if err := checkPickFirst(ctx, cc, addrs[0].Addr); err != nil { + if err := checkPickFirst(ctx, cc, addrs[1].Addr); err != nil { t.Fatal(err) } // Push a resolver update with no service config. The addresses list contains // the stub backend addresses and a single address pointing to the grpclb // server we created above. This will cause the channel to switch to the - // "grpclb" balancer, and will equally distribute RPCs across the backends. - now = time.Now() + // "grpclb" balancer, which returns a single backend address. r.UpdateState(resolver.State{ - Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), + Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), }) - if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { + t.Fatal(err) } // Push a resolver update with a service config using the deprecated @@ -418,27 +409,17 @@ func (s) TestBalancerSwitch_grpclbAddressOverridesLoadBalancingPolicy(t *testing // contains an address of type "grpclb". This should be preferred and hence // there should be no balancer switch. scpr := parseServiceConfig(t, r, `{"loadBalancingPolicy": "round_robin"}`) - now = time.Now() r.UpdateState(resolver.State{ - Addresses: append(addrs, resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), + Addresses: append(addrs[1:], resolver.Address{Addr: lbServer.Address(), Type: resolver.GRPCLB}), ServiceConfig: scpr, }) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil { - t.Fatal("channel switched balancers when expected not to") - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { t.Fatal(err) } // Switch to "round_robin" by removing the address of type "grpclb". - now = time.Now() - r.UpdateState(resolver.State{Addresses: addrs}) - if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + r.UpdateState(resolver.State{Addresses: addrs[1:]}) + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } } From 270c56220efbbf9195642e4e39719dfa25bc861e Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 17:01:38 -0700 Subject: [PATCH 12/16] fix one more test --- test/balancer_switching_test.go | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index 2815906f313..be17762b980 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -43,9 +43,10 @@ const ( loadBalancedServicePort = 443 wantGRPCLBTraceDesc = `Channel switches to new LB policy "grpclb"` wantRoundRobinTraceDesc = `Channel switches to new LB policy "round_robin"` - wantPickFirstTraceDesc = `Channel switches to new LB policy "pick_first"` - // This is the number of stub backends set up at the start of each test. + // This is the number of stub backends set up at the start of each test. The + // first backend is used for the "grpclb" policy and the rest are used for + // other LB policies to test balancer switching. backendCount = 3 ) @@ -443,31 +444,21 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) { // Push a resolver update with no service config and a single address pointing // to the grpclb server we created above. This will cause the channel to - // switch to the "grpclb" balancer, and will equally distribute RPCs across - // the backends as the fake grpclb server does not support load reporting from - // the clients. - now := time.Now() + // switch to the "grpclb" balancer, which returns a single backend address. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: lbServer.Address(), Type: resolver.GRPCLB}}}) ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[:1]); err != nil { t.Fatal(err) } - if err := checkForTraceEvent(ctx, wantGRPCLBTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantGRPCLBTraceDesc, err) - } // Push a resolver update with the service config specifying "round_robin" // through the recommended `loadBalancingConfig` field. - now = time.Now() r.UpdateState(resolver.State{ - Addresses: addrs, + Addresses: addrs[1:], ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), }) - if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } @@ -478,14 +469,8 @@ func (s) TestBalancerSwitch_LoadBalancingConfigTrumps(t *testing.T) { // switched. And because the `loadBalancingConfig` field trumps everything // else, the address of type "grpclb" should be ignored. grpclbAddr := resolver.Address{Addr: "non-existent-grpclb-server-address", Type: resolver.GRPCLB} - now = time.Now() - r.UpdateState(resolver.State{Addresses: append(addrs, grpclbAddr)}) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if err := checkForTraceEvent(sCtx, wantRoundRobinTraceDesc, now); err == nil { - t.Fatal("channel switched balancers when expected not to") - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + r.UpdateState(resolver.State{Addresses: append(addrs[1:], grpclbAddr)}) + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } } From 936ddd6f39962351baae84ee36b2770d894d80fc Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 31 Mar 2022 17:07:46 -0700 Subject: [PATCH 13/16] one last one --- test/balancer_switching_test.go | 43 +++------------------------------ 1 file changed, 4 insertions(+), 39 deletions(-) diff --git a/test/balancer_switching_test.go b/test/balancer_switching_test.go index be17762b980..ede88fda572 100644 --- a/test/balancer_switching_test.go +++ b/test/balancer_switching_test.go @@ -21,9 +21,7 @@ package test import ( "context" "fmt" - "strings" "testing" - "time" "google.golang.org/grpc" "google.golang.org/grpc/balancer" @@ -108,30 +106,6 @@ func startBackendsForBalancerSwitch(t *testing.T) ([]*stubserver.StubServer, fun } } -// checkForTraceEvent looks for a trace event in the top level channel matching -// the given description. Events before since are ignored. Returns nil error if -// such an event is found. -func checkForTraceEvent(ctx context.Context, wantDesc string, since time.Time) error { - for { - if err := ctx.Err(); err != nil { - return err - } - tcs, _ := channelz.GetTopChannels(0, 0) - if len(tcs) != 1 { - return fmt.Errorf("channelz returned %d top channels, want 1", len(tcs)) - } - for _, event := range tcs[0].Trace.Events { - if event.Timestamp.Before(since) { - continue - } - if strings.Contains(event.Desc, wantDesc) { - return nil - } - } - time.Sleep(defaultTestShortTimeout) - } -} - // TestBalancerSwitch_Basic tests the basic scenario of switching from one LB // policy to another, as specified in the service config. func (s) TestBalancerSwitch_Basic(t *testing.T) { @@ -565,17 +539,13 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) { defer cc.Close() // Push a resolver update with the service config specifying "round_robin". - now := time.Now() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() r.UpdateState(resolver.State{ - Addresses: addrs, + Addresses: addrs[1:], ServiceConfig: parseServiceConfig(t, r, rrServiceConfig), }) - if err := checkForTraceEvent(ctx, wantRoundRobinTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantRoundRobinTraceDesc, err) - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } @@ -612,9 +582,8 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) { // balancer. We should see a trace event for this balancer switch. But RPCs // should still be routed to the old balancer since our stub balancer does not // report a ready picker until we ask it to do so. - now = time.Now() r.UpdateState(resolver.State{ - Addresses: addrs, + Addresses: addrs[:1], ServiceConfig: r.CC.ParseServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%v": {}}]}`, t.Name())), }) select { @@ -622,11 +591,7 @@ func (s) TestBalancerSwitch_Graceful(t *testing.T) { t.Fatal("Timeout when waiting for a ClientConnState update on the new balancer") case <-ccUpdateCh: } - wantTraceDesc := fmt.Sprintf("Channel switches to new LB policy %q", t.Name()) - if err := checkForTraceEvent(ctx, wantTraceDesc, now); err != nil { - t.Fatalf("timeout when waiting for a trace event: %s, err: %v", wantTraceDesc, err) - } - if err := checkRoundRobin(ctx, cc, addrs); err != nil { + if err := checkRoundRobin(ctx, cc, addrs[1:]); err != nil { t.Fatal(err) } From c52b13c63c83dc70576328fa973d6161c356e85d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 1 Apr 2022 11:32:43 -0700 Subject: [PATCH 14/16] no need to hold cc.mu from some methods --- balancer_conn_wrappers.go | 9 ++------- clientconn.go | 7 ++----- 2 files changed, 4 insertions(+), 12 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index d9778ff3cd5..fe2cf80d6ad 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -252,7 +252,8 @@ func (ccb *ccBalancerWrapper) switchTo(name string) { // balancer.Builder corresponding to name. If no balancer.Builder is registered // for the given name, it uses the default LB policy which is "pick_first". func (ccb *ccBalancerWrapper) handleSwitchTo(name string) { - // TODO: Ensure other languages using case-insensitive balancer registries. + // TODO: Other languages use case-insensitive balancer registries. We should + // switch as well. See: https://github.com/grpc/grpc-go/issues/5288. if strings.EqualFold(ccb.curBalancerName, name) { return } @@ -298,9 +299,6 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer if len(addrs) <= 0 { return nil, fmt.Errorf("grpc: cannot create SubConn with empty address list") } - if ccb.done.HasFired() { - return nil, fmt.Errorf("grpc: ClientConn balancer wrapper was closed") - } ac, err := ccb.cc.newAddrConn(addrs, opts) if err != nil { channelz.Warningf(logger, ccb.cc.channelzID, "acBalancerWrapper: NewSubConn: failed to newAddrConn: %v", err) @@ -336,9 +334,6 @@ func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resol } func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { - if ccb.done.HasFired() { - return - } // Update picker before updating state. Even though the ordering here does // not matter, it can lead to multiple calls of Pick in the common start-up // case where we wait for ready and then perform an RPC. If the picker is diff --git a/clientconn.go b/clientconn.go index 9473e3fb172..3ed6eb8e75e 100644 --- a/clientconn.go +++ b/clientconn.go @@ -535,8 +535,6 @@ func (cc *ClientConn) GetState() connectivity.State { // Notice: This API is EXPERIMENTAL and may be changed or removed in a later // release. func (cc *ClientConn) Connect() { - cc.mu.Lock() - defer cc.mu.Unlock() cc.balancerWrapper.exitIdle() } @@ -691,9 +689,7 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { } func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) { - cc.mu.Lock() cc.balancerWrapper.updateSubConnState(sc, s, err) - cc.mu.Unlock() } // newAddrConn creates an addrConn for addrs and adds it to cc.conns. @@ -1015,8 +1011,9 @@ func (cc *ClientConn) Close() error { bWrapper := cc.balancerWrapper cc.mu.Unlock() + // The order of closing matters here since the balancer wrapper assumes the + // picker is closed before it is closed. cc.blockingpicker.close() - if bWrapper != nil { bWrapper.close() } From d90a0173c7ae82fb932b56ff6cdfee36ffeb8cf1 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 1 Apr 2022 11:47:38 -0700 Subject: [PATCH 15/16] add wrapper types for updates instead of an enum --- balancer_conn_wrappers.go | 75 ++++++++++++++++++++------------------- 1 file changed, 38 insertions(+), 37 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index fe2cf80d6ad..7e92310b4f2 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -75,32 +75,34 @@ func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalanc return ccb } -// updateType indicates the type of update pushed to the watcher goroutine. -type updateType int - -const ( - updateTypeClientConnState updateType = iota // clientConn state change from grpc - updateTypeSubConnState // subConn state change from grpc - updateTypeExitIdle // exitIdle from grpc - updateTypeResolverError // resolver error from grpc - updateTypeSwitchTo // balancer switch update from grpc - updateTypeSubConn // removeSubConn from the balancer -) +// The following xxxUpdate structs wrap the arguments received as part of the +// corresponding update. The watcher goroutine uses the 'type' of the update to +// invoke the appropriate handler routine to handle the update. -// watcherUpdate wraps the actual update to be passed to the watcher goroutine -// with a type indicating the kind of update being wrapped. -type watcherUpdate struct { - typ updateType - update interface{} +type ccStateUpdate struct { + ccs *balancer.ClientConnState } -// scStateUpdate contains the subConn and the new state it changed to. type scStateUpdate struct { sc balancer.SubConn state connectivity.State err error } +type exitIdleUpdate struct{} + +type resolverErrorUpdate struct { + err error +} + +type switchToUpdate struct { + name string +} + +type subConnUpdate struct { + sc balancer.SubConn +} + // watcher is a long-running goroutine which reads updates from a channel and // invokes corresponding methods on the underlying balancer. It ensures that // these methods are invoked in a synchronous fashion. It also ensures that @@ -113,20 +115,19 @@ func (ccb *ccBalancerWrapper) watcher() { if ccb.closed.HasFired() { break } - update := u.(watcherUpdate) - switch update.typ { - case updateTypeClientConnState: - ccb.handleClientConnStateChange(update.update.(*balancer.ClientConnState)) - case updateTypeSubConnState: - ccb.handleSubConnStateChange(update.update.(*scStateUpdate)) - case updateTypeExitIdle: + switch update := u.(type) { + case *ccStateUpdate: + ccb.handleClientConnStateChange(update.ccs) + case *scStateUpdate: + ccb.handleSubConnStateChange(update) + case *exitIdleUpdate: ccb.handleExitIdle() - case updateTypeResolverError: - ccb.handleResolverError(update.update.(error)) - case updateTypeSwitchTo: - ccb.handleSwitchTo(update.update.(string)) - case updateTypeSubConn: - ccb.handleRemoveSubConn(update.update.(*acBalancerWrapper)) + case *resolverErrorUpdate: + ccb.handleResolverError(update.err) + case *switchToUpdate: + ccb.handleSwitchTo(update.name) + case *subConnUpdate: + ccb.handleRemoveSubConn(update.sc.(*acBalancerWrapper)) default: logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update) } @@ -148,7 +149,7 @@ func (ccb *ccBalancerWrapper) watcher() { // and return. It needs to return the error returned by the underlying balancer // back to grpc which propagates that to the resolver. func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error { - ccb.updateCh.Put(watcherUpdate{typ: updateTypeClientConnState, update: ccs}) + ccb.updateCh.Put(&ccStateUpdate{ccs: ccs}) var res interface{} select { @@ -201,11 +202,11 @@ func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connecti if sc == nil { return } - ccb.updateCh.Put(watcherUpdate{typ: updateTypeSubConnState, update: &scStateUpdate{ + ccb.updateCh.Put(&scStateUpdate{ sc: sc, state: s, err: err, - }}) + }) } // handleSubConnStateChange handles a SubConnState update from the update @@ -215,7 +216,7 @@ func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) { } func (ccb *ccBalancerWrapper) exitIdle() { - ccb.updateCh.Put(watcherUpdate{typ: updateTypeExitIdle}) + ccb.updateCh.Put(&exitIdleUpdate{}) } func (ccb *ccBalancerWrapper) handleExitIdle() { @@ -226,7 +227,7 @@ func (ccb *ccBalancerWrapper) handleExitIdle() { } func (ccb *ccBalancerWrapper) resolverError(err error) { - ccb.updateCh.Put(watcherUpdate{typ: updateTypeResolverError, update: err}) + ccb.updateCh.Put(&resolverErrorUpdate{err: err}) } func (ccb *ccBalancerWrapper) handleResolverError(err error) { @@ -244,7 +245,7 @@ func (ccb *ccBalancerWrapper) handleResolverError(err error) { // the ccBalancerWrapper keeps track of the current LB policy name, and skips // the graceful balancer switching process if the name does not change. func (ccb *ccBalancerWrapper) switchTo(name string) { - ccb.updateCh.Put(watcherUpdate{typ: updateTypeSwitchTo, update: name}) + ccb.updateCh.Put(&switchToUpdate{name: name}) } // handleSwitchTo handles a balancer switch update from the update channel. It @@ -322,7 +323,7 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { // asynchronously is probably not required anymore since the switchTo() method // handles the balancer switch by pushing the update onto the channel. // TODO(easwars): Handle this inline. - ccb.updateCh.Put(watcherUpdate{typ: updateTypeSubConn, update: sc}) + ccb.updateCh.Put(&subConnUpdate{sc: sc}) } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { From 03232f6b4624725b032452b2fce521a0550a6450 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 1 Apr 2022 11:54:06 -0700 Subject: [PATCH 16/16] do a safe type assertion in remove subConn --- balancer_conn_wrappers.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 7e92310b4f2..b1c23eaae0d 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -100,7 +100,7 @@ type switchToUpdate struct { } type subConnUpdate struct { - sc balancer.SubConn + acbw *acBalancerWrapper } // watcher is a long-running goroutine which reads updates from a channel and @@ -127,7 +127,7 @@ func (ccb *ccBalancerWrapper) watcher() { case *switchToUpdate: ccb.handleSwitchTo(update.name) case *subConnUpdate: - ccb.handleRemoveSubConn(update.sc.(*acBalancerWrapper)) + ccb.handleRemoveSubConn(update.acbw) default: logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update) } @@ -323,7 +323,11 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) { // asynchronously is probably not required anymore since the switchTo() method // handles the balancer switch by pushing the update onto the channel. // TODO(easwars): Handle this inline. - ccb.updateCh.Put(&subConnUpdate{sc: sc}) + acbw, ok := sc.(*acBalancerWrapper) + if !ok { + return + } + ccb.updateCh.Put(&subConnUpdate{acbw: acbw}) } func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {