From 73528ce65faa883f78528a0b3fb86dd7cb65bfb3 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 6 Jul 2022 19:46:56 -0400 Subject: [PATCH] Responded to Easwar's comments --- test/xds/xds_client_outlier_detection_test.go | 33 ++-- .../balancer/outlierdetection/balancer.go | 163 +++++++++--------- .../outlierdetection/balancer_test.go | 8 +- .../balancer/outlierdetection/callcounter.go | 7 +- .../outlierdetection/subconn_wrapper.go | 20 ++- 5 files changed, 122 insertions(+), 109 deletions(-) diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go index d304d35a31a..56660ae9e78 100644 --- a/test/xds/xds_client_outlier_detection_test.go +++ b/test/xds/xds_client_outlier_detection_test.go @@ -42,10 +42,13 @@ import ( "google.golang.org/protobuf/types/known/wrapperspb" ) -// TestOutlierDetection tests an xDS configured ClientConn with an Outlier -// Detection present in the system which is a logical no-op. An RPC should -// proceed as normal. -func (s) TestOutlierDetection(t *testing.T) { +// TestOutlierDetection_NoopConfig tests the scenario where the Outlier +// Detection feature is enabled on the gRPC client, but it receives no Outlier +// Detection configuration from the management server. This should result in a +// no-op Outlier Detection configuration being used to configure the Outlier +// Detection balancer. This test verifies that an RPC is able to proceed +// normally with this configuration. +func (s) TestOutlierDetection_NoopConfig(t *testing.T) { oldOD := envconfig.XDSOutlierDetection envconfig.XDSOutlierDetection = true internal.RegisterOutlierDetectionBalancerForTesting() @@ -54,7 +57,7 @@ func (s) TestOutlierDetection(t *testing.T) { internal.UnregisterOutlierDetectionBalancerForTesting() }() - managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) defer cleanup1() port, cleanup2 := startTestService(t, nil) @@ -87,12 +90,12 @@ func (s) TestOutlierDetection(t *testing.T) { } } -// defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection returns -// xDS resources which correspond to multiple upstreams, corresponding different -// backends listening on different localhost:port combinations. The resources -// also configure an Outlier Detection Balancer set up with Failure Percentage -// Algorithm, which ejects endpoints based on failure rate. -func defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions { +// defaultClientResourcesMultipleBackendsAndOD returns xDS resources which +// correspond to multiple upstreams, corresponding different backends listening +// on different localhost:port combinations. The resources also configure an +// Outlier Detection Balancer set up with Failure Percentage Algorithm, which +// ejects endpoints based on failure rate. +func defaultClientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions { routeConfigName := "route-" + params.DialTarget clusterName := "cluster-" + params.DialTarget endpointsName := "endpoints-" + params.DialTarget @@ -108,9 +111,7 @@ func defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection(params func defaultClusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel) *v3clusterpb.Cluster { cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel) cluster.OutlierDetection = &v3clusterpb.OutlierDetection{ - Interval: &durationpb.Duration{ - Nanos: 500000000, - }, + Interval: &durationpb.Duration{Nanos: 50000000}, // .5 seconds BaseEjectionTime: &durationpb.Duration{Seconds: 30}, MaxEjectionTime: &durationpb.Duration{Seconds: 300}, MaxEjectionPercent: &wrapperspb.UInt32Value{Value: 1}, @@ -136,7 +137,7 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) { internal.UnregisterOutlierDetectionBalancerForTesting() }() - managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t) + managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, nil) defer cleanup() // counters for how many times backends got called @@ -172,7 +173,7 @@ func (s) TestOutlierDetectionWithOutlier(t *testing.T) { defer cleanup3() const serviceName = "my-service-client-side-xds" - resources := defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection(e2e.ResourceParams{ + resources := defaultClientResourcesMultipleBackendsAndOD(e2e.ResourceParams{ DialTarget: serviceName, NodeID: nodeID, Host: "localhost", diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 1331afed9e7..b33719d235f 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -71,7 +71,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba cc: cc, bOpts: bOpts, closed: grpcsync.NewEvent(), - odAddrs: make(map[string]*object), + addrs: make(map[string]*addressInfo), scWrappers: make(map[balancer.SubConn]*subConnWrapper), scUpdateCh: buffer.NewUnbounded(), pickerUpdateCh: buffer.NewUnbounded(), @@ -165,16 +165,16 @@ type outlierDetectionBalancer struct { // intersplicing certain operations. // // ex 1: interval timer goes off, outlier detection algorithm starts running - // based on knobs in odCfg. in the middle of running the algorithm, a - // ClientConn update comes in and writes to odCfg. This causes undefined + // based on knobs in cfg. in the middle of running the algorithm, a + // ClientConn update comes in and writes to cfg. This causes undefined // behavior for the interval timer algorithm. // - // ex 2: Updating the odAddrs map from UpdateAddresses in the middle of - // running the interval timer algorithm which uses odAddrs heavily. This - // will cause undefined behavior for the interval timer algorithm. + // ex 2: Updating the addrs map from UpdateAddresses in the middle of + // running the interval timer algorithm which uses addrs heavily. This will + // cause undefined behavior for the interval timer algorithm. mu sync.Mutex - odAddrs map[string]*object - odCfg *LBConfig + addrs map[string]*addressInfo + cfg *LBConfig scWrappers map[balancer.SubConn]*subConnWrapper timerStartTime time.Time intervalTimer *time.Timer @@ -186,7 +186,7 @@ type outlierDetectionBalancer struct { // noopConfig returns whether this balancer is configured with a logical no-op // configuration or not. func (b *outlierDetectionBalancer) noopConfig() bool { - return b.odCfg.SuccessRateEjection == nil && b.odCfg.FailurePercentageEjection == nil + return b.cfg.SuccessRateEjection == nil && b.cfg.FailurePercentageEjection == nil } func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error { @@ -201,7 +201,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt return fmt.Errorf("balancer %q not registered", lbCfg.ChildPolicy.Name) } - if b.child == nil || b.odCfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { + if b.child == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { b.childMu.Lock() if b.child != nil { b.child.Close() @@ -212,7 +212,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt } b.mu.Lock() - b.odCfg = lbCfg + b.cfg = lbCfg // When the outlier_detection LB policy receives an address update, it will // create a map entry for each subchannel address in the list, and remove @@ -220,11 +220,11 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt addrs := make(map[string]bool, len(s.ResolverState.Addresses)) for _, addr := range s.ResolverState.Addresses { addrs[addr.Addr] = true - b.odAddrs[addr.Addr] = newObject() + b.addrs[addr.Addr] = newObject() } - for addr := range b.odAddrs { + for addr := range b.addrs { if !addrs[addr] { - delete(b.odAddrs, addr) + delete(b.addrs, addr) } } @@ -234,16 +234,16 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt var interval time.Duration if b.timerStartTime.IsZero() { b.timerStartTime = time.Now() - for _, obj := range b.odAddrs { + for _, obj := range b.addrs { obj.callCounter.clear() } - interval = b.odCfg.Interval + interval = b.cfg.Interval } else { // If the timer start timestamp is set, instead cancel the existing // timer and start the timer for the configured interval minus the // difference between the current time and the previous start timestamp, // or 0 if that would be negative. - interval = b.odCfg.Interval - (now().Sub(b.timerStartTime)) + interval = b.cfg.Interval - (now().Sub(b.timerStartTime)) if interval < 0 { interval = 0 } @@ -274,7 +274,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt defer b.childMu.Unlock() return b.child.UpdateClientConnState(balancer.ClientConnState{ ResolverState: s.ResolverState, - BalancerConfig: b.odCfg.ChildPolicy.Config, + BalancerConfig: b.cfg.ChildPolicy.Config, }) } @@ -337,10 +337,9 @@ func (b *outlierDetectionBalancer) ExitIdle() { // in ClusterImpl. I guess we should do that here? } -// "The outlier_detection LB policy will provide a picker that delegates to the -// child policy's picker, and when the request finishes, increment the -// corresponding counter in the map entry referenced by the subchannel wrapper -// that was picked." - A50 +// wrappedPicker delegates to the child policy's picker, and when the request +// finishes, it increments the corresponding counter in the map entry referenced +// by the subConnWrapper that was picked. type wrappedPicker struct { childPicker balancer.Picker noopPicker bool @@ -381,21 +380,21 @@ func incrementCounter(sc balancer.SubConn, info balancer.DoneInfo) { return } - // scw.obj and callCounter.activeBucket can be written to concurrently (the - // pointers themselves). Thus, protect the reads here with atomics to - // prevent data corruption. There exists a race in which you read the object - // or active bucket pointer and then that pointer points to deprecated - // memory. If this goroutine yields the processor, in between reading the - // object pointer and writing to the active bucket, UpdateAddresses can - // switch the obj the scw points to. Writing to an outdated addresses is a - // very small race and tolerable. After reading callCounter.activeBucket in - // this picker a swap call can concurrently change what activeBucket points - // to. A50 says to swap the pointer, but I decided to make create new memory - // for both active and inactive bucket, and have this race instead write to - // deprecated memory. If you swap the pointers, this write would write to - // the inactive buckets memory, which is read throughout in the interval - // timers algorithm. - obj := (*object)(atomic.LoadPointer(&scw.obj)) + // scw.addressInfo and callCounter.activeBucket can be written to + // concurrently (the pointers themselves). Thus, protect the reads here with + // atomics to prevent data corruption. There exists a race in which you read + // the addressInfo or active bucket pointer and then that pointer points to + // deprecated memory. If this goroutine yields the processor, in between + // reading the addressInfo pointer and writing to the active bucket, + // UpdateAddresses can switch the addressInfo the scw points to. Writing to + // an outdated addresses is a very small race and tolerable. After reading + // callCounter.activeBucket in this picker a swap call can concurrently + // change what activeBucket points to. A50 says to swap the pointer, but I + // decided to make create new memory for both active and inactive bucket, + // and have this race instead write to deprecated memory. If you swap the + // pointers, this write would write to the inactive buckets memory, which is + // read throughout in the interval timers algorithm. + obj := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo)) if obj == nil { return } @@ -432,12 +431,12 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal if len(addrs) != 1 { return scw, nil } - obj, ok := b.odAddrs[addrs[0].Addr] + obj, ok := b.addrs[addrs[0].Addr] if !ok { return scw, nil } obj.sws = append(obj.sws, scw) - atomic.StorePointer(&scw.obj, unsafe.Pointer(obj)) + atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(obj)) // "If that address is currently ejected, that subchannel wrapper's eject // method will be called." - A50 @@ -461,21 +460,21 @@ func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) { // appendIfPresent appends the scw to the address, if the address is present in // the Outlier Detection balancers address map. Returns nil if not present, and // the map entry if present. -func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *object { - obj, ok := b.odAddrs[addr] +func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *addressInfo { + obj, ok := b.addrs[addr] if !ok { return nil } obj.sws = append(obj.sws, scw) - atomic.StorePointer(&scw.obj, unsafe.Pointer(obj)) + atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(obj)) return obj } // removeSubConnFromAddressesMapEntry removes the scw from it's map entry if // present. func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subConnWrapper) { - obj := (*object)(atomic.LoadPointer(&scw.obj)) + obj := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo)) if obj == nil { return } @@ -527,7 +526,7 @@ func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs [] // 1. Remove Subchannel from Addresses map entry if present in Addresses map. b.removeSubConnFromAddressesMapEntry(scw) // 2. Clear the Subchannel wrapper's Call Counter entry. - obj := (*object)(atomic.LoadPointer(&scw.obj)) + obj := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo)) if obj != nil { obj.callCounter.clear() } @@ -576,24 +575,24 @@ func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { // 2. For each address, swap the call counter's buckets in that address's // map entry. - for _, obj := range b.odAddrs { + for _, obj := range b.addrs { obj.callCounter.swap() } // 3. If the success_rate_ejection configuration field is set, run the // success rate algorithm. - if b.odCfg.SuccessRateEjection != nil { + if b.cfg.SuccessRateEjection != nil { b.successRateAlgorithm() } // 4. If the failure_percentage_ejection configuration field is set, run the // failure percentage algorithm. - if b.odCfg.FailurePercentageEjection != nil { + if b.cfg.FailurePercentageEjection != nil { b.failurePercentageAlgorithm() } // 5. For each address in the map: - for addr, obj := range b.odAddrs { + for addr, obj := range b.addrs { // If the address is not ejected and the multiplier is greater than 0, // decrease the multiplier by 1. if obj.latestEjectionTimestamp.IsZero() && obj.ejectionTimeMultiplier > 0 { @@ -604,7 +603,7 @@ func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { // ejection_timestamp + min(base_ejection_time (type: time.Time) * // multiplier (type: int), max(base_ejection_time (type: time.Time), // max_ejection_time (type: time.Time))), un-eject the address. - if !obj.latestEjectionTimestamp.IsZero() && now().After(obj.latestEjectionTimestamp.Add(time.Duration(min(b.odCfg.BaseEjectionTime.Nanoseconds()*obj.ejectionTimeMultiplier, max(b.odCfg.BaseEjectionTime.Nanoseconds(), b.odCfg.MaxEjectionTime.Nanoseconds()))))) { // need to way to inject a desired bool here at a certain point in tests, mock time.Now to return a late time, mock time.After to always return true... + if !obj.latestEjectionTimestamp.IsZero() && now().After(obj.latestEjectionTimestamp.Add(time.Duration(min(b.cfg.BaseEjectionTime.Nanoseconds()*obj.ejectionTimeMultiplier, max(b.cfg.BaseEjectionTime.Nanoseconds(), b.cfg.MaxEjectionTime.Nanoseconds()))))) { b.unejectAddress(addr) } } @@ -614,7 +613,7 @@ func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { if b.intervalTimer != nil { b.intervalTimer.Stop() } - b.intervalTimer = afterFunc(b.odCfg.Interval, func() { + b.intervalTimer = afterFunc(b.cfg.Interval, func() { b.intervalTimerAlgorithm() }) } @@ -712,8 +711,8 @@ func (b *outlierDetectionBalancer) run() { // the map that have request volume of at least requestVolume. func (b *outlierDetectionBalancer) numAddrsWithAtLeastRequestVolume() uint32 { var numAddrs uint32 - for _, obj := range b.odAddrs { - if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.odCfg.SuccessRateEjection.RequestVolume { + for _, obj := range b.addrs { + if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.cfg.SuccessRateEjection.RequestVolume { numAddrs++ } } @@ -728,20 +727,20 @@ func (b *outlierDetectionBalancer) meanAndStdDevOfSuccessesAtLeastRequestVolume( // success_rate_ejection.request_volume. var totalFractionOfSuccessfulRequests float64 var mean float64 - for _, obj := range b.odAddrs { + for _, obj := range b.addrs { // "of at least success_rate_ejection.request_volume" - if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.odCfg.SuccessRateEjection.RequestVolume { + if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.cfg.SuccessRateEjection.RequestVolume { totalFractionOfSuccessfulRequests += float64(obj.callCounter.inactiveBucket.numSuccesses) / float64(obj.callCounter.inactiveBucket.requestVolume) } } - mean = totalFractionOfSuccessfulRequests / float64(len(b.odAddrs)) + mean = totalFractionOfSuccessfulRequests / float64(len(b.addrs)) var sumOfSquares float64 - for _, obj := range b.odAddrs { + for _, obj := range b.addrs { devFromMean := (float64(obj.callCounter.inactiveBucket.numSuccesses) / float64(obj.callCounter.inactiveBucket.requestVolume)) - mean sumOfSquares += devFromMean * devFromMean } - variance := sumOfSquares / float64(len(b.odAddrs)) + variance := sumOfSquares / float64(len(b.addrs)) return mean, math.Sqrt(variance) } @@ -750,7 +749,7 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { // 1. If the number of addresses with request volume of at least // success_rate_ejection.request_volume is less than // success_rate_ejection.minimum_hosts, stop. - if b.numAddrsWithAtLeastRequestVolume() < b.odCfg.SuccessRateEjection.MinimumHosts { // TODO: O(n) search, is there a way to optimize this? + if b.numAddrsWithAtLeastRequestVolume() < b.cfg.SuccessRateEjection.MinimumHosts { // TODO: O(n) search, is there a way to optimize this? return } @@ -760,12 +759,12 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { mean, stddev := b.meanAndStdDevOfSuccessesAtLeastRequestVolume() // 3. For each address: - for addr, obj := range b.odAddrs { + for addr, obj := range b.addrs { ccb := obj.callCounter.inactiveBucket - sre := b.odCfg.SuccessRateEjection + sre := b.cfg.SuccessRateEjection // i. If the percentage of ejected addresses is greater than // max_ejection_percent, stop. - if float64(b.numAddrsEjected)/float64(len(b.odAddrs))*100 > float64(b.odCfg.MaxEjectionPercent) { + if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 > float64(b.cfg.MaxEjectionPercent) { return } @@ -792,17 +791,17 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { // 1. If the number of addresses is less than // failure_percentage_ejection.minimum_hosts, stop. - if uint32(len(b.odAddrs)) < b.odCfg.FailurePercentageEjection.MinimumHosts { + if uint32(len(b.addrs)) < b.cfg.FailurePercentageEjection.MinimumHosts { return } // 2. For each address: - for addr, obj := range b.odAddrs { + for addr, obj := range b.addrs { ccb := obj.callCounter.inactiveBucket - fpe := b.odCfg.FailurePercentageEjection + fpe := b.cfg.FailurePercentageEjection // i. If the percentage of ejected addresses is greater than // max_ejection_percent, stop. - if float64(b.numAddrsEjected)/float64(len(b.odAddrs))*100 > float64(b.odCfg.MaxEjectionPercent) { + if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 > float64(b.cfg.MaxEjectionPercent) { return } // ii. If the address's total request volume is less than @@ -814,11 +813,11 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { // 2c. If the address's failure percentage is greater than // failure_percentage_ejection.threshold failurePercentage := (float64(ccb.numFailures) / float64(ccb.requestVolume)) * 100 - if failurePercentage > float64(b.odCfg.FailurePercentageEjection.Threshold) { + if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { // then choose a random integer in [0, 100). If that number is less // than failiure_percentage_ejection.enforcement_percentage, eject // that address. - if uint32(grpcrand.Int31n(100)) < b.odCfg.FailurePercentageEjection.EnforcementPercentage { + if uint32(grpcrand.Int31n(100)) < b.cfg.FailurePercentageEjection.EnforcementPercentage { b.ejectAddress(addr) } } @@ -826,7 +825,7 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { } func (b *outlierDetectionBalancer) ejectAddress(addr string) { - obj, ok := b.odAddrs[addr] + obj, ok := b.addrs[addr] if !ok { // Shouldn't happen return } @@ -844,7 +843,7 @@ func (b *outlierDetectionBalancer) ejectAddress(addr string) { } func (b *outlierDetectionBalancer) unejectAddress(addr string) { - obj, ok := b.odAddrs[addr] + obj, ok := b.addrs[addr] if !ok { // Shouldn't happen return } @@ -861,23 +860,31 @@ func (b *outlierDetectionBalancer) unejectAddress(addr string) { } } -type object struct { - // The call result counter object +// addressInfo contains the runtime information about an address that pertains +// to Outlier Detection, including the counter for successful/failing RPC's, and +// also information about whether the addresses has been ejected, and the +// SubConns that are present that use this address. This struct and all of it's +// fields is protected by outlierDetectionBalancer.mu in the case where it +// accessed through the address map. In the case of Picker callbacks, the writes +// to the activeBucket of callCounter are protected by atomically loading and +// storing unsafe.Pointers (see further explanation in incrementCounter()). +type addressInfo struct { + // The call result counter object. callCounter *callCounter - // The latest ejection timestamp, or null if the address is currently not - // ejected - latestEjectionTimestamp time.Time // We represent the branching logic on the null with a time.Zero() value + // The latest ejection timestamp, or zero if the address is currently not + // ejected. + latestEjectionTimestamp time.Time - // The current ejection time multiplier, starting at 0 + // The current ejection time multiplier, starting at 0. ejectionTimeMultiplier int64 - // A list of subchannel wrapper objects that correspond to this address + // A list of subchannel wrapper objects that correspond to this address. sws []*subConnWrapper } -func newObject() *object { - return &object{ +func newObject() *addressInfo { + return &addressInfo{ callCounter: newCallCounter(), sws: make([]*subConnWrapper, 0), } diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index b84831eecad..181b3946ab7 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -399,7 +399,7 @@ func (tb *testClusterImplBalancer) waitForSubConnUpdate(ctx context.Context, wan return err } gotSCS := scs.(subConnWithState) - if !cmp.Equal(gotSCS, wantSCS, cmp.AllowUnexported(subConnWithState{}, testutils.TestSubConn{}, subConnWrapper{}, object{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { + if !cmp.Equal(gotSCS, wantSCS, cmp.AllowUnexported(subConnWithState{}, testutils.TestSubConn{}, subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCS, wantSCS) } return nil @@ -969,7 +969,7 @@ func (s) TestPicker(t *testing.T) { pi.Done(balancer.DoneInfo{}) pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) od.mu.Lock() - obj, ok := od.odAddrs["address1"] + obj, ok := od.addrs["address1"] if !ok { t.Fatal("map entry for address: address1 not present in map") } @@ -1039,7 +1039,7 @@ func (s) TestPicker(t *testing.T) { pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) od.mu.Lock() - obj, ok := od.odAddrs["address1"] + obj, ok := od.addrs["address1"] if !ok { t.Fatal("map entry for address: address1 not present in map") } @@ -1088,7 +1088,7 @@ func (s) TestPicker(t *testing.T) { pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) od.mu.Lock() - obj, ok := od.odAddrs["address1"] + obj, ok := od.addrs["address1"] if !ok { t.Fatal("map entry for address: address1 not present in map") } diff --git a/xds/internal/balancer/outlierdetection/callcounter.go b/xds/internal/balancer/outlierdetection/callcounter.go index 2c4667d7d1c..7de43792cd7 100644 --- a/xds/internal/balancer/outlierdetection/callcounter.go +++ b/xds/internal/balancer/outlierdetection/callcounter.go @@ -35,10 +35,11 @@ func newCallCounter() *callCounter { } } +// callCounter has two buckets, which each count successes and failures. The +// activeBucket is used to actively count any finished RPC's, and the +// inactiveBucket is populated with this activeBucket's data every interval for +// use by the Outlier Detection algorithm. type callCounter struct { - // "The object contains two buckets, and each bucket has a number counting - // successes, and another counting failures." - A50 - // activeBucket updates every time a call finishes (from picker passed to // Client Conn), so protect pointer read with atomic load of unsafe.Pointer // so picker does not have to grab a mutex per RPC, the critical path. diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go index aef343e7243..f2ecbae605e 100644 --- a/xds/internal/balancer/outlierdetection/subconn_wrapper.go +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -25,13 +25,17 @@ import ( "google.golang.org/grpc/resolver" ) +// subConnWrapper wraps every created SubConn in the Outlier Detection Balancer. +// It is used to store whether the SubConn has been ejected or not, and also to +// store the latest state for use when the SubConn gets unejected. It also +// stores the addresses the SubConn was created with to support any change in +// address(es). type subConnWrapper struct { balancer.SubConn - // "The subchannel wrappers created by the outlier_detection LB policy will - // hold a reference to its map entry in the LB policy, if that map entry - // exists." - A50 - obj unsafe.Pointer // *object + // addressInfo is a pointer to the subConnWrapper's corresponding address + // map entry, if the map entry exists. + addressInfo unsafe.Pointer // *addressInfo // These two pieces of state will reach eventual consistency due to sync in // run(), and child will always have the correctly updated SubConnState. latestState balancer.SubConnState @@ -42,8 +46,8 @@ type subConnWrapper struct { addresses []resolver.Address } -// eject(): "The wrapper will report a state update with the TRANSIENT_FAILURE -// state, and will stop passing along updates from the underlying subchannel." +// eject causes the wrapper to report a state update with the TRANSIENT_FAILURE +// state, and to stop passing along updates from the underlying subchannel. func (scw *subConnWrapper) eject() { scw.scUpdateCh.Put(&ejectedUpdate{ scw: scw, @@ -51,9 +55,9 @@ func (scw *subConnWrapper) eject() { }) } -// uneject(): "The wrapper will report a state update with the latest update +// uneject causes the wrapper to report a state update with the latest update // from the underlying subchannel, and resume passing along updates from the -// underlying subchannel." +// underlying subchannel. func (scw *subConnWrapper) uneject() { scw.scUpdateCh.Put(&ejectedUpdate{ scw: scw,