rls: deflake tests (#5877)
Fixes #5845
easwars committed Dec 21, 2022
1 parent 08479c5 commit 94a65dc
Showing 5 changed files with 166 additions and 79 deletions.
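
The core of the deflaking change to balancer.go below is a two-event shutdown handshake: a new `closed` event asks the background goroutines to stop, and the existing `done` event now fires only after they have all exited, so Close() can block on it before returning. The following is a minimal, self-contained sketch of that pattern (not the balancer itself), assuming plain channels in place of the internal grpcsync.Event type; the `service` type and its methods are illustrative names only.

// Minimal sketch of the shutdown handshake described above, assuming plain
// channels in place of grpc's internal grpcsync.Event. The `service` type and
// its methods are illustrative only.
package main

import (
    "fmt"
    "time"
)

type service struct {
    closed chan struct{} // closed when Close() is invoked (plays the role of b.closed)
    done   chan struct{} // closed when all goroutines have exited (plays the role of b.done)
    ticker *time.Ticker
}

func newService() *service {
    s := &service{
        closed: make(chan struct{}),
        done:   make(chan struct{}),
        ticker: time.NewTicker(10 * time.Millisecond),
    }
    go s.run()
    return s
}

func (s *service) run() {
    // Signal completion last, and only after the purge goroutine has exited.
    defer close(s.done)
    purgeDone := make(chan struct{})
    defer func() { <-purgeDone }()
    go s.purge(purgeDone)

    <-s.closed // run() exits only once Close() has been invoked.
}

func (s *service) purge(doneCh chan struct{}) {
    defer close(doneCh)
    for {
        select {
        case <-s.closed:
            return
        case <-s.ticker.C:
            // Periodic work, e.g. purging expired cache entries.
        }
    }
}

func (s *service) Close() {
    close(s.closed) // ask the goroutines to stop ...
    s.ticker.Stop()
    <-s.done // ... and wait until they actually have.
}

func main() {
    s := newService()
    s.Close() // returns only after run() and purge() are gone
    fmt.Println("clean shutdown")
}

Once Close() returns under this scheme, no background goroutine can still be running, which is the property the deflaked tests rely on.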
71 changes: 47 additions & 24 deletions balancer/rls/balancer.go
@@ -91,14 +91,16 @@ func (rlsBB) Name() string {
 
 func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
     lb := &rlsBalancer{
-        done: grpcsync.NewEvent(),
-        cc: cc,
-        bopts: opts,
-        purgeTicker: dataCachePurgeTicker(),
-        lbCfg: &lbConfig{},
-        pendingMap: make(map[cacheKey]*backoffState),
-        childPolicies: make(map[string]*childPolicyWrapper),
-        updateCh: buffer.NewUnbounded(),
+        closed: grpcsync.NewEvent(),
+        done: grpcsync.NewEvent(),
+        cc: cc,
+        bopts: opts,
+        purgeTicker: dataCachePurgeTicker(),
+        dataCachePurgeHook: dataCachePurgeHook,
+        lbCfg: &lbConfig{},
+        pendingMap: make(map[cacheKey]*backoffState),
+        childPolicies: make(map[string]*childPolicyWrapper),
+        updateCh: buffer.NewUnbounded(),
     }
     lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
     lb.dataCache = newDataCache(maxCacheSize, lb.logger)
@@ -110,11 +112,13 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
 
 // rlsBalancer implements the RLS LB policy.
 type rlsBalancer struct {
-    done *grpcsync.Event
-    cc balancer.ClientConn
-    bopts balancer.BuildOptions
-    purgeTicker *time.Ticker
-    logger *internalgrpclog.PrefixLogger
+    closed *grpcsync.Event // Fires when Close() is invoked. Guarded by stateMu.
+    done *grpcsync.Event // Fires when Close() is done.
+    cc balancer.ClientConn
+    bopts balancer.BuildOptions
+    purgeTicker *time.Ticker
+    dataCachePurgeHook func()
+    logger *internalgrpclog.PrefixLogger
 
     // If both cacheMu and stateMu need to be acquired, the former must be
     // acquired first to prevent a deadlock. This order restriction is due to the
@@ -167,7 +171,18 @@ type controlChannelReady struct{}
 // on to a channel that this goroutine will select on, thereby the handling of
 // the update will happen asynchronously.
 func (b *rlsBalancer) run() {
-    go b.purgeDataCache()
+    // We exit out of the for loop below only after `Close()` has been invoked.
+    // Firing the done event here will ensure that Close() returns only after
+    // all goroutines are done.
+    defer func() { b.done.Fire() }()
+
+    // Wait for purgeDataCache() goroutine to exit before returning from here.
+    doneCh := make(chan struct{})
+    defer func() {
+        <-doneCh
+    }()
+    go b.purgeDataCache(doneCh)
+
     for {
         select {
         case u := <-b.updateCh.Get():
@@ -194,7 +209,7 @@ func (b *rlsBalancer) run() {
             default:
                 b.logger.Errorf("Unsupported update type %T", update)
             }
-        case <-b.done.Done():
+        case <-b.closed.Done():
             return
         }
     }
@@ -203,10 +218,12 @@ func (b *rlsBalancer) run() {
 // purgeDataCache is a long-running goroutine which periodically deletes expired
 // entries. An expired entry is one for which both the expiryTime and
 // backoffExpiryTime are in the past.
-func (b *rlsBalancer) purgeDataCache() {
+func (b *rlsBalancer) purgeDataCache(doneCh chan struct{}) {
+    defer close(doneCh)
+
     for {
         select {
-        case <-b.done.Done():
+        case <-b.closed.Done():
             return
         case <-b.purgeTicker.C:
             b.cacheMu.Lock()
@@ -215,19 +232,21 @@ func (b *rlsBalancer) purgeDataCache() {
             if updatePicker {
                 b.sendNewPicker()
             }
-            dataCachePurgeHook()
+            b.dataCachePurgeHook()
         }
     }
 }
 
 func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
     defer clientConnUpdateHook()
-    if b.done.HasFired() {
+
+    b.stateMu.Lock()
+    if b.closed.HasFired() {
+        b.stateMu.Unlock()
         b.logger.Warningf("Received service config after balancer close: %s", pretty.ToJSON(ccs.BalancerConfig))
         return errBalancerClosed
     }
 
-    b.stateMu.Lock()
     newCfg := ccs.BalancerConfig.(*lbConfig)
     if b.lbCfg.Equal(newCfg) {
         b.stateMu.Unlock()
@@ -405,10 +424,9 @@ func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
 }
 
 func (b *rlsBalancer) Close() {
-    b.done.Fire()
-
-    b.purgeTicker.Stop()
     b.stateMu.Lock()
+    b.closed.Fire()
+    b.purgeTicker.Stop()
     if b.ctrlCh != nil {
         b.ctrlCh.close()
     }
@@ -418,6 +436,8 @@ func (b *rlsBalancer) Close() {
     b.cacheMu.Lock()
     b.dataCache.stop()
     b.cacheMu.Unlock()
+
+    <-b.done.Done()
 }
 
 func (b *rlsBalancer) ExitIdle() {
@@ -479,8 +499,11 @@ func (b *rlsBalancer) sendNewPickerLocked() {
 
 func (b *rlsBalancer) sendNewPicker() {
     b.stateMu.Lock()
+    defer b.stateMu.Unlock()
+    if b.closed.HasFired() {
+        return
+    }
     b.sendNewPickerLocked()
-    b.stateMu.Unlock()
 }
 
 // The aggregated connectivity state reported is determined as follows:
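
The diff above also stops calling the package-level dataCachePurgeHook directly and instead captures it into a per-balancer dataCachePurgeHook field in Build(). Presumably this keeps a hook override local to the test that installed it: a balancer left running by an earlier test keeps calling the hook it was built with. A hedged sketch of that capture-at-construction pattern, with hypothetical names (purgeHook, cache), follows.

// Sketch of the hook-capture pattern, under assumed names (purgeHook, cache):
// the package-level hook is a test seam, but each instance copies it into a
// field at construction time, so later overrides do not reach instances that
// already exist.
package main

import "fmt"

// Package-level default; a test may swap this out before constructing a cache.
var purgeHook = func() {}

type cache struct {
    purgeHook func() // captured once, at construction
}

func newCache() *cache {
    return &cache{purgeHook: purgeHook}
}

func (c *cache) purge() {
    // ... delete expired entries ...
    c.purgeHook() // notify the test, if one is listening
}

func main() {
    notified := make(chan struct{}, 1)
    purgeHook = func() { notified <- struct{}{} } // this test's override
    c := newCache()

    purgeHook = func() {} // a later override does not affect c
    c.purge()
    <-notified
    fmt.Println("purge hook observed by the instance that captured it")
}
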
37 changes: 37 additions & 0 deletions balancer/rls/balancer_test.go
@@ -1048,6 +1048,43 @@ func (s) TestUpdateStatePauses(t *testing.T) {
     // Make sure an RLS request is sent out.
     verifyRLSRequest(t, rlsReqCh, true)
 
+    // Wait for the control channel to become READY, before reading the states
+    // out of the wrapping top-level balancer.
+    //
+    // makeTestRPCAndExpectItToReachBackend repeatedly sends RPCs with short
+    // deadlines until one succeeds. See its docstring for details.
+    //
+    // The following sequence of events is possible:
+    // 1. When the first RPC is attempted above, a pending cache entry is
+    //    created, an RLS request is sent out, and the pick is queued. The
+    //    channel is in CONNECTING state.
+    // 2. When the RLS response arrives, the pending cache entry is moved to the
+    //    data cache, a child policy is created for the target specified in the
+    //    response and a new picker is returned. The channel is still in
+    //    CONNECTING, and retried pick is again queued.
+    // 3. The child policy moves through the standard set of states, IDLE -->
+    //    CONNECTING --> READY. And for each of these state changes, a new
+    //    picker is sent on the channel. But the overall connectivity state of
+    //    the channel is still CONNECTING.
+    // 4. Right around the time when the child policy becomes READY, the
+    //    deadline associated with the first RPC made by
+    //    makeTestRPCAndExpectItToReachBackend() could expire, and it could send
+    //    a new one. And because the internal state of the LB policy now
+    //    contains a child policy which is READY, this RPC will succeed. But the
+    //    RLS LB policy has yet to push a new picker on the channel.
+    // 5. If we read the states seen by the top-level wrapping LB policy without
+    //    waiting for the channel to become READY, there is a possibility that we
+    //    might not see the READY state in there. And if that happens, we will
+    //    see two extra states in the last check made in the test, and thereby
+    //    the test would fail. Waiting for the channel to become READY here
+    //    ensures that the test does not flake because of this rare sequence of
+    //    events.
+    for s := cc.GetState(); s != connectivity.Ready; s = cc.GetState() {
+        if !cc.WaitForStateChange(ctx, s) {
+            t.Fatal("Timeout when waiting for connectivity state to reach READY")
+        }
+    }
+
     // Cache the state changes seen up to this point.
     states0 := wb.getStates()
 
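
The wait loop added above relies on two real grpc.ClientConn methods, GetState and WaitForStateChange. Factored out, the same logic might look like the sketch below; awaitReady is a hypothetical helper name, not something this test suite defines.

// A possible helper in the spirit of the loop above; awaitReady is a
// hypothetical name, while cc.GetState and cc.WaitForStateChange are the
// grpc.ClientConn methods the test actually calls.
package testhelpers

import (
    "context"
    "fmt"

    "google.golang.org/grpc"
    "google.golang.org/grpc/connectivity"
)

// awaitReady blocks until cc reports READY or ctx expires.
func awaitReady(ctx context.Context, cc *grpc.ClientConn) error {
    for s := cc.GetState(); s != connectivity.Ready; s = cc.GetState() {
        // WaitForStateChange returns false once ctx is done.
        if !cc.WaitForStateChange(ctx, s) {
            return fmt.Errorf("timed out waiting for READY, last observed state: %v", s)
        }
    }
    return nil
}
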
20 changes: 8 additions & 12 deletions balancer/rls/helpers_test.go
@@ -21,7 +21,6 @@ package rls
 import (
     "context"
     "strings"
-    "sync"
     "testing"
     "time"
 
@@ -30,6 +29,7 @@ import (
     "google.golang.org/grpc/codes"
     "google.golang.org/grpc/internal"
     "google.golang.org/grpc/internal/balancergroup"
+    "google.golang.org/grpc/internal/grpcsync"
     "google.golang.org/grpc/internal/grpctest"
     rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
     internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
Expand Down Expand Up @@ -104,18 +104,14 @@ func neverThrottlingThrottler() *fakeThrottler {
}
}

// oneTimeAllowingThrottler returns a fake throttler which does not throttle the
// first request, but throttles everything that comes after. This is useful for
// tests which need to set up a valid cache entry before testing other cases.
func oneTimeAllowingThrottler() *fakeThrottler {
var once sync.Once
// oneTimeAllowingThrottler returns a fake throttler which does not throttle
// requests until the client RPC succeeds, but throttles everything that comes
// after. This is useful for tests which need to set up a valid cache entry
// before testing other cases.
func oneTimeAllowingThrottler(firstRPCDone *grpcsync.Event) *fakeThrottler {
return &fakeThrottler{
throttleFunc: func() bool {
throttle := true
once.Do(func() { throttle = false })
return throttle
},
throttleCh: make(chan struct{}, 1),
throttleFunc: firstRPCDone.HasFired,
throttleCh: make(chan struct{}, 1),
}
}
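
Previously the throttler used a sync.Once to allow exactly one un-throttled request, so a retry of the first RPC could itself be throttled; now it stays open until the test fires the event it was given, presumably after the first RPC has succeeded. The self-contained sketch below mimics that behaviour with a local event type standing in for grpc's internal grpcsync.Event; fakeThrottler and ShouldThrottle here are stand-ins defined only for the sketch.

// Self-contained sketch of the throttler semantics, with a local `event` type
// standing in for grpc's internal grpcsync.Event; fakeThrottler and
// ShouldThrottle here are defined only for this sketch.
package main

import "fmt"

type event struct{ ch chan struct{} }

func newEvent() *event { return &event{ch: make(chan struct{})} }
func (e *event) Fire() { close(e.ch) }
func (e *event) HasFired() bool {
    select {
    case <-e.ch:
        return true
    default:
        return false
    }
}

type fakeThrottler struct {
    throttleFunc func() bool
}

func (f *fakeThrottler) ShouldThrottle() bool { return f.throttleFunc() }

func main() {
    firstRPCDone := newEvent()
    throttler := &fakeThrottler{throttleFunc: firstRPCDone.HasFired}

    fmt.Println(throttler.ShouldThrottle()) // false: the first RPC, and any retries of it, may trigger RLS requests
    fmt.Println(throttler.ShouldThrottle()) // still false: nothing is throttled until the test says the RPC succeeded
    firstRPCDone.Fire()                     // the test fires the event once the first RPC has reached a backend
    fmt.Println(throttler.ShouldThrottle()) // true: subsequent requests are throttled
}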