Fixed child calls and child type switching
zasweq committed Jun 7, 2022
1 parent 334d66a commit cbac4fa
Showing 2 changed files with 120 additions and 37 deletions.
64 changes: 30 additions & 34 deletions xds/internal/balancer/outlierdetection/balancer.go
@@ -151,24 +151,12 @@ type outlierDetectionBalancer struct {
 	cc    balancer.ClientConn
 	bOpts balancer.BuildOptions
 
-	// closeMu guards against run() reading a subconn update, reading that the
-	// child is not nil, and then a Close() call comes in, clears the balancer,
-	// and then run() continues to try and write the SubConn update to the
-	// child.
-	closeMu sync.Mutex
-	// child gets first written to on UpdateClientConnState and niled on Close.
-	// The only concurrent read that can happen is SubConnUpdates that are
-	// processed by run() (The rest of the child balancer calls are guaranteed
-	// to be called concurrently with Close(), as they are present in operations
-	// defined as part of the balancer.Balancer API.). This can only race with
-	// Close(), (child has to be built to receive SubConn updates) so protect
-	// SubConn updates and Close() with closeMu. nil checks on the child for
-	// forwarding updates are used as an invariant of the outlier detection
-	// balancer if it is closed.
-	child balancer.Balancer
-
-	// closeMu...canUpdateSubConnState cause close? If so move move niling to
-	// run(), and protect other reads with mu
+	// childMu protects child and also updates to the child (to uphold the
+	// balancer.Balancer API guarantee of synchronous calls). It also protects
+	// against run() reading that the child is not nil for SubConn updates, and
+	// then UpdateClientConnState or Close writing to the child.
+	childMu sync.Mutex
+	child   balancer.Balancer
 
 	// mu guards access to a lot of the core LB Policy State. It also prevents
 	// intersplicing certain operations.
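The childMu comment above boils down to a standard guarded-delegate pattern: one mutex serializes every call into the child and every write of the child pointer, and a nil pointer means "no live child". Below is a condensed, self-contained sketch of that pattern; the names (parent, childBalancer) are illustrative stand-ins for the diff's types, not code from the repository.

```go
package sketch

import "sync"

// childBalancer stands in for balancer.Balancer: a delegate that must not
// receive concurrent calls and must not be called after Close.
type childBalancer interface {
	ResolverError(error)
	Close()
}

type parent struct {
	childMu sync.Mutex    // serializes child calls and guards the pointer
	child   childBalancer // nil until built, nil again after Close
}

// forwardResolverError checks the pointer and calls the child inside one
// critical section, closing the race where Close nils the child between
// the nil check and the call.
func (p *parent) forwardResolverError(err error) {
	p.childMu.Lock()
	defer p.childMu.Unlock()
	if p.child != nil {
		p.child.ResolverError(err)
	}
}

// close tears the child down under the same lock, so no forward can
// interleave with, or arrive after, the Close call.
func (p *parent) close() {
	p.childMu.Lock()
	defer p.childMu.Unlock()
	if p.child != nil {
		p.child.Close()
		p.child = nil
	}
}
```

The same shape recurs in the diff's ResolverError, Close, and run() hunks below.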
@@ -205,14 +193,19 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 	}
 
 	// Reject whole config if any errors, don't persist it for later
-	bb := balancer.Get(lbCfg.ChildPolicy.Name) // can nil panic, but child config already validated, (does parsing actually make sure child config is there?)
+	bb := balancer.Get(lbCfg.ChildPolicy.Name)
 	if bb == nil {
 		return fmt.Errorf("balancer %q not registered", lbCfg.ChildPolicy.Name)
 	}
 
-	if b.child == nil {
+	if b.child == nil || b.odCfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name {
+		b.childMu.Lock()
+		if b.child != nil {
+			b.child.Close()
+		}
 		// What if this is nil? Seems fine
 		b.child = bb.Build(b, b.bOpts)
+		b.childMu.Unlock()
 	}
 
 	b.mu.Lock()
@@ -274,6 +267,8 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 	b.pickerUpdateCh.Put(lbCfg)
 
 	// then pass the address list along to the child policy.
+	b.childMu.Lock()
+	defer b.childMu.Unlock()
 	return b.child.UpdateClientConnState(balancer.ClientConnState{
 		ResolverState:  s.ResolverState,
 		BalancerConfig: b.odCfg.ChildPolicy.Config,
@@ -282,6 +277,8 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
 
 func (b *outlierDetectionBalancer) ResolverError(err error) {
 	if b.child != nil {
+		b.childMu.Lock()
+		defer b.childMu.Unlock()
 		b.child.ResolverError(err)
 	}
 }
@@ -307,11 +304,10 @@ func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
 func (b *outlierDetectionBalancer) Close() {
 	b.closed.Fire()
 	if b.child != nil {
-		b.closeMu.Lock()
-		child := b.child
+		b.childMu.Lock()
+		b.child.Close()
 		b.child = nil
-		b.closeMu.Unlock()
-		child.Close()
+		b.childMu.Unlock()
 	}
 
 	// Any other cleanup needs to happen (subconns, other resources?)
@@ -330,10 +326,10 @@ func (b *outlierDetectionBalancer) ExitIdle() {
 		ei.ExitIdle()
 		return
 	}
-	// Fallback for children handled in clusterimpl balancer - do we ever
-	// validate that it's a clusterimpl child for the config? We should?
-	// Removing SubConns is defined in API and also in graceful switch balancer,
-	// but already done in ClusterImpl.
 
+	// Fallback for children handled in clusterimpl balancer. Removing SubConns
+	// is defined in API and also in graceful switch balancer, but already done
+	// in ClusterImpl. I guess we should do that here?
 }
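ExitIdle above leans on gRPC's optional-interface convention: the child is stored as a plain balancer.Balancer, and the ExitIdle capability is discovered by a type assertion against balancer.ExitIdler. A minimal standalone sketch of that shape (not the diff's exact code):

```go
package sketch

import "google.golang.org/grpc/balancer"

// exitIdleChild delegates ExitIdle to the child only if the child implements
// the optional balancer.ExitIdler interface.
func exitIdleChild(child balancer.Balancer) {
	if child == nil {
		return
	}
	if ei, ok := child.(balancer.ExitIdler); ok {
		ei.ExitIdle()
	}
	// A child that does not implement ExitIdler needs the fallback the comment
	// above describes: reconnecting by recreating SubConns, which the
	// clusterimpl child already does.
}
```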

// "The outlier_detection LB policy will provide a picker that delegates to the
@@ -670,11 +666,11 @@ func (b *outlierDetectionBalancer) run() {
 			case *scUpdate:
 				scw := u.scw
 				scw.latestState = u.state
-				b.closeMu.Lock()
+				b.childMu.Lock()
 				if !scw.ejected && b.child != nil {
-					b.child.UpdateSubConnState(scw, u.state) // can this call back and close, no close comes from higher level...unless UpdateSubConnState -> UpdateState -> Close(), that would cause deadlock
+					b.child.UpdateSubConnState(scw, u.state)
 				}
-				b.closeMu.Unlock()
+				b.childMu.Unlock()
 			case *ejectedUpdate:
 				scw := u.scw
 				scw.ejected = u.ejected
@@ -692,11 +688,11 @@
 					// along updates from the underlying subchannel."
 					stateToUpdate = scw.latestState // If this has never been written to will send connectivity IDLE which seems fine to me
 				}
-				b.closeMu.Lock()
+				b.childMu.Lock()
 				if b.child != nil {
 					b.child.UpdateSubConnState(scw, stateToUpdate)
 				}
-				b.closeMu.Unlock()
+				b.childMu.Unlock()
 			}
 		case update := <-b.pickerUpdateCh.Get():
 			b.pickerUpdateCh.Load()
@@ -706,7 +702,7 @@
 			switch u := update.(type) {
 			case balancer.State:
 				b.childState = u
-				b.mu.Lock() // Could make another mu that only protect the config to prevent this from blocking, but I think this is cleaner
+				b.mu.Lock()
 				noopCfg := b.noopConfig()
 				b.mu.Unlock()
 				b.recentPickerNoop = noopCfg
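The run() hunks above show why child calls still need childMu even though run() is a single goroutine: run() serializes the SubConn updates themselves, but UpdateClientConnState and Close can swap or nil the child concurrently. A stripped-down sketch of that division of labor, with a plain channel standing in for the unbounded buffer the real balancer uses and all names hypothetical:

```go
package sketch

import "sync"

// scUpdate carries a SubConn state change from UpdateSubConnState into run().
type scUpdate struct{ state string }

// scChild is the subset of the child balancer that run() calls.
type scChild interface{ UpdateSubConnState(*scUpdate) }

type odParent struct {
	scUpdateCh chan *scUpdate // stand-in for the unbounded update buffer
	done       chan struct{}
	childMu    sync.Mutex
	child      scChild // guarded exactly like the struct fields above
}

// run drains updates on one goroutine, so the child sees SubConn state
// changes in order; childMu still guards each call because another
// goroutine may swap or nil the child at any time.
func (p *odParent) run() {
	for {
		select {
		case u := <-p.scUpdateCh:
			p.childMu.Lock()
			if p.child != nil {
				p.child.UpdateSubConnState(u)
			}
			p.childMu.Unlock()
		case <-p.done:
			return
		}
	}
}
```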
93 changes: 90 additions & 3 deletions xds/internal/balancer/outlierdetection/balancer_test.go
@@ -24,6 +24,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"sync"
 	"testing"
 	"time"

@@ -439,6 +440,81 @@ func (s) TestUpdateClientConnState(t *testing.T) {
 	}
 }
 
+// TestUpdateClientConnStateDifferentType invokes the UpdateClientConnState
+// method on the odBalancer with two different child policy types, and verifies
+// that the first update builds and updates the child balancer, and that the
+// second update closes the old child and builds a new one.
+func (s) TestUpdateClientConnStateDifferentType(t *testing.T) {
+	od, _ := setup(t)
+	defer od.Close() // this will leak a goroutine otherwise
+
+	od.UpdateClientConnState(balancer.ClientConnState{
+		BalancerConfig: &LBConfig{
+			Interval:           10 * time.Second,
+			BaseEjectionTime:   30 * time.Second,
+			MaxEjectionTime:    300 * time.Second,
+			MaxEjectionPercent: 10,
+			SuccessRateEjection: &SuccessRateEjection{
+				StdevFactor:           1900,
+				EnforcementPercentage: 100,
+				MinimumHosts:          5,
+				RequestVolume:         100,
+			},
+			FailurePercentageEjection: &FailurePercentageEjection{
+				Threshold:             85,
+				EnforcementPercentage: 5,
+				MinimumHosts:          5,
+				RequestVolume:         50,
+			},
+			ChildPolicy: &internalserviceconfig.BalancerConfig{
+				Name:   tcibname,
+				Config: testClusterImplBalancerConfig{},
+			},
+		},
+	})
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	ciChild := od.child.(*testClusterImplBalancer)
+	// The child balancer should be created and forwarded the ClientConn update
+	// from the first successful UpdateClientConnState call.
+	if err := ciChild.waitForClientConnUpdate(ctx, balancer.ClientConnState{
+		BalancerConfig: testClusterImplBalancerConfig{},
+	}); err != nil {
+		t.Fatalf("Error waiting for Client Conn update: %v", err)
+	}
+
+	od.UpdateClientConnState(balancer.ClientConnState{
+		BalancerConfig: &LBConfig{
+			Interval:           10 * time.Second,
+			BaseEjectionTime:   30 * time.Second,
+			MaxEjectionTime:    300 * time.Second,
+			MaxEjectionPercent: 10,
+			SuccessRateEjection: &SuccessRateEjection{
+				StdevFactor:           1900,
+				EnforcementPercentage: 100,
+				MinimumHosts:          5,
+				RequestVolume:         100,
+			},
+			FailurePercentageEjection: &FailurePercentageEjection{
+				Threshold:             85,
+				EnforcementPercentage: 5,
+				MinimumHosts:          5,
+				RequestVolume:         50,
+			},
+			ChildPolicy: &internalserviceconfig.BalancerConfig{
+				Name:   verifyBalancerName,
+				Config: verifyBalancerConfig{},
+			},
+		},
+	})
+
+	// Verify that the previous child balancer was closed.
+	if err := ciChild.waitForClose(ctx); err != nil {
+		t.Fatalf("Error waiting for Close() call on child balancer: %v", err)
+	}
+}
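The waitForClose assertion works because the test's child balancer signals lifecycle events over channels. A sketch of that close-signaling idea under assumed names (closeSignalingBalancer, closeCh); the actual testClusterImplBalancer internals may differ:

```go
package sketch

import (
	"context"
	"fmt"
)

// closeSignalingBalancer records its Close call on a channel so a test can
// block until the parent balancer has actually torn it down.
type closeSignalingBalancer struct {
	closeCh chan struct{}
}

// Close must be called at most once, which is exactly what the test asserts.
func (b *closeSignalingBalancer) Close() {
	close(b.closeCh)
}

// waitForClose blocks until Close fires or the context expires.
func (b *closeSignalingBalancer) waitForClose(ctx context.Context) error {
	select {
	case <-b.closeCh:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("timed out waiting for Close: %v", ctx.Err())
	}
}
```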

// TestUpdateState tests that an UpdateState call gets forwarded to the
// ClientConn.
func (s) TestUpdateState(t *testing.T) {
@@ -1786,7 +1862,10 @@ func (s) TestConcurrentOperations(t *testing.T) {
 	}
 
 	finished := make(chan struct{})
+	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for {
 			select {
 			case <-finished:
@@ -1803,7 +1882,9 @@
 		}
 	}()
 
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for {
 			select {
 			case <-finished:
@@ -1817,7 +1898,9 @@
 	// call Outlier Detection's balancer.ClientConn operations asynchronously.
 	// balancer.ClientConn operations have no guarantee from the API to be
 	// called synchronously.
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for {
 			select {
 			case <-finished:
@@ -1834,15 +1917,21 @@
 		}
 	}()
 
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		od.NewSubConn([]resolver.Address{{Addr: "address4"}}, balancer.NewSubConnOptions{})
 	}()
 
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		od.RemoveSubConn(scw1)
 	}()
 
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		od.UpdateAddresses(scw2, []resolver.Address{
 			{
 				Addr: "address3",
@@ -1878,9 +1967,7 @@
 	od.ExitIdle()
 	od.Close()
 	close(finished)
-	// Do I need to wait for all the spawned goroutines to return here in this
-	// main testing goroutine? It works fine without a sync point, but I feel
-	// like other tests wait.
+	wg.Wait()
 }

type verifyBalancerConfig struct {
