diff --git a/internal/grpcrand/grpcrand.go b/internal/grpcrand/grpcrand.go index 740f83c2b766..517ea70642a1 100644 --- a/internal/grpcrand/grpcrand.go +++ b/internal/grpcrand/grpcrand.go @@ -52,6 +52,13 @@ func Intn(n int) int { return r.Intn(n) } +// Int31n implements rand.Int31n on the grpcrand global source. +func Int31n(n int32) int32 { + mu.Lock() + defer mu.Unlock() + return r.Int31n(n) +} + // Float64 implements rand.Float64 on the grpcrand global source. func Float64() float64 { mu.Lock() diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go new file mode 100644 index 000000000000..d500de03c808 --- /dev/null +++ b/test/xds/xds_client_outlier_detection_test.go @@ -0,0 +1,76 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/testutils/xds/e2e" + + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" +) + +func (s) TestOutlierDetection(t *testing.T) { + oldOD := envconfig.XDSOutlierDetection + envconfig.XDSOutlierDetection = true + internal.RegisterOutlierDetectionBalancerForTesting() + defer func() { + envconfig.XDSOutlierDetection = oldOD + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t) + defer cleanup1() + + port, cleanup2 := startTestService(t, nil) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. 
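+	// Note: grpc.WithResolvers injects the xDS resolver builder returned by
+	// e2e.SetupManagementServer above, and the insecure transport credentials
+	// match the SecurityLevelNone used in the server resources.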
+ cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 8729461383e9..6c99f94e049d 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -25,13 +25,29 @@ import ( "encoding/json" "errors" "fmt" + "math" + "sync" + "sync/atomic" + "time" + "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) +// Globals to stub out in tests. +var ( + afterFunc = time.AfterFunc + now = time.Now +) + // Name is the name of the outlier detection balancer. const Name = "outlier_detection_experimental" @@ -51,7 +67,18 @@ func init() { type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { - return nil + am := resolver.NewAddressMap() + b := &outlierDetectionBalancer{ + cc: cc, + bOpts: bOpts, + closed: grpcsync.NewEvent(), + odAddrs: am, + scWrappers: make(map[balancer.SubConn]*subConnWrapper), + scUpdateCh: buffer.NewUnbounded(), + pickerUpdateCh: buffer.NewUnbounded(), + } + go b.run() + return b } func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { @@ -82,6 +109,7 @@ func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, err return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.base_ejection_time = %s; must be >= 0", lbCfg.BaseEjectionTime) case lbCfg.MaxEjectionTime < 0: return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_time = %s; must be >= 0", lbCfg.MaxEjectionTime) + // "The fields max_ejection_percent, // success_rate_ejection.enforcement_percentage, // failure_percentage_ejection.threshold, and @@ -105,3 +133,827 @@ func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, err func (bb) Name() string { return Name } + +// scUpdate wraps a subConn update to be sent to the child balancer. +type scUpdate struct { + scw *subConnWrapper + state balancer.SubConnState +} + +type ejectedUpdate struct { + scw *subConnWrapper + ejected bool // true for ejected, false for unejected +} + +type outlierDetectionBalancer struct { + numAddrsEjected int // For fast calculations of percentage of addrs ejected + + childState balancer.State + recentPickerNoop bool + + closed *grpcsync.Event + cc balancer.ClientConn + bOpts balancer.BuildOptions + + // childMu protects child and also updates to the child (to uphold the + // balancer.Balancer API guarantee of synchronous calls). It also protects + // against run() reading that the child is not nil for SubConn updates, and + // then UpdateClientConnState or Close writing to the the child. + childMu sync.Mutex + child balancer.Balancer + + // mu guards access to a lot of the core LB Policy State. 
It also prevents + // intersplicing certain operations. + // + // ex 1: interval timer goes off, outlier detection algorithm starts running + // based on knobs in odCfg. in the middle of running the algorithm, a + // ClientConn update comes in and writes to odCfg. This causes undefined + // behavior for the interval timer algorithm. + // + // ex 2: Updating the odAddrs map from UpdateAddresses in the middle of + // running the interval timer algorithm which uses odAddrs heavily. This + // will cause undefined behavior for the interval timer algorithm. + mu sync.Mutex + odAddrs *resolver.AddressMap + odCfg *LBConfig + scWrappers map[balancer.SubConn]*subConnWrapper + timerStartTime time.Time + intervalTimer *time.Timer + + scUpdateCh *buffer.Unbounded + pickerUpdateCh *buffer.Unbounded +} + +// noopConfig returns whether this balancer is configured with a logical no-op +// configuration or not. +func (b *outlierDetectionBalancer) noopConfig() bool { + return b.odCfg.SuccessRateEjection == nil && b.odCfg.FailurePercentageEjection == nil +} + +func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + lbCfg, ok := s.BalancerConfig.(*LBConfig) + if !ok { + return balancer.ErrBadResolverState + } + + // Reject whole config if any errors, don't persist it for later + bb := balancer.Get(lbCfg.ChildPolicy.Name) + if bb == nil { + return fmt.Errorf("balancer %q not registered", lbCfg.ChildPolicy.Name) + } + + if b.child == nil || b.odCfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { + b.childMu.Lock() + if b.child != nil { + b.child.Close() + } + // What if this is nil? Seems fine + b.child = bb.Build(b, b.bOpts) + b.childMu.Unlock() + } + + b.mu.Lock() + b.odCfg = lbCfg + + // When the outlier_detection LB policy receives an address update, it will + // create a map entry for each subchannel address in the list, and remove + // each map entry for a subchannel address not in the list. + addrs := make(map[resolver.Address]bool) + for _, addr := range s.ResolverState.Addresses { + addrs[addr] = true + b.odAddrs.Set(addr, newObject()) + } + for _, addr := range b.odAddrs.Keys() { + if !addrs[addr] { + b.odAddrs.Delete(addr) + } + } + + // When a new config is provided, if the timer start timestamp is unset, set + // it to the current time and start the timer for the configured interval, + // then for each address, reset the call counters. + var interval time.Duration + if b.timerStartTime.IsZero() { + b.timerStartTime = time.Now() + for _, obj := range b.objects() { + obj.callCounter.clear() + } + interval = b.odCfg.Interval + } else { + // If the timer start timestamp is set, instead cancel the existing + // timer and start the timer for the configured interval minus the + // difference between the current time and the previous start timestamp, + // or 0 if that would be negative. + interval = b.odCfg.Interval - (now().Sub(b.timerStartTime)) + if interval < 0 { + interval = 0 + } + } + + if !b.noopConfig() { + if b.intervalTimer != nil { + b.intervalTimer.Stop() + } + b.intervalTimer = afterFunc(interval, func() { + b.intervalTimerAlgorithm() + }) + } else { + // "If a config is provided with both the `success_rate_ejection` and + // `failure_percentage_ejection` fields unset, skip starting the timer and + // unset the timer start timestamp." + b.timerStartTime = time.Time{} + // Should we stop the timer here as well? Not defined in gRFC but I feel + // like it might make sense as you don't want to eject addresses. 
Also + // how will addresses eventually get unejected in this case if only one + // more pass of the interval timer after no-op configuration comes in? + } + b.mu.Unlock() + b.pickerUpdateCh.Put(lbCfg) + + // then pass the address list along to the child policy. + b.childMu.Lock() + defer b.childMu.Unlock() + return b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: s.ResolverState, + BalancerConfig: b.odCfg.ChildPolicy.Config, + }) +} + +func (b *outlierDetectionBalancer) ResolverError(err error) { + if b.child != nil { + b.childMu.Lock() + defer b.childMu.Unlock() + b.child.ResolverError(err) + } +} + +func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + b.mu.Lock() + defer b.mu.Unlock() + scw, ok := b.scWrappers[sc] + if !ok { + // Return, shouldn't happen if passed up scw + return + } + if state.ConnectivityState == connectivity.Shutdown { + delete(b.scWrappers, scw.SubConn) + } + b.scUpdateCh.Put(&scUpdate{ + scw: scw, + state: state, + }) + +} + +func (b *outlierDetectionBalancer) Close() { + b.closed.Fire() + if b.child != nil { + b.childMu.Lock() + b.child.Close() + b.child = nil + b.childMu.Unlock() + } + + // Any other cleanup needs to happen (subconns, other resources?) + b.mu.Lock() + defer b.mu.Unlock() + if b.intervalTimer != nil { + b.intervalTimer.Stop() + } +} + +func (b *outlierDetectionBalancer) ExitIdle() { + if b.child == nil { + return + } + if ei, ok := b.child.(balancer.ExitIdler); ok { + b.childMu.Lock() + defer b.childMu.Unlock() + ei.ExitIdle() + return + } + + // Fallback for children handled in clusterimpl balancer Removing SubConns + // is defined in API and also in graceful switch balancer, but already done + // in ClusterImpl. I guess we should do that here? +} + +// "The outlier_detection LB policy will provide a picker that delegates to the +// child policy's picker, and when the request finishes, increment the +// corresponding counter in the map entry referenced by the subchannel wrapper +// that was picked." - A50 +type wrappedPicker struct { + childPicker balancer.Picker + noopPicker bool +} + +func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + pr, err := wp.childPicker.Pick(info) + if err != nil { + return balancer.PickResult{}, err + } + + done := func(di balancer.DoneInfo) { + if !wp.noopPicker { + incrementCounter(pr.SubConn, di) + } + if pr.Done != nil { + pr.Done(di) + } + } + // Shouldn't happen, defensive programming. + scw, ok := pr.SubConn.(*subConnWrapper) + if !ok { + return balancer.PickResult{ + SubConn: pr.SubConn, + Done: done, + }, nil + } + return balancer.PickResult{ + SubConn: scw.SubConn, + Done: done, + }, nil +} + +func incrementCounter(sc balancer.SubConn, info balancer.DoneInfo) { + scw, ok := sc.(*subConnWrapper) + if !ok { + // Shouldn't happen, as comes from child + return + } + + // scw.obj and callCounter.activeBucket can be written to concurrently (the + // pointers themselves). Thus, protect the reads here with atomics to + // prevent data corruption. There exists a race in which you read the object + // or active bucket pointer and then that pointer points to deprecated + // memory. If this goroutine yields the processor, in between reading the + // object pointer and writing to the active bucket, UpdateAddresses can + // switch the obj the scw points to. Writing to an outdated addresses is a + // very small race and tolerable. 
After reading callCounter.activeBucket in + // this picker a swap call can concurrently change what activeBucket points + // to. A50 says to swap the pointer, but I decided to make create new memory + // for both active and inactive bucket, and have this race instead write to + // deprecated memory. If you swap the pointers, this write would write to + // the inactive buckets memory, which is read throughout in the interval + // timers algorithm. + obj := (*object)(atomic.LoadPointer(&scw.obj)) + if obj == nil { + return + } + ab := (*bucket)(atomic.LoadPointer(&obj.callCounter.activeBucket)) + + if info.Err == nil { + atomic.AddInt64(&ab.numSuccesses, 1) + } else { + atomic.AddInt64(&ab.numFailures, 1) + } + atomic.AddInt64(&ab.requestVolume, 1) +} + +func (b *outlierDetectionBalancer) UpdateState(s balancer.State) { + b.pickerUpdateCh.Put(s) + +} + +func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + // "When the child policy asks for a subchannel, the outlier_detection will + // wrap the subchannel with a wrapper." - A50 + sc, err := b.cc.NewSubConn(addrs, opts) + if err != nil { + return nil, err + } + scw := &subConnWrapper{ + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + } + b.mu.Lock() + defer b.mu.Unlock() + b.scWrappers[sc] = scw + if len(addrs) != 1 { + return scw, nil + } + + val, ok := b.odAddrs.Get(addrs[0]) + if !ok { + return scw, nil + } + + obj, ok := val.(*object) + if !ok { + return scw, nil + } + obj.sws = append(obj.sws, scw) + atomic.StorePointer(&scw.obj, unsafe.Pointer(obj)) + + // "If that address is currently ejected, that subchannel wrapper's eject + // method will be called." - A50 + if !obj.latestEjectionTimestamp.IsZero() { + scw.eject() + } + return scw, nil +} + +func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) { + scw, ok := sc.(*subConnWrapper) + if !ok { // Shouldn't happen + return + } + // Remove the wrapped SubConn from the parent Client Conn. We don't remove + // from map entry until we get a Shutdown state for the SubConn, as we need + // that data to forward that state down. + b.cc.RemoveSubConn(scw.SubConn) +} + +// appendIfPresent appends the scw to the address, if the address is present in +// the Outlier Detection balancers address map. Returns nil if not present, and +// the map entry if present. +func (b *outlierDetectionBalancer) appendIfPresent(addr resolver.Address, scw *subConnWrapper) *object { + val, ok := b.odAddrs.Get(addr) + if !ok { + return nil + } + obj, ok := val.(*object) + if !ok { + // shouldn't happen, logical no-op + return nil + } + obj.sws = append(obj.sws, scw) + atomic.StorePointer(&scw.obj, unsafe.Pointer(obj)) + return obj +} + +// removeSubConnFromAddressesMapEntry removes the scw from it's map entry if +// present. +func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subConnWrapper) { + obj := (*object)(atomic.LoadPointer(&scw.obj)) + if obj == nil { + return + } + for i, sw := range obj.sws { + if scw == sw { + obj.sws = append(obj.sws[:i], obj.sws[i+1:]...) + break + } + } +} + +// sameAddrForMap returns if two addresses are the same in regards to subchannel +// uniqueness/identity (i.e. what the addresses map is keyed on - address +// string, Server Name, and Attributes). 
+func sameAddrForMap(oldAddr resolver.Address, newAddr resolver.Address) bool {
+	if oldAddr.Addr != newAddr.Addr {
+		return false
+	}
+	if oldAddr.ServerName != newAddr.ServerName {
+		return false
+	}
+	return oldAddr.Attributes.Equal(newAddr.Attributes)
+}
+
+func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
+	scw, ok := sc.(*subConnWrapper)
+	if !ok {
+		// Shouldn't happen, as the child is only ever given subConnWrappers.
+		return
+	}
+
+	b.cc.UpdateAddresses(scw.SubConn, addrs)
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// Note that 0 addresses is a valid update/state for a SubConn to be in.
+	// This is correctly handled by this algorithm (handled as part of the
+	// non-singular old address/new address cases).
+	if len(scw.addresses) == 1 {
+		if len(addrs) == 1 { // single address to single address
+			// If everything we care about with regard to address specificity
+			// (Addr, ServerName, Attributes) is the same, there is nothing to
+			// do past this point.
+			if sameAddrForMap(scw.addresses[0], addrs[0]) {
+				return
+			}
+			// 1. Remove Subchannel from Addresses map entry if present in Addresses map.
+			b.removeSubConnFromAddressesMapEntry(scw)
+			// 2. Add Subchannel to Addresses map entry if new address present in map.
+			obj := b.appendIfPresent(addrs[0], scw)
+			// 3. Relay state with eject() recalculated (using the corresponding
+			// map entry to see if it's currently ejected).
+			if obj == nil { // uneject unconditionally because the SubConn could have come from an ejected address
+				scw.uneject()
+			} else {
+				if obj.latestEjectionTimestamp.IsZero() { // relay new updated subconn state
+					scw.uneject()
+				} else {
+					scw.eject()
+				}
+			}
+		} else { // single address to multiple addresses
+			// 1. Remove Subchannel from Addresses map entry if present in Addresses map.
+			b.removeSubConnFromAddressesMapEntry(scw)
+			// 2. Clear the Subchannel wrapper's Call Counter entry.
+			obj := (*object)(atomic.LoadPointer(&scw.obj))
+			if obj != nil {
+				obj.callCounter.clear()
+			}
+			// 3. Uneject the Subchannel in case it was previously ejected.
+			scw.uneject()
+		}
+	} else {
+		if len(addrs) == 1 { // multiple addresses to single address
+			// 1. Add Subchannel to Addresses map entry if new address present in map.
+			obj := b.appendIfPresent(addrs[0], scw)
+			if obj != nil && !obj.latestEjectionTimestamp.IsZero() {
+				scw.eject()
+			}
+		} // else is multiple to multiple - no-op, as the SubConn continues to be ignored by outlier detection.
+	}
+
+	scw.addresses = addrs
+}
+
+func (b *outlierDetectionBalancer) ResolveNow(opts resolver.ResolveNowOptions) {
+	b.cc.ResolveNow(opts)
+}
+
+func (b *outlierDetectionBalancer) Target() string {
+	return b.cc.Target()
+}
+
+// objects returns a list of objects corresponding to every address in the address map.
+func (b *outlierDetectionBalancer) objects() []*object {
+	var objs []*object
+	for _, addr := range b.odAddrs.Keys() {
+		val, ok := b.odAddrs.Get(addr)
+		if !ok { // Shouldn't happen
+			continue
+		}
+		obj, ok := val.(*object)
+		if !ok {
+			continue
+		}
+		objs = append(objs, obj)
+	}
+	return objs
+}
+
+func max(x, y int64) int64 {
+	if x < y {
+		return y
+	}
+	return x
+}
+
+func min(x, y int64) int64 {
+	if x < y {
+		return x
+	}
+	return y
+}
+
+func (b *outlierDetectionBalancer) intervalTimerAlgorithm() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.timerStartTime = time.Now()
+
+	// 2. For each address, swap the call counter's buckets in that address's
+	// map entry.
+	for _, obj := range b.objects() {
+		obj.callCounter.swap()
+	}
+
+	// 3.
If the success_rate_ejection configuration field is set, run the + // success rate algorithm. + if b.odCfg.SuccessRateEjection != nil { + b.successRateAlgorithm() + } + + // 4. If the failure_percentage_ejection configuration field is set, run the + // failure percentage algorithm. + if b.odCfg.FailurePercentageEjection != nil { + b.failurePercentageAlgorithm() + } + + // 5. For each address in the map: + for _, addr := range b.odAddrs.Keys() { + val, ok := b.odAddrs.Get(addr) + if !ok { + continue + } + obj, ok := val.(*object) + if !ok { + continue + } + // If the address is not ejected and the multiplier is greater than 0, + // decrease the multiplier by 1. + if obj.latestEjectionTimestamp.IsZero() && obj.ejectionTimeMultiplier > 0 { + obj.ejectionTimeMultiplier-- + continue + } + // If the address is ejected, and the current time is after + // ejection_timestamp + min(base_ejection_time (type: time.Time) * + // multiplier (type: int), max(base_ejection_time (type: time.Time), + // max_ejection_time (type: time.Time))), un-eject the address. + if !obj.latestEjectionTimestamp.IsZero() && now().After(obj.latestEjectionTimestamp.Add(time.Duration(min(b.odCfg.BaseEjectionTime.Nanoseconds()*obj.ejectionTimeMultiplier, max(b.odCfg.BaseEjectionTime.Nanoseconds(), b.odCfg.MaxEjectionTime.Nanoseconds()))))) { // need to way to inject a desired bool here at a certain point in tests, mock time.Now to return a late time, mock time.After to always return true... + b.unejectAddress(addr) + } + } + // This conditional only for testing (since the interval timer algorithm is + // called manually), will never hit in production. + if b.intervalTimer != nil { + b.intervalTimer.Stop() + } + b.intervalTimer = afterFunc(b.odCfg.Interval, func() { + b.intervalTimerAlgorithm() + }) +} + +func (b *outlierDetectionBalancer) run() { + for { + select { + case update := <-b.scUpdateCh.Get(): + b.scUpdateCh.Load() + switch u := update.(type) { + case *scUpdate: + scw := u.scw + scw.latestState = u.state + b.childMu.Lock() + if !scw.ejected && b.child != nil { + b.child.UpdateSubConnState(scw, u.state) + } + b.childMu.Unlock() + case *ejectedUpdate: + scw := u.scw + scw.ejected = u.ejected + var stateToUpdate balancer.SubConnState + if u.ejected { + // "The wrapper will report a state update with the + // TRANSIENT_FAILURE state, and will stop passing along + // updates from the underlying subchannel." + stateToUpdate = balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + } + } else { + // "The wrapper will report a state update with the latest + // update from the underlying subchannel, and resume passing + // along updates from the underlying subchannel." 
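+				// scw.latestState is only written by the scUpdate case above,
+				// which runs on this same goroutine, so it can be read here
+				// without extra synchronization.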
+ stateToUpdate = scw.latestState // If this has never been written to will send connectivity IDLE which seems fine to me + } + b.childMu.Lock() + if b.child != nil { + b.child.UpdateSubConnState(scw, stateToUpdate) + } + b.childMu.Unlock() + } + case update := <-b.pickerUpdateCh.Get(): + b.pickerUpdateCh.Load() + if b.closed.HasFired() { // don't send picker updates to grpc after the balancer has been closed + return + } + switch u := update.(type) { + case balancer.State: + b.childState = u + b.mu.Lock() + noopCfg := b.noopConfig() + b.mu.Unlock() + b.recentPickerNoop = noopCfg + b.cc.UpdateState(balancer.State{ + ConnectivityState: b.childState.ConnectivityState, + // The outlier_detection LB policy will provide a picker that delegates to + // the child policy's picker, and when the request finishes, increment the + // corresponding counter in the map entry referenced by the subchannel + // wrapper that was picked. + Picker: &wrappedPicker{ + childPicker: b.childState.Picker, + // If both the `success_rate_ejection` and + // `failure_percentage_ejection` fields are unset in the + // configuration, the picker should not do that counting. + noopPicker: noopCfg, + }, + }) + case *LBConfig: + noopCfg := u.SuccessRateEjection == nil && u.FailurePercentageEjection == nil + if b.childState.Picker != nil && noopCfg != b.recentPickerNoop { + b.recentPickerNoop = noopCfg + b.cc.UpdateState(balancer.State{ + ConnectivityState: b.childState.ConnectivityState, + // The outlier_detection LB policy will provide a picker that delegates to + // the child policy's picker, and when the request finishes, increment the + // corresponding counter in the map entry referenced by the subchannel + // wrapper that was picked. + Picker: &wrappedPicker{ + childPicker: b.childState.Picker, + // If both the `success_rate_ejection` and + // `failure_percentage_ejection` fields are unset in the + // configuration, the picker should not do that counting. + noopPicker: noopCfg, + }, + }) + } + } + case <-b.closed.Done(): + return + } + } +} + +// numAddrsWithAtLeastRequestVolume returns the number of addresses present in +// the map that have request volume of at least requestVolume. +func (b *outlierDetectionBalancer) numAddrsWithAtLeastRequestVolume() uint32 { + var numAddrs uint32 + for _, obj := range b.objects() { + if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.odCfg.SuccessRateEjection.RequestVolume { + numAddrs++ + } + } + return numAddrs +} + +// meanAndStdDevOfSucceseesAtLeastRequestVolume returns the mean and std dev of +// the number of requests of addresses that have at least requestVolume. +func (b *outlierDetectionBalancer) meanAndStdDevOfSuccessesAtLeastRequestVolume() (float64, float64) { + // 2. Calculate the mean and standard deviation of the fractions of + // successful requests among addresses with total request volume of at least + // success_rate_ejection.request_volume. 
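+	// Concretely, as implemented below, the mean is the sum of the per-address
+	// success fractions (numSuccesses/requestVolume) for addresses meeting the
+	// request volume, divided by the total number of addresses in the map, and
+	// the standard deviation is computed over that same total-address divisor.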
+ var totalFractionOfSuccessfulRequests float64 + var mean float64 + + for _, obj := range b.objects() { + // "of at least success_rate_ejection.request_volume" + if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.odCfg.SuccessRateEjection.RequestVolume { + totalFractionOfSuccessfulRequests += float64(obj.callCounter.inactiveBucket.numSuccesses) / float64(obj.callCounter.inactiveBucket.requestVolume) + } + } + mean = totalFractionOfSuccessfulRequests / float64(b.odAddrs.Len()) + var sumOfSquares float64 + for _, obj := range b.objects() { + devFromMean := (float64(obj.callCounter.inactiveBucket.numSuccesses) / float64(obj.callCounter.inactiveBucket.requestVolume)) - mean + sumOfSquares += devFromMean * devFromMean + } + + variance := sumOfSquares / float64(b.odAddrs.Len()) + return mean, math.Sqrt(variance) + +} + +func (b *outlierDetectionBalancer) successRateAlgorithm() { + // 1. If the number of addresses with request volume of at least + // success_rate_ejection.request_volume is less than + // success_rate_ejection.minimum_hosts, stop. + if b.numAddrsWithAtLeastRequestVolume() < b.odCfg.SuccessRateEjection.MinimumHosts { // TODO: O(n) search, is there a way to optimize this? + return + } + + // 2. Calculate the mean and standard deviation of the fractions of + // successful requests among addresses with total request volume of at least + // success_rate_ejection.request_volume. + mean, stddev := b.meanAndStdDevOfSuccessesAtLeastRequestVolume() + + // 3. For each address: + for _, addr := range b.odAddrs.Keys() { + val, ok := b.odAddrs.Get(addr) + if !ok { + continue + } + obj, ok := val.(*object) + if !ok { + continue + } + ccb := obj.callCounter.inactiveBucket + sre := b.odCfg.SuccessRateEjection + // i. If the percentage of ejected addresses is greater than + // max_ejection_percent, stop. + if float64(b.numAddrsEjected)/float64(b.odAddrs.Len())*100 > float64(b.odCfg.MaxEjectionPercent) { + return + } + + // ii. If the address's total request volume is less than + // success_rate_ejection.request_volume, continue to the next address. + if ccb.requestVolume < int64(sre.RequestVolume) { + continue + } + + // iii. If the address's success rate is less than (mean - stdev * + // (success_rate_ejection.stdev_factor / 1000)) + successRate := float64(ccb.numSuccesses) / float64(ccb.requestVolume) + if successRate < (mean - stddev*(float64(sre.StdevFactor)/1000)) { + // then choose a random integer in [0, 100). If that number is less + // than success_rate_ejection.enforcement_percentage, eject that + // address. + if uint32(grpcrand.Int31n(100)) < sre.EnforcementPercentage { + b.ejectAddress(addr) + } + } + } +} + +func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { + // 1. If the number of addresses is less than + // failure_percentage_ejection.minimum_hosts, stop. + if uint32(b.odAddrs.Len()) < b.odCfg.FailurePercentageEjection.MinimumHosts { + return + } + + // 2. For each address: + for _, addr := range b.odAddrs.Keys() { + val, ok := b.odAddrs.Get(addr) + if !ok { + continue + } + obj, ok := val.(*object) + if !ok { + continue + } + ccb := obj.callCounter.inactiveBucket + fpe := b.odCfg.FailurePercentageEjection + // i. If the percentage of ejected addresses is greater than + // max_ejection_percent, stop. + if float64(b.numAddrsEjected)/float64(b.odAddrs.Len())*100 > float64(b.odCfg.MaxEjectionPercent) { + return + } + // ii. If the address's total request volume is less than + // failure_percentage_ejection.request_volume, continue to the next + // address. 
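+		// (ccb is the inactive bucket, populated by the swap() call at the
+		// start of this interval pass, so these are last interval's counts.)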
+		if uint32(ccb.requestVolume) < fpe.RequestVolume {
+			continue
+		}
+		// iii. If the address's failure percentage is greater than
+		// failure_percentage_ejection.threshold
+		failurePercentage := (float64(ccb.numFailures) / float64(ccb.requestVolume)) * 100
+		if failurePercentage > float64(b.odCfg.FailurePercentageEjection.Threshold) {
+			// then choose a random integer in [0, 100). If that number is less
+			// than failure_percentage_ejection.enforcement_percentage, eject
+			// that address.
+			if uint32(grpcrand.Int31n(100)) < b.odCfg.FailurePercentageEjection.EnforcementPercentage {
+				b.ejectAddress(addr)
+			}
+		}
+	}
+}
+
+func (b *outlierDetectionBalancer) ejectAddress(addr resolver.Address) {
+	val, ok := b.odAddrs.Get(addr)
+	if !ok { // Shouldn't happen
+		return
+	}
+	obj, ok := val.(*object)
+	if !ok { // Shouldn't happen
+		return
+	}
+
+	b.numAddrsEjected++
+
+	// To eject an address, set the current ejection timestamp to the timestamp
+	// that was recorded when the timer fired, increase the ejection time
+	// multiplier by 1, and call eject() on each subchannel wrapper in that
+	// address's subchannel wrapper list.
+	obj.latestEjectionTimestamp = b.timerStartTime
+	obj.ejectionTimeMultiplier++
+	for _, sbw := range obj.sws {
+		sbw.eject()
+	}
+}
+
+func (b *outlierDetectionBalancer) unejectAddress(addr resolver.Address) {
+	val, ok := b.odAddrs.Get(addr)
+	if !ok { // Shouldn't happen
+		return
+	}
+	obj, ok := val.(*object)
+	if !ok { // Shouldn't happen
+		return
+	}
+	b.numAddrsEjected--
+
+	// To un-eject an address, set the latest ejection timestamp to the zero
+	// time.Time value (the closest Go equivalent of the gRFC's null) and call
+	// uneject() on each subchannel wrapper in that address's subchannel wrapper
+	// list.
+ obj.latestEjectionTimestamp = time.Time{} + for _, sbw := range obj.sws { + sbw.uneject() + } +} + +type object struct { + // The call result counter object + callCounter *callCounter + + // The latest ejection timestamp, or null if the address is currently not + // ejected + latestEjectionTimestamp time.Time // We represent the branching logic on the null with a time.Zero() value + + // The current ejection time multiplier, starting at 0 + ejectionTimeMultiplier int64 + + // A list of subchannel wrapper objects that correspond to this address + sws []*subConnWrapper +} + +func newObject() *object { + return &object{ + callCounter: newCallCounter(), + sws: make([]*subConnWrapper, 0), + } +} diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 106e2b64dbc2..16ae32cdc83b 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -19,20 +19,34 @@ package outlierdetection import ( + "context" "encoding/json" "errors" + "fmt" "strings" + "sync" "testing" "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" ) +var ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + type s struct { grpctest.Tester } @@ -272,6 +286,9 @@ func (lbc *LBConfig) Equal(lbc2 *LBConfig) bool { func init() { balancer.Register(errParseConfigBuilder{}) + balancer.Register(errParseConfigBuilder{}) + balancer.Register(tcibBuilder{}) + balancer.Register(verifyBalancerBuilder{}) } type errParseConfigBuilder struct{} @@ -287,3 +304,1780 @@ func (errParseConfigBuilder) Name() string { func (errParseConfigBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { return nil, errors.New("some error") } + +func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.TestClientConn) { + t.Helper() + builder := balancer.Get(Name) + if builder == nil { + t.Fatalf("balancer.Get(%q) returned nil", Name) + } + tcc := testutils.NewTestClientConn(t) + odB := builder.Build(tcc, balancer.BuildOptions{}) + return odB.(*outlierDetectionBalancer), tcc +} + +const tcibname = "testClusterImplBalancer" +const verifyBalancerName = "verifyBalancer" + +type tcibBuilder struct{} + +func (tcibBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + return &testClusterImplBalancer{ + ccsCh: testutils.NewChannel(), + scStateCh: testutils.NewChannel(), + resolverErrCh: testutils.NewChannel(), + closeCh: testutils.NewChannel(), + exitIdleCh: testutils.NewChannel(), + cc: cc, + } +} + +func (tcibBuilder) Name() string { + return tcibname +} + +type testClusterImplBalancerConfig struct { + serviceconfig.LoadBalancingConfig +} + +type testClusterImplBalancer struct { + // ccsCh is a channel used to signal the receipt of a ClientConn update. + ccsCh *testutils.Channel + // scStateCh is a channel used to signal the receipt of a SubConn update. + scStateCh *testutils.Channel + // resolverErrCh is a channel used to signal a resolver error. 
+ resolverErrCh *testutils.Channel + // closeCh is a channel used to signal the closing of this balancer. + closeCh *testutils.Channel + exitIdleCh *testutils.Channel + // cc is the balancer.ClientConn passed to this test balancer as part of the + // Build() call. + cc balancer.ClientConn +} + +type subConnWithState struct { + sc balancer.SubConn + state balancer.SubConnState +} + +func (tb *testClusterImplBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + tb.ccsCh.Send(ccs) + return nil +} + +func (tb *testClusterImplBalancer) ResolverError(err error) { + tb.resolverErrCh.Send(err) +} + +func (tb *testClusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + tb.scStateCh.Send(subConnWithState{sc: sc, state: state}) +} + +func (tb *testClusterImplBalancer) Close() { + tb.closeCh.Send(struct{}{}) +} + +// waitForClientConnUpdate verifies if the testClusterImplBalancer receives the +// provided ClientConnState within a reasonable amount of time. +func (tb *testClusterImplBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS balancer.ClientConnState) error { + ccs, err := tb.ccsCh.Receive(ctx) + if err != nil { + return err + } + gotCCS := ccs.(balancer.ClientConnState) + if diff := cmp.Diff(gotCCS, wantCCS, cmpopts.IgnoreFields(resolver.State{}, "Attributes")); diff != "" { + return fmt.Errorf("received unexpected ClientConnState, diff (-got +want): %v", diff) + } + return nil +} + +// waitForSubConnUpdate verifies if the testClusterImplBalancer receives the +// provided SubConn and SubConnState within a reasonable amount of time. +func (tb *testClusterImplBalancer) waitForSubConnUpdate(ctx context.Context, wantSCS subConnWithState) error { + scs, err := tb.scStateCh.Receive(ctx) + if err != nil { + return err + } + gotSCS := scs.(subConnWithState) + if !cmp.Equal(gotSCS, wantSCS, cmp.AllowUnexported(subConnWithState{}, testutils.TestSubConn{}, subConnWrapper{}, object{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { + return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCS, wantSCS) + } + return nil +} + +// waitForClose verifies if the testClusterImplBalancer receives a Close call +// within a reasonable amount of time. +func (tb *testClusterImplBalancer) waitForClose(ctx context.Context) error { + if _, err := tb.closeCh.Receive(ctx); err != nil { + return err + } + return nil +} + +// TestUpdateClientConnState invokes the UpdateClientConnState method on the +// odBalancer with different inputs and verifies that the child balancer is built +// and updated properly. 
+func (s) TestUpdateClientConnState(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, _ := setup(t) + defer func() { + od.Close() // this will leak a goroutine otherwise + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // The child balancer should be created and forwarded the ClientConn update + // from the first successful UpdateClientConnState call. + if err := od.child.(*testClusterImplBalancer).waitForClientConnUpdate(ctx, balancer.ClientConnState{ + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } +} + +// TestUpdateClientConnStateDifferentType invokes the UpdateClientConnState +// method on the odBalancer with two different types and verifies that the child +// balancer is built and updated properly on the first, and the second update +// closes the child and builds a new one. +func (s) TestUpdateClientConnStateDifferentType(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, _ := setup(t) + defer func() { + od.Close() // this will leak a goroutine otherwise + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ciChild := od.child.(*testClusterImplBalancer) + // The child balancer should be created and forwarded the ClientConn update + // from the first successful UpdateClientConnState call. 
+ if err := ciChild.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: verifyBalancerName, + Config: verifyBalancerConfig{}, + }, + }, + }) + + // Verify previous child balancer closed. + if err := ciChild.waitForClose(ctx); err != nil { + t.Fatalf("Error waiting for Close() call on child balancer %v", err) + } +} + +// TestUpdateState tests that an UpdateState call gets forwarded to the +// ClientConn. +func (s) TestUpdateState(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, tcc := setup(t) + defer func() { + od.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &testutils.TestConstPicker{}, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Should forward the connectivity State to Client Conn. + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case state := <-tcc.NewStateCh: + if state != connectivity.Ready { + t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Ready) + } + } + + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + } +} + +// TestClose tests the Close operation on the Outlier Detection Balancer. The +// Close operation should close the child. +func (s) TestClose(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + defer func() { + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + od, _ := setup(t) + + od.UpdateClientConnState(balancer.ClientConnState{ // could pull this out to the setup() call as well, maybe a wrapper on what is currently there... 
+ BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + ciChild := od.child.(*testClusterImplBalancer) + if err := ciChild.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + od.Close() + + // Verify child balancer closed. + if err := ciChild.waitForClose(ctx); err != nil { + t.Fatalf("Error waiting for Close() call on child balancer %v", err) + } +} + +// TestUpdateAddresses tests the functionality of UpdateAddresses and any +// changes in the addresses/plurality of those addresses for a SubConn. +func (s) TestUpdateAddresses(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, tcc := setup(t) + defer func() { + od.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ // have this eject one but not the other + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 2, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + child := od.child.(*testClusterImplBalancer) + if err := child.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + }, + }, + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + scw1, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address1", + }, + }, balancer.NewSubConnOptions{}) + + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for NewSubConn to be called on test Client Conn") + case <-tcc.NewSubConnCh: + } + _, ok := scw1.(*subConnWrapper) + if !ok { + t.Fatalf("SubConn passed downward should have been a subConnWrapper") + } + + scw2, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address2", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2}, + }, + }) + + // Setup the system to where one address is ejected and one address + // isn't. 
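+	// With the round robin picker above, the five successful Done() calls land
+	// on address1 and the five failed ones on address2. Given the
+	// FailurePercentageEjection config (Threshold: 50, RequestVolume: 3,
+	// MinimumHosts: 2), the manual intervalTimerAlgorithm() call below ejects
+	// address2 only.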
+	select {
+	case <-ctx.Done():
+		t.Fatal("timeout while waiting for a UpdateState call on the ClientConn")
+	case picker := <-tcc.NewPickerCh:
+		pi, err := picker.Pick(balancer.PickInfo{})
+		if err != nil {
+			t.Fatalf("Picker.Pick should not have errored")
+		}
+		pi.Done(balancer.DoneInfo{})
+		pi.Done(balancer.DoneInfo{})
+		pi.Done(balancer.DoneInfo{})
+		pi.Done(balancer.DoneInfo{})
+		pi.Done(balancer.DoneInfo{})
+		// Eject the second address.
+		pi, err = picker.Pick(balancer.PickInfo{})
+		if err != nil {
+			t.Fatalf("Picker.Pick should not have errored")
+		}
+		pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+		pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+		pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+		pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+		pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+		od.intervalTimerAlgorithm()
+		// verify UpdateSubConnState() got called with TRANSIENT_FAILURE on the
+		// child for the address that was ejected.
+		if err := child.waitForSubConnUpdate(ctx, subConnWithState{
+			sc: scw2,
+			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, // Represents ejected
+		}); err != nil {
+			t.Fatalf("Error waiting for Sub Conn update: %v", err)
+		}
+	}
+
+	// Update scw1 to another address that is currently ejected. This should
+	// cause scw1 to get ejected.
+	od.UpdateAddresses(scw1, []resolver.Address{
+		{
+			Addr: "address2",
+		},
+	})
+
+	// verify that update addresses gets forwarded to ClientConn.
+	select {
+	case <-ctx.Done():
+		t.Fatal("timeout while waiting for an UpdateAddresses call on the ClientConn")
+	case <-tcc.UpdateAddressesAddrsCh:
+	}
+	// verify scw1 got ejected (UpdateSubConnState called with TRANSIENT
+	// FAILURE).
+	if err := child.waitForSubConnUpdate(ctx, subConnWithState{
+		sc: scw1,
+		state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, // Represents ejected
+	}); err != nil {
+		t.Fatalf("Error waiting for Sub Conn update: %v", err)
+	}
+
+	// Update scw1 to multiple addresses. This should cause scw1 to get
+	// unejected, as it is no longer being tracked for Outlier Detection.
+	od.UpdateAddresses(scw1, []resolver.Address{
+		{
+			Addr: "address1",
+		},
+		{
+			Addr: "address2",
+		},
+	})
+	// verify scw1 got unejected (UpdateSubConnState called with the most recent state).
+	if err := child.waitForSubConnUpdate(ctx, subConnWithState{
+		sc: scw1,
+		state: balancer.SubConnState{ConnectivityState: connectivity.Idle}, // A SubConn that has never received an UpdateSubConnState reports IDLE as its most recent state.
+	}); err != nil {
+		t.Fatalf("Error waiting for Sub Conn update: %v", err)
+	}
+
+	// Update scw1 to a different multiple addresses list. A change of addresses
+	// in which the plurality goes from multiple to multiple should be a no-op,
+	// as the address continues to be ignored by outlier detection.
+	od.UpdateAddresses(scw1, []resolver.Address{
+		{
+			Addr: "address2",
+		},
+		{
+			Addr: "address3",
+		},
+	})
+	// Verify no downstream effects.
+	sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer cancel()
+	if err := od.child.(*testClusterImplBalancer).waitForSubConnUpdate(sCtx, subConnWithState{}); err == nil {
+		t.Fatalf("no SubConn update should have been sent (no SubConn got ejected/unejected)")
+	}
+
+	// Update scw1 back to a single address, which is ejected. This should cause
+	// the SubConn to be re-ejected.
+ + od.UpdateAddresses(scw1, []resolver.Address{ + { + Addr: "address2", + }, + }) + // verify scw1 got ejected (UpdateSubConnState called with TRANSIENT FAILURE). + if err := child.waitForSubConnUpdate(ctx, subConnWithState{ + sc: scw1, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, // Represents ejected + }); err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } +} + +// Three important pieces of functionality run() synchronizes in regards to UpdateState calls towards grpc: + +// 1. On a config update, checking if the picker was actually created or not. +// 2. Keeping track of the most recent connectivity state sent from the child (determined by UpdateState()). +// * This will always forward up with most recent noopCfg bit +// 3. Keeping track of the most recent no-op config bit (determined by UpdateClientConnState()) +// * Will only forward up if no-op config bit changed and picker was already created + +// TestPicker tests the Picker updates sent upward to grpc from the Outlier +// Detection Balancer. Two things can trigger a picker update, an +// UpdateClientConnState call (can flip the no-op config bit that affects +// Picker) and an UpdateState call (determines the connectivity state sent +// upward). +func (s) TestPicker(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, tcc := setup(t) + defer func() { + od.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + }, + }, + BalancerConfig: &LBConfig{ // TODO: S/ variable + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + child := od.child.(*testClusterImplBalancer) + if err := child.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + }, + }, + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + scw, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address1", + }, + }, balancer.NewSubConnOptions{}) + + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &testutils.TestConstPicker{ + SC: scw, + }, + }) + + // Should forward the connectivity State to Client Conn. 
+ select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case state := <-tcc.NewStateCh: + if state != connectivity.Ready { + t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Ready) + } + } + + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + od.mu.Lock() + val, ok := od.odAddrs.Get(resolver.Address{ + Addr: "address1", + }) + if !ok { + t.Fatal("map entry for address: address1 not present in map") + } + obj, ok := val.(*object) + if !ok { + t.Fatal("map value isn't obj type") + } + bucketWant := &bucket{ + numSuccesses: 1, + numFailures: 1, + requestVolume: 2, + } + if diff := cmp.Diff((*bucket)(obj.callCounter.activeBucket), bucketWant); diff != "" { // no need for atomic read because not concurrent with Done() call from picker + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } + od.mu.Unlock() + } + + // UpdateClientConnState with a noop config + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 1<<63 - 1, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + if err := child.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + }, + }, + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + // The connectivity state sent to the Client Conn should be the persisted + // recent state received from the last UpdateState() call, which in this + // case is connecting. + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for picker update on ClientConn, should have updated because no-op config changed on UpdateClientConnState") + case state := <-tcc.NewStateCh: + if state != connectivity.Ready { + t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Ready) + } + } + + select { + case <-ctx.Done(): + t.Fatal("timeout waiting for picker update on ClientConn, should have updated because no-op config changed on UpdateClientConnState") + case picker := <-tcc.NewPickerCh: + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + od.mu.Lock() + val, ok := od.odAddrs.Get(resolver.Address{ + Addr: "address1", + }) + if !ok { + t.Fatal("map entry for address: address1 not present in map") + } + obj, ok := val.(*object) + if !ok { + t.Fatal("map value isn't obj type") + } + + // The active bucket should be cleared because the interval timer + // algorithm didn't run in between ClientConn updates and the picker + // should not count, as the outlier detection balancer is configured + // with a no-op configuration. 
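+		// (The UpdateClientConnState call above also re-created the map entry
+		// for address1 via odAddrs.Set(addr, newObject()), so its counters
+		// start at zero regardless.)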
+ bucketWant := &bucket{} + if diff := cmp.Diff((*bucket)(obj.callCounter.activeBucket), bucketWant); diff != "" { // no need for atomic read because not concurrent with Done() call + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } + od.mu.Unlock() + } + + // UpdateState with a connecting state. This new most recent connectivity + // state should be forwarded to the Client Conn, alongside the most recent + // noop config bit which is true. + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Connecting, + Picker: &testutils.TestConstPicker{ + SC: scw, + }, + }) + + // Should forward the most recent connectivity State to Client Conn. + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case state := <-tcc.NewStateCh: + if state != connectivity.Connecting { + t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Connecting) + } + } + + // Should forward the picker containing the most recent no-op config bit. + select { + case <-ctx.Done(): + t.Fatal("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + od.mu.Lock() + val, ok := od.odAddrs.Get(resolver.Address{ + Addr: "address1", + }) + if !ok { + t.Fatal("map entry for address: address1 not present in map") + } + obj, ok := val.(*object) + if !ok { + t.Fatal("map value isn't obj type") + } + + // The active bucket should be cleared because the interval timer + // algorithm didn't run in between ClientConn updates and the picker + // should not count, as the outlier detection balancer is configured + // with a no-op configuration. + bucketWant := &bucket{} + if diff := cmp.Diff((*bucket)(obj.callCounter.activeBucket), bucketWant); diff != "" { // no need for atomic read because not concurrent with Done() call + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } + od.mu.Unlock() + } +} + +type rrPicker struct { + scs []balancer.SubConn + next int +} + +func (rrp *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + sc := rrp.scs[rrp.next] + rrp.next = (rrp.next + 1) % len(rrp.scs) + return balancer.PickResult{SubConn: sc}, nil +} + +// TestEjectUnejectSuccessRate tests the functionality of the interval timer +// algorithm of ejecting/unejecting SubConns when configured with +// SuccessRateEjection. It also tests a desired invariant of a SubConnWrapper +// being ejected or unejected, which is to either forward or not forward SubConn +// updates from grpc. +func (s) TestEjectUnejectSuccessRate(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + // Setup the outlier detection balancer to a point where it will be in a + // situation to potentially eject addresses. + od, tcc := setup(t) + defer func() { + od.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 1<<63 - 1, // so the interval will never run unless called manually in test. 
+ BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 500, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + child := od.child.(*testClusterImplBalancer) + if err := child.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + scw1, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address1", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw2, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address2", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw3, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address3", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2, scw3}, + }, + }) + + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + // Set each of the three upstream addresses to have five successes each. + // This should cause none of the addresses to be ejected as none of them + // are outliers according to the success rate algorithm. + for i := 0; i < 3; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + } + + od.intervalTimerAlgorithm() + + // verify no UpdateSubConnState() call on the child, as no addresses got + // ejected (ejected address will cause an UpdateSubConnState call). + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if err := child.waitForSubConnUpdate(sCtx, subConnWithState{}); err == nil { + t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + } + + // Since no addresses are ejected, a SubConn update should forward down + // to the child. + od.UpdateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{ + ConnectivityState: connectivity.Connecting, + }) + + if err := child.waitForSubConnUpdate(ctx, subConnWithState{ + sc: scw1, + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }); err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + + // Set two of the upstream addresses to have five successes each, and + // one of the upstream addresses to have five failures. This should + // cause the address which has five failures to be ejected according the + // SuccessRateAlgorithm. 
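+		// Roughly, per the success rate algorithm in gRFC A50: the per-address
+		// success rates will be {1.0, 1.0, 0.0}, giving a mean of ~0.667 and a
+		// stdev of ~0.471. With a StdevFactor of 500, the ejection threshold is
+		// ~0.667 - (0.471 * 500/1000) = ~0.43, and address3's success rate of 0
+		// falls below it.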
+ for i := 0; i < 2; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + } + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + + // should eject address that always errored. + od.intervalTimerAlgorithm() + // Due to the address being ejected, the SubConn with that address + // should be ejected, meaning a TRANSIENT_FAILURE connectivity state + // gets reported to the child. + if err := child.waitForSubConnUpdate(ctx, subConnWithState{ + sc: scw3, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, // Represents ejected + }); err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + // Only one address should be ejected. + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if err := child.waitForSubConnUpdate(sCtx, subConnWithState{}); err == nil { + t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + } + + // Now that an address is ejected, SubConn updates for SubConns using + // that address should not be forwarded downward. These SubConn updates + // will be cached to update the child sometime in the future when the + // address gets unejected. + od.UpdateSubConnState(pi.SubConn, balancer.SubConnState{ + ConnectivityState: connectivity.Connecting, + }) + if err := child.waitForSubConnUpdate(sCtx, subConnWithState{}); err == nil { + t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)") + } + + // Override now to cause the interval timer algorithm to always uneject a SubConn. + defer func(n func() time.Time) { + now = n + }(now) + + now = func() time.Time { + return time.Now().Add(time.Second * 1000) // will cause to always uneject addresses which are ejected + } + od.intervalTimerAlgorithm() + + // unejected SubConn should report latest persisted state - which is + // connecting from earlier. + if err := child.waitForSubConnUpdate(ctx, subConnWithState{ + sc: scw3, + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }); err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + + } + +} + +// TestEjectFailureRate tests the functionality of the interval timer +// algorithm of ejecting SubConns when configured with +// FailurePercentageEjection. +func (s) TestEjectFailureRate(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, tcc := setup(t) + defer func() { + internal.UnregisterOutlierDetectionBalancerForTesting() + defer od.Close() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 1<<63 - 1, // so the interval will never run unless called manually in test. 
+ BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + child := od.child.(*testClusterImplBalancer) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := child.waitForClientConnUpdate(ctx, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + scw1, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address1", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw2, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address2", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw3, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address3", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2, scw3}, + }, + }) + + select { + case <-ctx.Done(): + t.Fatal("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + // Set each upstream address to have five successes each. This should + // cause none of the addresses to be ejected as none of them are below + // the failure percentage threshold. + for i := 0; i < 3; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + } + + od.intervalTimerAlgorithm() + + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if err := child.waitForSubConnUpdate(sCtx, subConnWithState{}); err == nil { + t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + } + + // Set two upstream addresses to have five successes each, and one + // upstream address to have five failures. This should cause the address + // with five failures to be ejected according to the Failure Percentage + // Algorithm. 
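+		// Address3 will have a failure percentage of 100% (5 failures out of 5
+		// requests), which exceeds the configured Threshold of 50, while
+		// address1 and address2 sit at 0%, so only address3 should be ejected.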
+ for i := 0; i < 2; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{}) + } + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("Picker.Pick should not have errored") + } + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + + // should eject address that always errored. + od.intervalTimerAlgorithm() + + // verify UpdateSubConnState() got called with TRANSIENT_FAILURE for + // child in address that was ejected. + if err := child.waitForSubConnUpdate(ctx, subConnWithState{ + sc: scw3, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, // Represents ejected + }); err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + + // verify only one address got ejected + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if err := child.waitForSubConnUpdate(sCtx, subConnWithState{}); err == nil { + t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + } + } +} + +// TestDurationOfInterval tests the configured interval timer. On the first +// config received, the Outlier Detection balancer should configure the timer +// with whatever is directly specified on the config. On subsequent configs +// received, the Outlier Detection balancer should configure the timer with +// whatever interval is configured minus the difference between the current time +// and the previous start timestamp. For a no-op configuration, the timer should +// not be configured at all. 
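+// For the subsequent-config case, that amounts to starting the timer with
+// roughly (configured interval) - (current time - previous interval start).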
+func (s) TestDurationOfInterval(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, _ := setup(t) + defer func(af func(d time.Duration, f func()) *time.Timer) { + od.Close() + afterFunc = af + internal.UnregisterOutlierDetectionBalancerForTesting() + }(afterFunc) + + durationChan := testutils.NewChannel() + afterFunc = func(dur time.Duration, _ func()) *time.Timer { + durationChan.Send(dur) + return time.NewTimer(1<<63 - 1) + } + + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 8 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := od.child.(*testClusterImplBalancer).waitForClientConnUpdate(ctx, balancer.ClientConnState{ + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + d, err := durationChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving duration from afterFunc() call: %v", err) + } + dur := d.(time.Duration) + // The configured duration should be 8 seconds - what the balancer was + // configured with. + if dur.Seconds() != 8 { + t.Fatalf("configured duration should have been 8 seconds to start timer") + } + + defer func(n func() time.Time) { + now = n + }(now) + now = func() time.Time { + return time.Now().Add(time.Second * 5) + } + + // UpdateClientConnState with an interval of 9 seconds. Due to 5 seconds + // already passing (from overridden time.Now function), this should start an + // interval timer of ~4 seconds. + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 9 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 85, + EnforcementPercentage: 5, + MinimumHosts: 5, + RequestVolume: 50, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := od.child.(*testClusterImplBalancer).waitForClientConnUpdate(ctx, balancer.ClientConnState{ + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + d, err = durationChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving duration from afterFunc() call: %v", err) + } + dur = d.(time.Duration) + if dur.Seconds() < 3.5 || 4.5 < dur.Seconds() { + t.Fatalf("configured duration should have been around 4 seconds to start timer") + } + + // UpdateClientConnState with a no-op config. This shouldn't configure the + // interval timer at all due to it being a no-op. 
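+	// (The config below is a no-op because it sets neither SuccessRateEjection
+	// nor FailurePercentageEjection.)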
+ od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := od.child.(*testClusterImplBalancer).waitForClientConnUpdate(ctx, balancer.ClientConnState{ + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + // No timer should have been started. + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + _, err = durationChan.Receive(sCtx) + if err == nil { + t.Fatal("No timer should have started.") + } +} + +// TestConcurrentPickerCountsWithIntervalTimer tests concurrent picker updates +// (writing to the callCounter) and the interval timer algorithm, which reads +// the callCounter. +func (s) TestConcurrentPickerCountsWithIntervalTimer(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, tcc := setup(t) + defer func() { + defer od.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 1<<63 - 1, // so the interval will never run unless called manually in test. + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ // Have both Success Rate and Failure Percentage to step through all the interval timer code + StdevFactor: 500, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: tcibname, + Config: testClusterImplBalancerConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := od.child.(*testClusterImplBalancer).waitForClientConnUpdate(ctx, balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: testClusterImplBalancerConfig{}, + }); err != nil { + t.Fatalf("Error waiting for Client Conn update: %v", err) + } + + scw1, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address1", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw2, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address2", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw3, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address3", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2, scw3}, + }, + }) + + var picker balancer.Picker + select { + case <-ctx.Done(): + 
t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker = <-tcc.NewPickerCh: + } + + // Spawn a goroutine that constantly picks and invokes the Done callback + // counting for successful and failing RPC's. + finished := make(chan struct{}) + go func() { + // constantly update the picker to test for no race conditions causing + // corrupted memory (have concurrent pointer reads/writes). + for { + select { + case <-finished: + return + default: + } + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + continue + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + time.Sleep(1 * time.Nanosecond) + } + }() + + od.intervalTimerAlgorithm() // causes two swaps on the callCounter + od.intervalTimerAlgorithm() + close(finished) +} + +// TestConcurrentOperations calls different operations on the balancer in +// separate goroutines to test for any race conditions and deadlocks. It also +// uses a child balancer which verifies that no operations on the child get +// called after the child balancer is closed. +func (s) TestConcurrentOperations(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + od, tcc := setup(t) + defer func() { + od.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + { + Addr: "address2", + }, + { + Addr: "address3", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 1<<63 - 1, // so the interval will never run unless called manually in test. + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ // Have both Success Rate and Failure Percentage to step through all the interval timer code + StdevFactor: 500, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: verifyBalancerName, + Config: verifyBalancerConfig{}, + }, + }, + }) + + od.child.(*verifyBalancer).t = t + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + scw1, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address1", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw2, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address2", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + scw3, err := od.NewSubConn([]resolver.Address{ + { + Addr: "address3", + }, + }, balancer.NewSubConnOptions{}) + if err != nil { + t.Fatalf("error in od.NewSubConn call: %v", err) + } + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw2, scw3}, + }, + }) + + var picker balancer.Picker + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker = <-tcc.NewPickerCh: + } + + finished := make(chan struct{}) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-finished: + return + default: + } + pi, err := 
picker.Pick(balancer.PickInfo{}) + if err != nil { + continue + } + pi.Done(balancer.DoneInfo{}) + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + time.Sleep(1 * time.Nanosecond) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-finished: + return + default: + } + od.intervalTimerAlgorithm() + } + }() + + // call Outlier Detection's balancer.ClientConn operations asynchrously. + // balancer.ClientConn operations have no guarantee from the API to be + // called synchronously. + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-finished: + return + default: + } + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw2, scw3}, + }, + }) + time.Sleep(1 * time.Nanosecond) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + od.NewSubConn([]resolver.Address{{Addr: "address4"}}, balancer.NewSubConnOptions{}) + }() + + wg.Add(1) + go func() { + defer wg.Done() + od.RemoveSubConn(scw1) + }() + + wg.Add(1) + go func() { + defer wg.Done() + od.UpdateAddresses(scw2, []resolver.Address{ + { + Addr: "address3", + }, + }) + }() + + // Call balancer.Balancers synchronously in this goroutine, upholding the + // balancer.Balancer API guarantee. + od.UpdateClientConnState(balancer.ClientConnState{ // This will delete addresses and flip to no op + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + { + Addr: "address1", + }, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 1<<63 - 1, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: verifyBalancerName, + Config: verifyBalancerConfig{}, + }, + }, + }) + + // Call balancer.Balancers synchronously in this goroutine, upholding the + // balancer.Balancer API guarantee. + od.UpdateSubConnState(scw1, balancer.SubConnState{ + ConnectivityState: connectivity.Connecting, + }) + od.ResolverError(errors.New("some error")) + od.ExitIdle() + od.Close() + close(finished) + wg.Wait() +} + +type verifyBalancerConfig struct { + serviceconfig.LoadBalancingConfig +} + +type verifyBalancerBuilder struct{} + +func (verifyBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return &verifyBalancer{ + closed: grpcsync.NewEvent(), + } +} + +func (verifyBalancerBuilder) Name() string { + return verifyBalancerName +} + +// verifyBalancer is a balancer that verifies after a Close() call, +// no other balancer.Balancer methods are called afterward. +type verifyBalancer struct { + closed *grpcsync.Event + // To fail the test if any balancer.Balancer operation gets called after + // Close(). 
+ t *testing.T +} + +func (vb *verifyBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { + if vb.closed.HasFired() { + vb.t.Fatal("UpdateClientConnState was called after Close(), which breaks the balancer API") + } + return nil +} + +func (vb *verifyBalancer) ResolverError(err error) { + if vb.closed.HasFired() { + vb.t.Fatal("ResolverError was called after Close(), which breaks the balancer API") + } +} + +func (vb *verifyBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + if vb.closed.HasFired() { + vb.t.Fatal("UpdateSubConnState was called after Close(), which breaks the balancer API") + } +} + +func (vb *verifyBalancer) Close() { + vb.closed.Fire() +} + +func (vb *verifyBalancer) ExitIdle() { + if vb.closed.HasFired() { + vb.t.Fatal("ExitIdle was called after Close(), which breaks the balancer API") + } +} diff --git a/xds/internal/balancer/outlierdetection/callcounter.go b/xds/internal/balancer/outlierdetection/callcounter.go new file mode 100644 index 000000000000..2c4667d7d1cc --- /dev/null +++ b/xds/internal/balancer/outlierdetection/callcounter.go @@ -0,0 +1,78 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package outlierdetection + +import ( + "sync/atomic" + "unsafe" +) + +type bucket struct { + numSuccesses int64 + numFailures int64 + requestVolume int64 // numSuccesses + numFailures, needed because this number will be used in interval timer algorithm +} + +func newCallCounter() *callCounter { + return &callCounter{ + activeBucket: unsafe.Pointer(&bucket{}), + inactiveBucket: &bucket{}, + } +} + +type callCounter struct { + // "The object contains two buckets, and each bucket has a number counting + // successes, and another counting failures." - A50 + + // activeBucket updates every time a call finishes (from picker passed to + // Client Conn), so protect pointer read with atomic load of unsafe.Pointer + // so picker does not have to grab a mutex per RPC, the critical path. + activeBucket unsafe.Pointer + inactiveBucket *bucket +} + +func (cc *callCounter) clear() { + atomic.StorePointer(&cc.activeBucket, unsafe.Pointer(&bucket{})) + cc.inactiveBucket = &bucket{} +} + +// "When the timer triggers, the inactive bucket is zeroed and swapped with the +// active bucket. Then the inactive bucket contains the number of successes and +// failures since the last time the timer triggered. Those numbers are used to +// evaluate the ejection criteria." - A50 +func (cc *callCounter) swap() { + ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket)) + // Don't do it exactly like defined but the same logically, as picker reads + // ref to active bucket so instead of swapping the pointers (inducing race + // conditions where picker writes to inactive bucket which is being used for + // outlier detection algorithm, copy active bucket to new memory on heap, + // picker updates which race simply write to deprecated heap memory + // activeBucket used to point to). 
The other options is to do this as + // defined, swap the pointers and have atomic reads of the Inactive Bucket + // in interval timer algorithm, but I think this is cleaner in regards to + // dealing with picker race condition. See the wrappedPicker explanation for + // the write to activeBucket for a more in depth explanation. + cc.inactiveBucket = &bucket{ + numSuccesses: atomic.LoadInt64(&ab.numSuccesses), + numFailures: atomic.LoadInt64(&ab.numFailures), + requestVolume: atomic.LoadInt64(&ab.requestVolume), + } + atomic.StorePointer(&cc.activeBucket, unsafe.Pointer(&bucket{})) + // end result, same as in gRFC: the inactive bucket contains the number of + // successes and failures since the last time the timer triggered. +} diff --git a/xds/internal/balancer/outlierdetection/callcounter_test.go b/xds/internal/balancer/outlierdetection/callcounter_test.go new file mode 100644 index 000000000000..638e252e146d --- /dev/null +++ b/xds/internal/balancer/outlierdetection/callcounter_test.go @@ -0,0 +1,99 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package outlierdetection + +import ( + "sync/atomic" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func (b1 *bucket) Equal(b2 *bucket) bool { + if b1 == nil && b2 == nil { + return true + } + if (b1 != nil) != (b2 != nil) { + return false + } + if b1.numSuccesses != b2.numSuccesses { + return false + } + if b1.numFailures != b2.numFailures { + return false + } + return b1.requestVolume == b2.requestVolume +} + +func (cc1 *callCounter) Equal(cc2 *callCounter) bool { + if cc1 == nil && cc2 == nil { + return true + } + if (cc1 != nil) != (cc2 != nil) { + return false + } + ab1 := (*bucket)(atomic.LoadPointer(&cc1.activeBucket)) + ab2 := (*bucket)(atomic.LoadPointer(&cc2.activeBucket)) + if !ab1.Equal(ab2) { + return false + } + return cc1.inactiveBucket.Equal(cc2.inactiveBucket) +} + +// TestClear tests that clear on the call counter clears (everything set to 0) +// the active and inactive buckets. +func (s) TestClear(t *testing.T) { + cc := newCallCounter() + ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket)) + ab.numSuccesses = 1 + ab.numFailures = 2 + ab.requestVolume = 3 + cc.inactiveBucket.numSuccesses = 4 + cc.inactiveBucket.numFailures = 5 + cc.inactiveBucket.requestVolume = 9 + cc.clear() + // Both the active and inactive buckets should be cleared. + ccWant := newCallCounter() + if diff := cmp.Diff(cc, ccWant); diff != "" { + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } +} + +// TestSwap tests that swap() on the callCounter successfully has the desired +// end result of inactive bucket containing the previous active buckets data, +// and the active bucket being cleared. 
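+// swap() achieves this by copying the active bucket's counts into a fresh
+// inactive bucket and installing a new empty active bucket, so a picker still
+// holding a pointer to the old active bucket writes to memory the interval
+// timer algorithm no longer reads.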
+func (s) TestSwap(t *testing.T) { + cc := newCallCounter() + ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket)) + ab.numSuccesses = 1 + ab.numFailures = 2 + ab.requestVolume = 3 + cc.inactiveBucket.numSuccesses = 4 + cc.inactiveBucket.numFailures = 5 + cc.inactiveBucket.requestVolume = 9 + + cc.swap() + // Inactive should pick up active's data, active should be cleared. + ccWant := newCallCounter() + ccWant.inactiveBucket.numSuccesses = 1 + ccWant.inactiveBucket.numFailures = 2 + ccWant.inactiveBucket.requestVolume = 3 + if diff := cmp.Diff(cc, ccWant); diff != "" { + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } +} diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go new file mode 100644 index 000000000000..aef343e7243e --- /dev/null +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -0,0 +1,62 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package outlierdetection + +import ( + "unsafe" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal/buffer" + "google.golang.org/grpc/resolver" +) + +type subConnWrapper struct { + balancer.SubConn + + // "The subchannel wrappers created by the outlier_detection LB policy will + // hold a reference to its map entry in the LB policy, if that map entry + // exists." - A50 + obj unsafe.Pointer // *object + // These two pieces of state will reach eventual consistency due to sync in + // run(), and child will always have the correctly updated SubConnState. + latestState balancer.SubConnState + ejected bool + + scUpdateCh *buffer.Unbounded + + addresses []resolver.Address +} + +// eject(): "The wrapper will report a state update with the TRANSIENT_FAILURE +// state, and will stop passing along updates from the underlying subchannel." +func (scw *subConnWrapper) eject() { + scw.scUpdateCh.Put(&ejectedUpdate{ + scw: scw, + ejected: true, + }) +} + +// uneject(): "The wrapper will report a state update with the latest update +// from the underlying subchannel, and resume passing along updates from the +// underlying subchannel." +func (scw *subConnWrapper) uneject() { + scw.scUpdateCh.Put(&ejectedUpdate{ + scw: scw, + ejected: false, + }) +}