From c14e29e609121fa66a1621141fae4fc9cc28e727 Mon Sep 17 00:00:00 2001
From: Easwar Swaminathan
Date: Mon, 1 Aug 2022 16:10:21 -0700
Subject: [PATCH] rls: suppress picker updates from children when handling config updates (#5539)

---
 balancer/rls/balancer.go        | 147 ++++++++++--------
 balancer/rls/balancer_test.go   | 264 ++++++++++++++++++++++++++++++++
 balancer/rls/control_channel.go |  16 +-
 3 files changed, 357 insertions(+), 70 deletions(-)

diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go
index cde95d992d6..b7a11e8851b 100644
--- a/balancer/rls/balancer.go
+++ b/balancer/rls/balancer.go
@@ -21,6 +21,7 @@ package rls
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -36,6 +37,7 @@ import (
 	"google.golang.org/grpc/internal/buffer"
 	internalgrpclog "google.golang.org/grpc/internal/grpclog"
 	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/internal/pretty"
 	"google.golang.org/grpc/resolver"
 )
 
@@ -50,7 +52,8 @@ const (
 )
 
 var (
-	logger = grpclog.Component("rls")
+	logger            = grpclog.Component("rls")
+	errBalancerClosed = errors.New("rls LB policy is closed")
 
 	// Below defined vars for overriding in unit tests.
@@ -88,16 +91,14 @@ func (rlsBB) Name() string {
 
 func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
 	lb := &rlsBalancer{
-		done:                     grpcsync.NewEvent(),
-		cc:                       cc,
-		bopts:                    opts,
-		purgeTicker:              dataCachePurgeTicker(),
-		lbCfg:                    &lbConfig{},
-		pendingMap:               make(map[cacheKey]*backoffState),
-		childPolicies:            make(map[string]*childPolicyWrapper),
-		ccUpdateCh:               make(chan *balancer.ClientConnState, 1),
-		childPolicyStateUpdateCh: buffer.NewUnbounded(),
-		connectivityStateCh:      make(chan struct{}),
+		done:          grpcsync.NewEvent(),
+		cc:            cc,
+		bopts:         opts,
+		purgeTicker:   dataCachePurgeTicker(),
+		lbCfg:         &lbConfig{},
+		pendingMap:    make(map[cacheKey]*backoffState),
+		childPolicies: make(map[string]*childPolicyWrapper),
+		updateCh:      buffer.NewUnbounded(),
 	}
 	lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
 	lb.dataCache = newDataCache(maxCacheSize, lb.logger)
@@ -139,13 +140,28 @@ type rlsBalancer struct {
 	// default child policy wrapper when a new picker is created. See
 	// sendNewPickerLocked() for details.
 	lastPicker *rlsPicker
+	// Set during UpdateClientConnState when pushing updates to child policies.
+	// Prevents state updates from child policies causing new pickers to be sent
+	// up the channel. Cleared after all child policies have processed the
+	// updates sent to them, after which a new picker is sent up the channel.
+	inhibitPickerUpdates bool
+
+	// Channel on which all updates are pushed. Processed in run().
+	updateCh *buffer.Unbounded
+}
+
+type resumePickerUpdates struct {
+	done chan struct{}
+}
 
-	// Channels on which updates are received or pushed.
-	ccUpdateCh               chan *balancer.ClientConnState
-	childPolicyStateUpdateCh *buffer.Unbounded // idAndState from child policy.
-	connectivityStateCh      chan struct{}     // signalled when control channel becomes READY again.
+// childPolicyIDAndState wraps a child policy id and its state update.
+type childPolicyIDAndState struct {
+	id    string
+	state balancer.State
 }
 
+type controlChannelReady struct{}
+
 // run is a long-running goroutine which handles all the updates that the
 // balancer wishes to handle. The appropriate updateHandler will push the update
 // on to a channel that this goroutine will select on, thereby the handling of
@@ -154,21 +170,30 @@ func (b *rlsBalancer) run() {
 	go b.purgeDataCache()
 	for {
 		select {
-		case u := <-b.ccUpdateCh:
-			b.handleClientConnUpdate(u)
-		case u := <-b.childPolicyStateUpdateCh.Get():
-			update := u.(idAndState)
-			b.childPolicyStateUpdateCh.Load()
-			b.handleChildPolicyStateUpdate(update.id, update.state)
-		case <-b.connectivityStateCh:
-			b.logger.Infof("Resetting backoff state after control channel getting back to READY")
-			b.cacheMu.Lock()
-			updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
-			b.cacheMu.Unlock()
-			if updatePicker {
-				b.sendNewPicker()
+		case u := <-b.updateCh.Get():
+			b.updateCh.Load()
+			switch update := u.(type) {
+			case childPolicyIDAndState:
+				b.handleChildPolicyStateUpdate(update.id, update.state)
+			case controlChannelReady:
+				b.logger.Infof("Resetting backoff state after control channel getting back to READY")
+				b.cacheMu.Lock()
+				updatePicker := b.dataCache.resetBackoffState(&backoffState{bs: defaultBackoffStrategy})
+				b.cacheMu.Unlock()
+				if updatePicker {
+					b.sendNewPicker()
+				}
+				resetBackoffHook()
+			case resumePickerUpdates:
+				b.stateMu.Lock()
+				b.logger.Infof("Resuming picker updates after config propagation to child policies")
+				b.inhibitPickerUpdates = false
+				b.sendNewPickerLocked()
+				close(update.done)
+				b.stateMu.Unlock()
+			default:
+				b.logger.Errorf("Unsupported update type %T", update)
 			}
-			resetBackoffHook()
 		case <-b.done.Done():
 			return
 		}
 	}
@@ -196,33 +221,23 @@ func (b *rlsBalancer) purgeDataCache() {
 }
 
 func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
-	// Remove unprocessed update from the channel, if one exists, before pushing
-	// the most recent one.
-	select {
-	case <-b.ccUpdateCh:
-	default:
-	}
-	b.ccUpdateCh <- &ccs
-	return nil
-}
-
-// handleClientConnUpdate handles updates to the service config.
-//
-// Invoked from the run() goroutine and this will attempt to grab the mutex.
-func (b *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
+	defer clientConnUpdateHook()
 	if b.done.HasFired() {
-		b.logger.Warningf("Received service config after balancer close")
-		return
+		b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
+		return errBalancerClosed
 	}
 
 	b.stateMu.Lock()
-	defer b.stateMu.Unlock()
 	newCfg := ccs.BalancerConfig.(*lbConfig)
 	if b.lbCfg.Equal(newCfg) {
+		b.stateMu.Unlock()
 		b.logger.Infof("New service config matches existing config")
-		return
+		return nil
 	}
 
+	b.logger.Infof("Delaying picker updates until config is propagated to and processed by child policies")
+	b.inhibitPickerUpdates = true
+
 	// When the RLS server name changes, the old control channel needs to be
 	// swapped out for a new one. All state associated with the throttling
 	// algorithm is stored on a per-control-channel basis; when we swap out
@@ -238,13 +253,19 @@ func (b *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
 	// Any changes to child policy name or configuration need to be handled by
 	// either creating new child policies or pushing updates to existing ones.
 	b.resolverState = ccs.ResolverState
-	b.handleChildPolicyConfigUpdate(newCfg, ccs)
+	b.handleChildPolicyConfigUpdate(newCfg, &ccs)
 
-	// Update the copy of the config in the LB policy and send a new picker.
+	// Update the copy of the config in the LB policy before releasing the lock.
 	b.lbCfg = newCfg
-	b.sendNewPickerLocked()
-	clientConnUpdateHook()
+	// Enqueue an event which will notify us when the above update has been
+	// propagated to all child policies, and the child policies have all
+	// processed their updates, and we have sent a picker update.
+	done := make(chan struct{})
+	b.updateCh.Put(resumePickerUpdates{done: done})
+	b.stateMu.Unlock()
+	<-done
+	return nil
 }
 
 // handleControlChannelUpdate handles updates to service config fields which
@@ -258,7 +279,10 @@ func (b *rlsBalancer) handleControlChannelUpdate(newCfg *lbConfig) {
 
 	// Create a new control channel and close the existing one.
 	b.logger.Infof("Creating control channel to RLS server at: %v", newCfg.lookupService)
-	ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, b.connectivityStateCh)
+	backToReadyFn := func() {
+		b.updateCh.Put(controlChannelReady{})
+	}
+	ctrlCh, err := newControlChannel(newCfg.lookupService, newCfg.controlChannelServiceConfig, newCfg.lookupServiceTimeout, b.bopts, backToReadyFn)
 	if err != nil {
 		// This is very uncommon and usually represents a non-transient error.
 		// There is not much we can do here other than wait for another update
@@ -438,8 +462,13 @@ func (b *rlsBalancer) sendNewPickerLocked() {
 		ConnectivityState: aggregatedState,
 		Picker:            picker,
 	}
-	b.logger.Infof("New balancer.State: %+v", state)
-	b.cc.UpdateState(state)
+
+	if !b.inhibitPickerUpdates {
+		b.logger.Infof("New balancer.State: %+v", state)
+		b.cc.UpdateState(state)
+	} else {
+		b.logger.Infof("Delaying picker update: %+v", state)
+	}
 
 	if b.lastPicker != nil {
 		if b.defaultPolicy != nil {
@@ -499,18 +528,12 @@ func (b *rlsBalancer) aggregatedConnectivityState() connectivity.State {
 	}
 }
 
-// idAndState wraps a child policy id and its state update.
-type idAndState struct {
-	id    string
-	state balancer.State
-}
-
 // UpdateState is an implementation of the balancergroup.BalancerStateAggregator
 // interface. The actual state aggregation functionality is handled
 // asynchronously. This method only pushes the state update on to a channel that
 // is read and dispatched by the run() goroutine.
 func (b *rlsBalancer) UpdateState(id string, state balancer.State) {
-	b.childPolicyStateUpdateCh.Put(idAndState{id: id, state: state})
+	b.updateCh.Put(childPolicyIDAndState{id: id, state: state})
 }
 
 // handleChildPolicyStateUpdate provides the state aggregator functionality for
@@ -543,7 +566,7 @@ func (b *rlsBalancer) handleChildPolicyStateUpdate(id string, newState balancer.State) {
 		return
 	}
 	atomic.StorePointer(&cpw.state, unsafe.Pointer(&newState))
-	b.logger.Infof("Child policy %q has new state %+v", id, cpw.state)
+	b.logger.Infof("Child policy %q has new state %+v", id, newState)
 	b.sendNewPickerLocked()
 }
diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go
index 1c2f9fca32a..7df7f433520 100644
--- a/balancer/rls/balancer_test.go
+++ b/balancer/rls/balancer_test.go
@@ -20,8 +20,10 @@ package rls
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"sync"
 	"testing"
 	"time"
 
@@ -30,6 +32,7 @@ import (
 	"google.golang.org/grpc/balancer"
 	"google.golang.org/grpc/balancer/rls/internal/test/e2e"
 	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/connectivity"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
@@ -39,6 +42,7 @@ import (
 	rlstest "google.golang.org/grpc/internal/testutils/rls"
 	"google.golang.org/grpc/metadata"
 	"google.golang.org/grpc/resolver"
+	"google.golang.org/grpc/resolver/manual"
 	"google.golang.org/grpc/serviceconfig"
 	"google.golang.org/grpc/testdata"
 	"google.golang.org/protobuf/types/known/durationpb"
@@ -833,3 +837,263 @@ func (s) TestControlChannelConnectivityStateMonitoring(t *testing.T) {
 	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, backendCh)
 	verifyRLSRequest(t, rlsReqCh, true)
 }
+
+const wrappingTopLevelBalancerName = "wrapping-top-level-balancer"
+const multipleUpdateStateChildBalancerName = "multiple-update-state-child-balancer"
+
+type wrappingTopLevelBalancerBuilder struct {
+	balCh chan balancer.Balancer
+}
+
+func (w *wrappingTopLevelBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	tlb := &wrappingTopLevelBalancer{ClientConn: cc}
+	tlb.Balancer = balancer.Get(Name).Build(tlb, balancer.BuildOptions{})
+	w.balCh <- tlb
+	return tlb
+}
+
+func (w *wrappingTopLevelBalancerBuilder) Name() string {
+	return wrappingTopLevelBalancerName
+}
+
+func (w *wrappingTopLevelBalancerBuilder) ParseConfig(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	parser := balancer.Get(Name).(balancer.ConfigParser)
+	return parser.ParseConfig(sc)
+}
+
+// wrappingTopLevelBalancer acts as the top-level LB policy on the channel and
+// wraps an RLS LB policy. It forwards all balancer API calls unmodified to the
+// underlying RLS LB policy. It overrides the UpdateState method on the
+// balancer.ClientConn passed to the RLS LB policy and stores all state updates
+// pushed by the latter.
+type wrappingTopLevelBalancer struct {
+	balancer.ClientConn
+	balancer.Balancer
+
+	mu     sync.Mutex
+	states []balancer.State
+}
+
+func (w *wrappingTopLevelBalancer) UpdateState(bs balancer.State) {
+	w.mu.Lock()
+	w.states = append(w.states, bs)
+	w.mu.Unlock()
+	w.ClientConn.UpdateState(bs)
+}
+
+func (w *wrappingTopLevelBalancer) getStates() []balancer.State {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	states := make([]balancer.State, len(w.states))
+	for i, s := range w.states {
+		states[i] = s
+	}
+	return states
+}
+
+// wrappedPickFirstBalancerBuilder builds a balancer which wraps a pickfirst
+// balancer. The wrapping balancer receives addresses to be passed to the
+// underlying pickfirst balancer as part of its configuration.
+type wrappedPickFirstBalancerBuilder struct{}
+
+func (wrappedPickFirstBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
+	builder := balancer.Get(grpc.PickFirstBalancerName)
+	wpfb := &wrappedPickFirstBalancer{
+		ClientConn: cc,
+	}
+	pf := builder.Build(wpfb, opts)
+	wpfb.Balancer = pf
+	return wpfb
+}
+
+func (wrappedPickFirstBalancerBuilder) Name() string {
+	return multipleUpdateStateChildBalancerName
+}
+
+type WrappedPickFirstBalancerConfig struct {
+	serviceconfig.LoadBalancingConfig
+	Backend string // The target for which this child policy was created.
+}
+
+func (wbb *wrappedPickFirstBalancerBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
+	cfg := &WrappedPickFirstBalancerConfig{}
+	if err := json.Unmarshal(c, cfg); err != nil {
+		return nil, err
+	}
+	return cfg, nil
+}
+
+// wrappedPickFirstBalancer wraps a pickfirst balancer and makes multiple calls
+// to UpdateState when handling a config update in UpdateClientConnState. When
+// this policy is used as a child policy of the RLS LB policy, it is expected
+// that the latter suppress these updates and push a single picker update on the
+// channel (after the config has been processed by all child policies).
+type wrappedPickFirstBalancer struct {
+	balancer.Balancer
+	balancer.ClientConn
+}
+
+func (wb *wrappedPickFirstBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
+	wb.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}})
+	wb.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Connecting, Picker: &testutils.TestConstPicker{Err: balancer.ErrNoSubConnAvailable}})
+
+	cfg := ccs.BalancerConfig.(*WrappedPickFirstBalancerConfig)
+	return wb.Balancer.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{{Addr: cfg.Backend}}},
+	})
+}
+
+func (wb *wrappedPickFirstBalancer) UpdateState(state balancer.State) {
+	// Eat it if IDLE - allows it to switch over only on a READY SubConn.
+	if state.ConnectivityState == connectivity.Idle {
+		return
+	}
+	wb.ClientConn.UpdateState(state)
+}
+
+// TestUpdateStatePauses tests the scenario where a config update received by
+// the RLS LB policy results in multiple UpdateState calls from the child
+// policies. This test verifies that picker updates are paused when the config
+// update is being processed by the RLS LB policy and its child policies.
+//
+// The test uses a wrapping balancer as the top-level LB policy on the channel.
+// The wrapping balancer wraps an RLS LB policy as a child policy and forwards
+// all calls to it. It also records the UpdateState() calls from the RLS LB
+// policy and makes them available for inspection by the test.
+//
+// The test uses another wrapped balancer (which wraps a pickfirst balancer) as
+// the child policy of the RLS LB policy. This balancer makes multiple
+// UpdateState calls when handling an update from its parent in
+// UpdateClientConnState.
+func (s) TestUpdateStatePauses(t *testing.T) {
+	// Override the hook to get notified when UpdateClientConnState is done.
+	clientConnUpdateDone := make(chan struct{}, 1)
+	origClientConnUpdateHook := clientConnUpdateHook
+	clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
+	defer func() { clientConnUpdateHook = origClientConnUpdateHook }()
+
+	// Register the top-level wrapping balancer which forwards calls to RLS.
+	bb := &wrappingTopLevelBalancerBuilder{balCh: make(chan balancer.Balancer, 1)}
+	balancer.Register(bb)
+
+	// Start an RLS server and set the throttler to never throttle requests.
+	rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
+	overrideAdaptiveThrottler(t, neverThrottlingThrottler())
+
+	// Start a test backend and set the RLS server to respond with it.
+	testBackendCh, testBackendAddress := startBackend(t)
+	rlsServer.SetResponseCallback(func(_ context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
+		return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{testBackendAddress}}}
+	})
+
+	// Register a child policy which wraps a pickfirst balancer and receives the
+	// backend address as part of its configuration.
+	balancer.Register(&wrappedPickFirstBalancerBuilder{})
+
+	// Register a manual resolver and push the RLS service config through it.
+	r := manual.NewBuilderWithScheme("rls-e2e")
+	scJSON := fmt.Sprintf(`
+{
+  "loadBalancingConfig": [
+    {
+      "%s": {
+        "routeLookupConfig": {
+          "grpcKeybuilders": [{
+            "names": [{"service": "grpc.testing.TestService"}]
+          }],
+          "lookupService": "%s",
+          "cacheSizeBytes": 1000
+        },
+        "childPolicy": [{"%s": {}}],
+        "childPolicyConfigTargetFieldName": "Backend"
+      }
+    }
+  ]
+}`, wrappingTopLevelBalancerName, rlsServer.Address, multipleUpdateStateChildBalancerName)
+	sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
+	r.InitialState(resolver.State{ServiceConfig: sc})
+
+	cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	defer cc.Close()
+
+	// Wait for the clientconn update to be processed by the RLS LB policy.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	select {
+	case <-ctx.Done():
+	case <-clientConnUpdateDone:
+	}
+
+	// Get the top-level LB policy configured on the channel, to be able to read
+	// the state updates pushed by its child (the RLS LB policy).
+	var wb *wrappingTopLevelBalancer
+	select {
+	case <-ctx.Done():
+		t.Fatal("Timeout when waiting for state update on the top-level LB policy")
+	case b := <-bb.balCh:
+		wb = b.(*wrappingTopLevelBalancer)
+	}
+
+	// It is important to note that at this point no child policies have been
+	// created because we have not attempted any RPC so far. When we attempt an
+	// RPC (below), child policies will be created and their configs will be
+	// pushed to them. But this config update will not happen in the context of
+	// a config update on the parent.
+
+	// Make an RPC and ensure it gets routed to the test backend.
+	makeTestRPCAndExpectItToReachBackend(ctx, t, cc, testBackendCh)
+
+	// Make sure an RLS request is sent out.
+	verifyRLSRequest(t, rlsReqCh, true)
+
+	// Cache the state changes seen up to this point.
+	states0 := wb.getStates()
+
+	// Push an updated service config. As mentioned earlier, the previous config
+	// updates on the child policies did not happen in the context of a config
+	// update on the parent. Hence, this update is required to force the
+	// scenario which we are interested in testing here, i.e. child policies get
+	// config updates as part of the parent policy getting its config update.
+	scJSON = fmt.Sprintf(`
+{
+  "loadBalancingConfig": [
+    {
+      "%s": {
+        "routeLookupConfig": {
+          "grpcKeybuilders": [{
+            "names": [
+              {"service": "grpc.testing.TestService"},
+              {"service": "grpc.health.v1.Health"}
+            ]
+          }],
+          "lookupService": "%s",
+          "cacheSizeBytes": 1000
+        },
+        "childPolicy": [{"%s": {}}],
+        "childPolicyConfigTargetFieldName": "Backend"
+      }
+    }
+  ]
+}`, wrappingTopLevelBalancerName, rlsServer.Address, multipleUpdateStateChildBalancerName)
+	sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
+	r.UpdateState(resolver.State{ServiceConfig: sc})
+
+	// Wait for the clientconn update to be processed by the RLS LB policy.
+	select {
+	case <-ctx.Done():
+	case <-clientConnUpdateDone:
+	}
+
+	// Even though the child policies used in this test make multiple calls to
+	// UpdateState as part of handling their configs, we expect the RLS policy
+	// to inhibit picker updates during this time frame, and send a single
+	// picker once the config update is completely handled.
+	states1 := wb.getStates()
+	if len(states1) != len(states0)+1 {
+		t.Fatalf("expected exactly one additional state update; before: %v, after: %v", states0, states1)
+	}
+}
diff --git a/balancer/rls/control_channel.go b/balancer/rls/control_channel.go
index df78f7b55fb..4acc11d90e9 100644
--- a/balancer/rls/control_channel.go
+++ b/balancer/rls/control_channel.go
@@ -48,9 +48,9 @@ type controlChannel struct {
 	// rpcTimeout specifies the timeout for the RouteLookup RPC call. The LB
 	// policy receives this value in its service config.
 	rpcTimeout time.Duration
-	// backToReadyCh is the channel on which an update is pushed when the
-	// connectivity state changes from READY --> TRANSIENT_FAILURE --> READY.
-	backToReadyCh chan struct{}
+	// backToReadyFunc is a callback to be invoked when the connectivity state
+	// changes from READY --> TRANSIENT_FAILURE --> READY.
+	backToReadyFunc func()
 	// throttler in an adaptive throttling implementation used to avoid
 	// hammering the RLS service while it is overloaded or down.
 	throttler adaptiveThrottler
@@ -63,11 +63,11 @@ type controlChannel struct {
 // newControlChannel creates a controlChannel to rlsServerName and uses
 // serviceConfig, if non-empty, as the default service config for the underlying
 // gRPC channel.
-func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyCh chan struct{}) (*controlChannel, error) {
+func newControlChannel(rlsServerName, serviceConfig string, rpcTimeout time.Duration, bOpts balancer.BuildOptions, backToReadyFunc func()) (*controlChannel, error) {
 	ctrlCh := &controlChannel{
-		rpcTimeout:    rpcTimeout,
-		backToReadyCh: backToReadyCh,
-		throttler:     newAdaptiveThrottler(),
+		rpcTimeout:      rpcTimeout,
+		backToReadyFunc: backToReadyFunc,
+		throttler:       newAdaptiveThrottler(),
 	}
 	ctrlCh.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-control-channel %p] ", ctrlCh))
 
@@ -171,7 +171,7 @@ func (cc *controlChannel) monitorConnectivityState() {
 		if !first {
 			cc.logger.Infof("Control channel back to READY")
-			cc.backToReadyCh <- struct{}{}
+			cc.backToReadyFunc()
 		}
 
 		first = false
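
Note on the pattern: the patch funnels every event (child state updates, control-channel readiness, picker resumption) through a single unbounded queue drained by run(), and uses the inhibitPickerUpdates flag to coalesce the flurry of child UpdateState calls triggered by a config push into one picker update. The standalone sketch below distills that flow. It is illustrative only: parent, childState, and resumeUpdates are invented names for this sketch, a buffered channel stands in for gRPC's internal buffer.Unbounded type, and a plain string stands in for the picker.

```go
package main

import (
	"fmt"
	"sync"
)

// childState mirrors childPolicyIDAndState: a child pushing new state.
type childState struct{ state string }

// resumeUpdates mirrors resumePickerUpdates: re-enable updates and signal done.
type resumeUpdates struct{ done chan struct{} }

// parent loosely mirrors rlsBalancer: a single goroutine drains updateCh.
type parent struct {
	mu      sync.Mutex
	inhibit bool     // counterpart of inhibitPickerUpdates
	latest  string   // last aggregated child state; stands in for the picker
	sent    []string // updates actually pushed upward

	updateCh chan interface{} // stands in for buffer.Unbounded
	wg       sync.WaitGroup
}

// run processes all queued events in order, like rlsBalancer.run.
func (p *parent) run() {
	defer p.wg.Done()
	for u := range p.updateCh {
		switch update := u.(type) {
		case childState:
			p.mu.Lock()
			p.latest = update.state
			p.sendLocked()
			p.mu.Unlock()
		case resumeUpdates:
			p.mu.Lock()
			p.inhibit = false
			p.sendLocked() // the single coalesced update
			p.mu.Unlock()
			close(update.done)
		}
	}
}

// sendLocked pushes the latest state upward unless updates are inhibited.
func (p *parent) sendLocked() {
	if p.inhibit {
		fmt.Println("delaying update:", p.latest)
		return
	}
	p.sent = append(p.sent, p.latest)
	fmt.Println("sent update:", p.latest)
}

// updateConfig mirrors UpdateClientConnState: inhibit updates, let children
// react (each reaction enqueues a state event), then enqueue a resume event
// and block until it has been processed.
func (p *parent) updateConfig(childReactions []string) {
	p.mu.Lock()
	p.inhibit = true
	p.mu.Unlock()

	for _, s := range childReactions {
		p.updateCh <- childState{state: s}
	}

	done := make(chan struct{})
	p.updateCh <- resumeUpdates{done: done}
	<-done
}

func main() {
	p := &parent{updateCh: make(chan interface{}, 16)}
	p.wg.Add(1)
	go p.run()

	// A config push that makes a child emit three intermediate states.
	p.updateConfig([]string{"IDLE", "CONNECTING", "READY"})

	close(p.updateCh)
	p.wg.Wait()
	fmt.Println("updates sent up:", p.sent) // exactly one: [READY]
}
```

Run as written, the three intermediate child states are logged as delayed and exactly one update ("READY") is sent upward, which is the same invariant TestUpdateStatePauses asserts via len(states1) != len(states0)+1.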