Skip to content

Commit

Permalink
balancer: add ExitIdle optional interface (#4673)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 18, 2021
1 parent 52cea24 commit 8ab16ef
Show file tree
Hide file tree
Showing 19 changed files with 231 additions and 32 deletions.
46 changes: 31 additions & 15 deletions balancer/balancer.go
Expand Up @@ -75,24 +75,26 @@ func Get(name string) Builder {
return nil
}

// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
// A SubConn represents a single connection to a gRPC backend service.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
// Each SubConn contains a list of addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
// All SubConns start in IDLE, and will not try to connect. To trigger the
// connecting, Balancers must call Connect. If a connection re-enters IDLE,
// Balancers must call Connect again to trigger a new connection attempt.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
// gRPC will try to connect to the addresses in sequence, and stop trying the
// remainder once the first connection is successful. If an attempt to connect
// to all addresses encounters an error, the SubConn will enter
// TRANSIENT_FAILURE for a backoff period, and then transition to IDLE.
//
// Once established, if a connection is lost, the SubConn will transition
// directly to IDLE.
//
// This interface is to be implemented by gRPC. Users should not need their own
// implementation of this interface. For situations like testing, any
// implementations should embed this interface. This allows gRPC to add new
// methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
Expand Down Expand Up @@ -326,6 +328,20 @@ type Balancer interface {
Close()
}

// ExitIdler is an optional interface for balancers to implement. If
// implemented, ExitIdle will be called when ClientConn.Connect is called, if
// the ClientConn is idle. If unimplemented, ClientConn.Connect will cause
// all SubConns to connect.
//
// Notice: it will be required for all balancers to implement this in a future
// release.
type ExitIdler interface {
// ExitIdle instructs the LB policy to reconnect to backends / exit the
// IDLE state, if appropriate and possible. Note that SubConns that enter
// the IDLE state will not reconnect until SubConn.Connect is called.
ExitIdle()
}

// SubConnState describes the state of a SubConn.
type SubConnState struct {
// ConnectivityState is the connectivity state of the SubConn.
Expand Down
5 changes: 5 additions & 0 deletions balancer/base/balancer.go
Expand Up @@ -251,6 +251,11 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
func (b *baseBalancer) Close() {
}

// ExitIdle is a nop because the base balancer attempts to stay connected to
// all SubConns at all times.
func (b *baseBalancer) ExitIdle() {
}

// NewErrPicker returns a Picker that always returns err on Pick().
func NewErrPicker(err error) balancer.Picker {
return &errPicker{err: err}
Expand Down
2 changes: 2 additions & 0 deletions balancer/grpclb/grpclb.go
Expand Up @@ -488,3 +488,5 @@ func (lb *lbBalancer) Close() {
}
lb.cc.close()
}

func (lb *lbBalancer) ExitIdle() {}
4 changes: 4 additions & 0 deletions balancer/rls/internal/balancer.go
Expand Up @@ -129,6 +129,10 @@ func (lb *rlsBalancer) Close() {
}
}

func (lb *rlsBalancer) ExitIdle() {
// TODO: are we 100% sure this should be a nop?
}

// updateControlChannel updates the RLS client if required.
// Caller must hold lb.mu.
func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) {
Expand Down
39 changes: 32 additions & 7 deletions balancer_conn_wrappers.go
Expand Up @@ -37,15 +37,20 @@ type scStateUpdate struct {
err error
}

// exitIdle contains no data and is just a signal sent on the updateCh in
// ccBalancerWrapper to instruct the balancer to exit idle.
type exitIdle struct{}

// ccBalancerWrapper is a wrapper on top of cc for balancers.
// It implements balancer.ClientConn interface.
type ccBalancerWrapper struct {
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event
cc *ClientConn
balancerMu sync.Mutex // synchronizes calls to the balancer
balancer balancer.Balancer
hasExitIdle bool
updateCh *buffer.Unbounded
closed *grpcsync.Event
done *grpcsync.Event

mu sync.Mutex
subConns map[*acBalancerWrapper]struct{}
Expand All @@ -61,6 +66,7 @@ func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.Bui
}
go ccb.watcher()
ccb.balancer = b.Build(ccb, bopts)
_, ccb.hasExitIdle = ccb.balancer.(balancer.ExitIdler)
return ccb
}

Expand All @@ -86,6 +92,17 @@ func (ccb *ccBalancerWrapper) watcher() {
ccb.cc.removeAddrConn(u.getAddrConn(), errConnDrain)
}
ccb.mu.Unlock()
case exitIdle:
if ccb.cc.GetState() == connectivity.Idle {
if ei, ok := ccb.balancer.(balancer.ExitIdler); ok {
// We already checked that the balancer implements
// ExitIdle before pushing the event to updateCh, but
// check conditionally again as defensive programming.
ccb.balancerMu.Lock()
ei.ExitIdle()
ccb.balancerMu.Unlock()
}
}
default:
logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", t, t)
}
Expand Down Expand Up @@ -118,6 +135,14 @@ func (ccb *ccBalancerWrapper) close() {
<-ccb.done.Done()
}

func (ccb *ccBalancerWrapper) exitIdle() bool {
if !ccb.hasExitIdle {
return false
}
ccb.updateCh.Put(exitIdle{})
return true
}

func (ccb *ccBalancerWrapper) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
// When updating addresses for a SubConn, if the address in use is not in
// the new addresses, the old ac will be tearDown() and a new ac will be
Expand All @@ -144,8 +169,8 @@ func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnStat

func (ccb *ccBalancerWrapper) resolverError(err error) {
ccb.balancerMu.Lock()
defer ccb.balancerMu.Unlock()
ccb.balancer.ResolverError(err)
ccb.balancerMu.Unlock()
}

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
Expand Down
2 changes: 2 additions & 0 deletions balancer_switching_test.go
Expand Up @@ -58,6 +58,8 @@ func (b *magicalLB) UpdateClientConnState(balancer.ClientConnState) error {

func (b *magicalLB) Close() {}

func (b *magicalLB) ExitIdle() {}

func init() {
balancer.Register(&magicalLB{})
}
Expand Down
14 changes: 7 additions & 7 deletions clientconn.go
Expand Up @@ -555,13 +555,13 @@ func (cc *ClientConn) GetState() connectivity.State {
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func (cc *ClientConn) Connect() {
if cc.GetState() == connectivity.Idle {
cc.mu.Lock()
for ac := range cc.conns {
// TODO: should this be a signal to the LB policy instead?
go ac.connect()
}
cc.mu.Unlock()
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.balancerWrapper != nil && cc.balancerWrapper.exitIdle() {
return
}
for ac := range cc.conns {
go ac.connect()
}
}

Expand Down
7 changes: 7 additions & 0 deletions internal/balancer/stub/stub.go
Expand Up @@ -33,6 +33,7 @@ type BalancerFuncs struct {
ResolverError func(*BalancerData, error)
UpdateSubConnState func(*BalancerData, balancer.SubConn, balancer.SubConnState)
Close func(*BalancerData)
ExitIdle func(*BalancerData)
}

// BalancerData contains data relevant to a stub balancer.
Expand Down Expand Up @@ -75,6 +76,12 @@ func (b *bal) Close() {
}
}

func (b *bal) ExitIdle() {
if b.bf.ExitIdle != nil {
b.bf.ExitIdle(b.bd)
}
}

type bb struct {
name string
bf BalancerFuncs
Expand Down
6 changes: 6 additions & 0 deletions pickfirst.go
Expand Up @@ -124,6 +124,12 @@ func (b *pickfirstBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.S
func (b *pickfirstBalancer) Close() {
}

func (b *pickfirstBalancer) ExitIdle() {
if b.state == connectivity.Idle {
b.sc.Connect()
}
}

type picker struct {
result balancer.PickResult
err error
Expand Down
7 changes: 5 additions & 2 deletions test/balancer_test.go
Expand Up @@ -116,6 +116,8 @@ func (b *testBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubCon

func (b *testBalancer) Close() {}

func (b *testBalancer) ExitIdle() {}

type picker struct {
err error
sc balancer.SubConn
Expand Down Expand Up @@ -373,8 +375,9 @@ func (testBalancerKeepAddresses) UpdateSubConnState(sc balancer.SubConn, s balan
panic("not used")
}

func (testBalancerKeepAddresses) Close() {
}
func (testBalancerKeepAddresses) Close() {}

func (testBalancerKeepAddresses) ExitIdle() {}

// Make sure that non-grpclb balancers don't get grpclb addresses even if name
// resolver sends them
Expand Down
27 changes: 27 additions & 0 deletions xds/internal/balancer/balancergroup/balancergroup.go
Expand Up @@ -104,6 +104,22 @@ func (sbc *subBalancerWrapper) startBalancer() {
}
}

func (sbc *subBalancerWrapper) exitIdle() {
b := sbc.balancer
if b == nil {
return
}
if ei, ok := b.(balancer.ExitIdler); ok {
ei.ExitIdle()
return
}
for sc, b := range sbc.group.scToSubBalancer {
if b == sbc {
sc.Connect()
}
}
}

func (sbc *subBalancerWrapper) updateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
b := sbc.balancer
if b == nil {
Expand Down Expand Up @@ -493,6 +509,17 @@ func (bg *BalancerGroup) Close() {
bg.outgoingMu.Unlock()
}

// ExitIdle should be invoked when the parent LB policy's ExitIdle is invoked.
// It will trigger this on all sub-balancers, or reconnect their subconns if
// not supported.
func (bg *BalancerGroup) ExitIdle() {
bg.outgoingMu.Lock()
for _, config := range bg.idToBalancerConfig {
config.exitIdle()
}
bg.outgoingMu.Unlock()
}

const (
serverLoadCPUName = "cpu_utilization"
serverLoadMemoryName = "mem_utilization"
Expand Down
18 changes: 18 additions & 0 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Expand Up @@ -141,6 +141,8 @@ type scUpdate struct {
state balancer.SubConnState
}

type exitIdle struct{}

// cdsBalancer implements a CDS based LB policy. It instantiates a
// cluster_resolver balancer to further resolve the serviceName received from
// CDS, into localities and endpoints. Implements the balancer.Balancer
Expand Down Expand Up @@ -376,6 +378,18 @@ func (b *cdsBalancer) run() {
break
}
b.childLB.UpdateSubConnState(update.subConn, update.state)
case exitIdle:
if b.childLB == nil {
b.logger.Errorf("xds: received ExitIdle with no child balancer")
break
}
// This implementation assumes the child balancer supports
// ExitIdle (but still checks for the interface's existence to
// avoid a panic if not). If the child does not, no subconns
// will be connected.
if ei, ok := b.childLB.(balancer.ExitIdler); ok {
ei.ExitIdle()
}
}
case u := <-b.clusterHandler.updateChannel:
b.handleWatchUpdate(u)
Expand Down Expand Up @@ -494,6 +508,10 @@ func (b *cdsBalancer) Close() {
<-b.done.Done()
}

func (b *cdsBalancer) ExitIdle() {
b.updateCh.Put(exitIdle{})
}

// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at
// creation and intercepts the NewSubConn() and UpdateAddresses() call from the
// child policy to add security configuration required by xDS credentials.
Expand Down

0 comments on commit 8ab16ef

Please sign in to comment.