rls: suppress picker updates from children when handling config updates (#5539)

easwars committed Aug 1, 2022
1 parent 02f1a7a commit c14e29e
Showing 3 changed files with 357 additions and 70 deletions.
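
Before the file-by-file diff, a note on the shape of the fix: while a config update is being pushed to child policies, picker updates triggered by the children are suppressed; once every child has processed its update, a single consolidated picker is sent to the gRPC channel. The following minimal Go sketch illustrates just that pattern; every name in it is an illustrative stand-in, not a grpc-go API.

	package main

	import (
		"fmt"
		"sync"
	)

	// toyBalancer mimics the relevant slice of the RLS policy: while inhibit
	// is set, sendPickerLocked drops picker updates; once cleared, one
	// consolidated update goes out.
	type toyBalancer struct {
		mu      sync.Mutex
		inhibit bool
		sent    int
	}

	func (b *toyBalancer) sendPickerLocked() {
		if b.inhibit {
			fmt.Println("delaying picker update")
			return
		}
		b.sent++
		fmt.Printf("sent picker update #%d\n", b.sent)
	}

	// handleConfigUpdate fans a config out to children; each child reacts
	// with a state change that would normally trigger a picker. With the
	// flag set, those collapse into the single update sent at the end.
	func (b *toyBalancer) handleConfigUpdate(numChildren int) {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.inhibit = true
		for i := 0; i < numChildren; i++ {
			b.sendPickerLocked() // child state change while inhibited
		}
		b.inhibit = false
		b.sendPickerLocked() // exactly one picker reaches the channel
	}

	func main() {
		(&toyBalancer{}).handleConfigUpdate(3)
	}
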
147 changes: 85 additions & 62 deletions balancer/rls/balancer.go
@@ -21,6 +21,7 @@ package rls

import (
"encoding/json"
"errors"
"fmt"
"sync"
"sync/atomic"
@@ -36,6 +37,7 @@ import (
"google.golang.org/grpc/internal/buffer"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
)

@@ -50,7 +52,8 @@ const (
)

var (
logger = grpclog.Component("rls")
logger = grpclog.Component("rls")
errBalancerClosed = errors.New("rls LB policy is closed")

// Below defined vars for overriding in unit tests.

@@ -88,16 +91,14 @@ func (rlsBB) Name() string {

func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
lb := &rlsBalancer{
done: grpcsync.NewEvent(),
cc: cc,
bopts: opts,
purgeTicker: dataCachePurgeTicker(),
lbCfg: &lbConfig{},
pendingMap: make(map[cacheKey]*backoffState),
childPolicies: make(map[string]*childPolicyWrapper),
ccUpdateCh: make(chan *balancer.ClientConnState, 1),
childPolicyStateUpdateCh: buffer.NewUnbounded(),
connectivityStateCh: make(chan struct{}),
done: grpcsync.NewEvent(),
cc: cc,
bopts: opts,
purgeTicker: dataCachePurgeTicker(),
lbCfg: &lbConfig{},
pendingMap: make(map[cacheKey]*backoffState),
childPolicies: make(map[string]*childPolicyWrapper),
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
@@ -139,13 +140,28 @@ type rlsBalancer struct {
// default child policy wrapper when a new picker is created. See
// sendNewPickerLocked() for details.
lastPicker *rlsPicker
// Set during UpdateClientConnState when pushing updates to child policies.
// Prevents state updates from child policies causing new pickers to be sent
// up the channel. Cleared after all child policies have processed the
// updates sent to them, after which a new picker is sent up the channel.
inhibitPickerUpdates bool

// Channels on which updates are received or pushed.
ccUpdateCh chan *balancer.ClientConnState
childPolicyStateUpdateCh *buffer.Unbounded // idAndState from child policy.
connectivityStateCh chan struct{} // signalled when control channel becomes READY again.

// Channel on which all updates are pushed. Processed in run().
updateCh *buffer.Unbounded
}

type resumePickerUpdates struct {
done chan struct{}
}

// childPolicyIDAndState wraps a child policy id and its state update.
type childPolicyIDAndState struct {
id string
state balancer.State
}

type controlChannelReady struct{}

// run is a long-running goroutine which handles all the updates that the
// balancer wishes to handle. The appropriate updateHandler will push the update
// on to a channel that this goroutine will select on, thereby the handling of
@@ -154,21 +170,30 @@ func (b *rlsBalancer) run() {
go b.purgeDataCache()
for {
select {
case u := <-b.ccUpdateCh:
b.handleClientConnUpdate(u)
case u := <-b.childPolicyStateUpdateCh.Get():
update := u.(idAndState)
b.childPolicyStateUpdateCh.Load()
b.handleChildPolicyStateUpdate(update.id, update.state)
case <-b.connectivityStateCh:
b.logger.Infof("Resetting backoff state after control channel getting back to READY")
b.cacheMu.Lock()
updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
b.cacheMu.Unlock()
if updatePicker {
b.sendNewPicker()
case u := <-b.updateCh.Get():
b.updateCh.Load()
switch update := u.(type) {
case childPolicyIDAndState:
b.handleChildPolicyStateUpdate(update.id, update.state)
case controlChannelReady:
b.logger.Infof("Resetting backoff state after control channel getting back to READY")
b.cacheMu.Lock()
updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
b.cacheMu.Unlock()
if updatePicker {
b.sendNewPicker()
}
resetBackoffHook()
case resumePickerUpdates:
b.stateMu.Lock()
b.logger.Infof("Resuming picker updates after config propagation to child policies")
b.inhibitPickerUpdates = false
b.sendNewPickerLocked()
close(update.done)
b.stateMu.Unlock()
default:
b.logger.Errorf("Unsupported update type %T", update)
}
resetBackoffHook()
case <-b.done.Done():
return
}
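
The loop above drains a single buffer.Unbounded and type-switches on the event, replacing the three dedicated channels of the old code. grpc-go's internal buffer has a small contract visible in the diff: Put enqueues, Get returns a receive channel holding at most one item, and Load must be called after every receive to surface the next backlog item. Since internal/buffer cannot be imported outside the grpc module, here is a stripped-down re-implementation of that contract for illustration only (no locking, so single-goroutine use):

	package main

	import "fmt"

	// unbounded re-implements the Put/Get/Load contract of grpc-go's
	// internal buffer.Unbounded, minus the locking: Get exposes a 1-deep
	// channel, and Load moves the next backlog item into it after each
	// receive.
	type unbounded struct {
		c       chan interface{}
		backlog []interface{}
	}

	func newUnbounded() *unbounded { return &unbounded{c: make(chan interface{}, 1)} }

	func (b *unbounded) Put(t interface{}) {
		if len(b.backlog) == 0 {
			select {
			case b.c <- t:
				return
			default:
			}
		}
		b.backlog = append(b.backlog, t)
	}

	func (b *unbounded) Load() {
		if len(b.backlog) > 0 {
			select {
			case b.c <- b.backlog[0]:
				b.backlog = b.backlog[1:]
			default:
			}
		}
	}

	func (b *unbounded) Get() <-chan interface{} { return b.c }

	type controlChannelReady struct{}

	func main() {
		buf := newUnbounded()
		buf.Put(controlChannelReady{})
		buf.Put("child policy state")
		for i := 0; i < 2; i++ {
			u := <-buf.Get()
			buf.Load() // surface the next queued update, as run() does
			fmt.Printf("handling update of type %T\n", u)
		}
	}
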
@@ -196,33 +221,23 @@ func (b *rlsBalancer) purgeDataCache() {
}

func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
// Remove unprocessed update from the channel, if one exists, before pushing
// the most recent one.
select {
case <-b.ccUpdateCh:
default:
}
b.ccUpdateCh <- &ccs
return nil
}

// handleClientConnUpdate handles updates to the service config.
//
// Invoked from the run() goroutine and this will attempt to grab the mutex.
func (b *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
defer clientConnUpdateHook()
if b.done.HasFired() {
b.logger.Warningf("Received service config after balancer close")
return
b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
return errBalancerClosed
}

b.stateMu.Lock()
defer b.stateMu.Unlock()
newCfg := ccs.BalancerConfig.(*lbConfig)
if b.lbCfg.Equal(newCfg) {
b.stateMu.Unlock()
b.logger.Infof("New service config matches existing config")
return
return nil
}

b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
b.inhibitPickerUpdates = true

// When the RLS server name changes, the old control channel needs to be
// swapped out for a new one. All state associated with the throttling
// algorithm is stored on a per-control-channel basis; when we swap out
@@ -238,13 +253,19 @@ func (b *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
// Any changes to child policy name or configuration needs to be handled by
// either creating new child policies or pushing updates to existing ones.
b.resolverState = ccs.ResolverState
b.handleChildPolicyConfigUpdate(newCfg, ccs)
b.handleChildPolicyConfigUpdate(newCfg, &ccs)

// Update the copy of the config in the LB policy and send a new picker.
// Update the copy of the config in the LB policy before releasing the lock.
b.lbCfg = newCfg
b.sendNewPickerLocked()

clientConnUpdateHook()
// Enqueue an event which will notify us when the above update has been
// propagated to all child policies, and the child policies have all
// processed their updates, and we have sent a picker update.
done := make(chan struct{})
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done
return nil
}
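
The resumePickerUpdates event carries a done channel, and that is what makes the new UpdateClientConnState synchronous: because the unbounded buffer preserves order, by the time run() reaches this event it has already dispatched every child policy state update queued ahead of it, and closing done releases the caller. Here is the handshake in isolation, with illustrative names and a plain buffered channel standing in for buffer.Unbounded:

	package main

	import "fmt"

	// resumePickerUpdates mirrors the event type from the commit: the done
	// channel lets the enqueuer block until the worker reaches this event.
	type resumePickerUpdates struct{ done chan struct{} }

	func main() {
		updateCh := make(chan interface{}, 8)

		// Worker standing in for run(): handles events strictly in order.
		go func() {
			for u := range updateCh {
				switch ev := u.(type) {
				case string:
					fmt.Println("child policy update:", ev)
				case resumePickerUpdates:
					fmt.Println("resuming picker updates, sending one picker")
					close(ev.done)
					return
				}
			}
		}()

		// Caller standing in for UpdateClientConnState: everything queued
		// before the resume event is guaranteed to be handled first.
		updateCh <- "child-1 state"
		updateCh <- "child-2 state"
		done := make(chan struct{})
		updateCh <- resumePickerUpdates{done: done}
		<-done
		fmt.Println("UpdateClientConnState can now return")
	}
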

// handleControlChannelUpdate handles updates to service config fields which
@@ -258,7 +279,10 @@ func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {

// Create a new control channel and close the existing one.
b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, b.connectivityStateCh)
backToReadyFn := func() {
b.updateCh.Put(controlChannelReady{})
}
ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
if err != nil {
// This is very uncommon and usually represents a non-transient error.
// There is not much we can do here other than wait for another update
@@ -438,8 +462,13 @@ func (b *rlsBalancer) sendNewPickerLocked() {
ConnectivityState: aggregatedState,
Picker: picker,
}
b.logger.Infof("New balancer.State: %+v", state)
b.cc.UpdateState(state)

if !b.inhibitPickerUpdates {
b.logger.Infof("New balancer.State: %+v", state)
b.cc.UpdateState(state)
} else {
b.logger.Infof("Delaying picker update: %+v", state)
}

if b.lastPicker != nil {
if b.defaultPolicy != nil {
@@ -499,18 +528,12 @@ func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
}
}

// idAndState wraps a child policy id and its state update.
type idAndState struct {
id string
state balancer.State
}

// UpdateState is an implementation of the balancergroup.BalancerStateAggregator
// interface. The actual state aggregation functionality is handled
// asynchronously. This method only pushes the state update on to a channel
// that is read and dispatched by the run() goroutine.
func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
b.childPolicyStateUpdateCh.Put(idAndState{id: id, state: state})
b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
}

// handleChildPolicyStateUpdate provides the state aggregator functionality for
@@ -543,7 +566,7 @@ func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.
return
}
atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
b.logger.Infof("Child policy %q has new state %+v", id, cpw.state)
b.logger.Infof("Child policy %q has new state %+v", id, newState)
b.sendNewPickerLocked()
}
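
One detail worth calling out in this last hunk: the child's new state is published with atomic.StorePointer into the child policy wrapper, so the picker on the RPC hot path can read it without contending for the balancer's mutex. A minimal sketch of that publish/load pattern follows; the type and method names are illustrative, not the actual childPolicyWrapper API.

	package main

	import (
		"fmt"
		"sync/atomic"
		"unsafe"
	)

	type childState struct{ connectivity string }

	// childWrapper publishes its most recent state through an atomic pointer
	// so readers on the RPC hot path never take the balancer's mutex.
	type childWrapper struct {
		state unsafe.Pointer // *childState
	}

	func (w *childWrapper) storeState(s childState) {
		atomic.StorePointer(&w.state, unsafe.Pointer(&s))
	}

	func (w *childWrapper) loadState() *childState {
		return (*childState)(atomic.LoadPointer(&w.state))
	}

	func main() {
		w := &childWrapper{}
		w.storeState(childState{connectivity: "CONNECTING"})
		w.storeState(childState{connectivity: "READY"})
		fmt.Println(w.loadState().connectivity) // READY
	}
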
