From f7d2036712ca0e5f26eea358ab806e10081b2a6e Mon Sep 17 00:00:00 2001 From: Zach Reyes <39203661+zasweq@users.noreply.github.com> Date: Tue, 6 Sep 2022 16:30:08 -0400 Subject: [PATCH] xds: add Outlier Detection Balancer (#5435) * xds: add Outlier Detection Balancer --- internal/grpcrand/grpcrand.go | 7 + test/xds/xds_client_outlier_detection_test.go | 219 ++++ .../balancer/outlierdetection/balancer.go | 797 +++++++++++- .../outlierdetection/balancer_test.go | 1067 ++++++++++++++++- .../balancer/outlierdetection/callcounter.go | 66 + .../outlierdetection/callcounter_test.go | 94 ++ .../e2e_test/outlierdetection_test.go | 369 ++++++ .../balancer/outlierdetection/logging.go | 34 + .../outlierdetection/subconn_wrapper.go | 68 ++ 9 files changed, 2711 insertions(+), 10 deletions(-) create mode 100644 test/xds/xds_client_outlier_detection_test.go create mode 100644 xds/internal/balancer/outlierdetection/callcounter.go create mode 100644 xds/internal/balancer/outlierdetection/callcounter_test.go create mode 100644 xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go create mode 100644 xds/internal/balancer/outlierdetection/logging.go create mode 100644 xds/internal/balancer/outlierdetection/subconn_wrapper.go diff --git a/internal/grpcrand/grpcrand.go b/internal/grpcrand/grpcrand.go index 740f83c2b76..517ea70642a 100644 --- a/internal/grpcrand/grpcrand.go +++ b/internal/grpcrand/grpcrand.go @@ -52,6 +52,13 @@ func Intn(n int) int { return r.Intn(n) } +// Int31n implements rand.Int31n on the grpcrand global source. +func Int31n(n int32) int32 { + mu.Lock() + defer mu.Unlock() + return r.Int31n(n) +} + // Float64 implements rand.Float64 on the grpcrand global source. func Float64() float64 { mu.Lock() diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go new file mode 100644 index 00000000000..b53439cf66c --- /dev/null +++ b/test/xds/xds_client_outlier_detection_test.go @@ -0,0 +1,219 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package xds_test + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + "time" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/internal/testutils/xds/e2e" + testgrpc "google.golang.org/grpc/test/grpc_testing" + testpb "google.golang.org/grpc/test/grpc_testing" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/wrapperspb" +) + +// TestOutlierDetection_NoopConfig tests the scenario where the Outlier +// Detection feature is enabled on the gRPC client, but it receives no Outlier +// Detection configuration from the management server. This should result in a +// no-op Outlier Detection configuration being used to configure the Outlier +// Detection balancer. This test verifies that an RPC is able to proceed +// normally with this configuration. +func (s) TestOutlierDetection_NoopConfig(t *testing.T) { + oldOD := envconfig.XDSOutlierDetection + envconfig.XDSOutlierDetection = true + internal.RegisterOutlierDetectionBalancerForTesting() + defer func() { + envconfig.XDSOutlierDetection = oldOD + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + managementServer, nodeID, _, resolver, cleanup1 := e2e.SetupManagementServer(t, nil) + defer cleanup1() + + port, cleanup2 := startTestService(t, nil) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +} + +// clientResourcesMultipleBackendsAndOD returns xDS resources which correspond +// to multiple upstreams, corresponding different backends listening on +// different localhost:port combinations. The resources also configure an +// Outlier Detection Balancer set up with Failure Percentage Algorithm, which +// ejects endpoints based on failure rate. 
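+//
+// Roughly, the cluster configuration built below checks addresses every
+// 500ms, considers any address that saw at least one request in the interval,
+// and ejects an address (with 100% enforcement) once more than 50% of its
+// requests have failed, for a base ejection time of 30 seconds.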
+func clientResourcesMultipleBackendsAndOD(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions { + routeConfigName := "route-" + params.DialTarget + clusterName := "cluster-" + params.DialTarget + endpointsName := "endpoints-" + params.DialTarget + return e2e.UpdateOptions{ + NodeID: params.NodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)}, + Clusters: []*v3clusterpb.Cluster{clusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, ports)}, + } +} + +func clusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel) *v3clusterpb.Cluster { + cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel) + cluster.OutlierDetection = &v3clusterpb.OutlierDetection{ + Interval: &durationpb.Duration{Nanos: 50000000}, // .5 seconds + BaseEjectionTime: &durationpb.Duration{Seconds: 30}, + MaxEjectionTime: &durationpb.Duration{Seconds: 300}, + MaxEjectionPercent: &wrapperspb.UInt32Value{Value: 1}, + FailurePercentageThreshold: &wrapperspb.UInt32Value{Value: 50}, + EnforcingFailurePercentage: &wrapperspb.UInt32Value{Value: 100}, + FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 1}, + FailurePercentageMinimumHosts: &wrapperspb.UInt32Value{Value: 1}, + } + return cluster +} + +// TestOutlierDetectionWithOutlier tests the Outlier Detection Balancer e2e. It +// spins up three backends, one which consistently errors, and configures the +// ClientConn using xDS to connect to all three of those backends. The Outlier +// Detection Balancer should eject the connection to the backend which +// constantly errors, and thus RPC's should mainly go to backend 1 and 2. +func (s) TestOutlierDetectionWithOutlier(t *testing.T) { + oldOD := envconfig.XDSOutlierDetection + envconfig.XDSOutlierDetection = true + internal.RegisterOutlierDetectionBalancerForTesting() + defer func() { + envconfig.XDSOutlierDetection = oldOD + internal.UnregisterOutlierDetectionBalancerForTesting() + }() + + managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t, nil) + defer cleanup() + + // Counters for how many times backends got called. + var count1, count2, count3 int + + // Working backend 1. + port1, cleanup1 := startTestService(t, &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + count1++ + return &testpb.Empty{}, nil + }, + Address: "localhost:0", + }) + defer cleanup1() + + // Working backend 2. + port2, cleanup2 := startTestService(t, &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + count2++ + return &testpb.Empty{}, nil + }, + Address: "localhost:0", + }) + defer cleanup2() + + // Backend 3 that will always return an error and eventually ejected. 
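+	// Given the 50% failure-percentage threshold configured in
+	// clusterWithOutlierDetection, its 100% failure rate should make it an
+	// outlier after the first interval in which it receives traffic.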
+ port3, cleanup3 := startTestService(t, &stubserver.StubServer{ + EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { + count3++ + return nil, errors.New("some error") + }, + Address: "localhost:0", + }) + defer cleanup3() + + const serviceName = "my-service-client-side-xds" + resources := clientResourcesMultipleBackendsAndOD(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + SecLevel: e2e.SecurityLevelNone, + }, []uint32{port1, port2, port3}) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testgrpc.NewTestServiceClient(cc) + for i := 0; i < 2000; i++ { + // Can either error or not depending on the backend called. + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil && !strings.Contains(err.Error(), "some error") { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + time.Sleep(time.Millisecond) + } + + // Backend 1 should've gotten more than 1/3rd of the load as backend 3 + // should get ejected, leaving only 1 and 2. + if count1 < 700 { + t.Fatalf("backend 1 should've gotten more than 1/3rd of the load") + } + // Backend 2 should've gotten more than 1/3rd of the load as backend 3 + // should get ejected, leaving only 1 and 2. + if count2 < 700 { + t.Fatalf("backend 2 should've gotten more than 1/3rd of the load") + } + // Backend 3 should've gotten less than 1/3rd of the load since it gets + // ejected. + if count3 > 650 { + t.Fatalf("backend 1 should've gotten more than 1/3rd of the load") + } +} diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go index 8729461383e..8e54a4a10d5 100644 --- a/xds/internal/balancer/outlierdetection/balancer.go +++ b/xds/internal/balancer/outlierdetection/balancer.go @@ -25,13 +25,31 @@ import ( "encoding/json" "errors" "fmt" + "math" + "sync" + "sync/atomic" + "time" + "unsafe" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/gracefulswitch" + "google.golang.org/grpc/internal/buffer" "google.golang.org/grpc/internal/envconfig" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcrand" + "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" ) +// Globals to stub out in tests. +var ( + afterFunc = time.AfterFunc + now = time.Now +) + // Name is the name of the outlier detection balancer. 
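+//
+// The balancer is registered under this name conditionally; the tests in this
+// patch enable it by flipping the XDSOutlierDetection env var and calling the
+// internal.RegisterOutlierDetectionBalancerForTesting hook. An LB config
+// selecting this policy looks roughly like the sketch below (illustrative
+// only; see ParseConfig and the config tests for the exact schema):
+//
+//	{
+//	  "interval": 10000000000,
+//	  "childPolicy": [{"xds_cluster_impl_experimental": {"cluster": "<cluster_name>"}}]
+//	}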
const Name = "outlier_detection_experimental" @@ -51,7 +69,20 @@ func init() { type bb struct{} func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { - return nil + b := &outlierDetectionBalancer{ + cc: cc, + closed: grpcsync.NewEvent(), + done: grpcsync.NewEvent(), + addrs: make(map[string]*addressInfo), + scWrappers: make(map[balancer.SubConn]*subConnWrapper), + scUpdateCh: buffer.NewUnbounded(), + pickerUpdateCh: buffer.NewUnbounded(), + } + b.logger = prefixLogger(b) + b.logger.Infof("Created") + b.child = gracefulswitch.NewBalancer(b, bOpts) + go b.run() + return b } func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { @@ -82,6 +113,7 @@ func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, err return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.base_ejection_time = %s; must be >= 0", lbCfg.BaseEjectionTime) case lbCfg.MaxEjectionTime < 0: return nil, fmt.Errorf("OutlierDetectionLoadBalancingConfig.max_ejection_time = %s; must be >= 0", lbCfg.MaxEjectionTime) + // "The fields max_ejection_percent, // success_rate_ejection.enforcement_percentage, // failure_percentage_ejection.threshold, and @@ -105,3 +137,766 @@ func (bb) ParseConfig(s json.RawMessage) (serviceconfig.LoadBalancingConfig, err func (bb) Name() string { return Name } + +// scUpdate wraps a subConn update to be sent to the child balancer. +type scUpdate struct { + scw *subConnWrapper + state balancer.SubConnState +} + +type ejectionUpdate struct { + scw *subConnWrapper + isEjected bool // true for ejected, false for unejected +} + +type lbCfgUpdate struct { + lbCfg *LBConfig + // to make sure picker is updated synchronously. + done chan struct{} +} + +type outlierDetectionBalancer struct { + // These fields are safe to be accessed without holding any mutex because + // they are synchronized in run(), which makes these field accesses happen + // serially. + // + // childState is the latest balancer state received from the child. + childState balancer.State + // recentPickerNoop represents whether the most recent picker sent upward to + // the balancer.ClientConn is a noop picker, which doesn't count RPC's. Used + // to suppress redundant picker updates. + recentPickerNoop bool + + closed *grpcsync.Event + done *grpcsync.Event + cc balancer.ClientConn + logger *grpclog.PrefixLogger + + // childMu guards calls into child (to uphold the balancer.Balancer API + // guarantee of synchronous calls). + childMu sync.Mutex + child *gracefulswitch.Balancer + + // mu guards access to the following fields. It also helps to synchronize + // behaviors of the following events: config updates, firing of the interval + // timer, SubConn State updates, SubConn address updates, and child state + // updates. + // + // For example, when we receive a config update in the middle of the + // interval timer algorithm, which uses knobs present in the config, the + // balancer will wait for the interval timer algorithm to finish before + // persisting the new configuration. + // + // Another example would be the updating of the addrs map, such as from a + // SubConn address update in the middle of the interval timer algorithm + // which uses addrs. This balancer waits for the interval timer algorithm to + // finish before making the update to the addrs map. + // + // This mutex is never held at the same time as childMu (within the context + // of a single goroutine). 
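+	// In practice, methods that need both drop mu before calling into the
+	// child under childMu (see UpdateClientConnState).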
+ mu sync.Mutex + addrs map[string]*addressInfo + cfg *LBConfig + scWrappers map[balancer.SubConn]*subConnWrapper + timerStartTime time.Time + intervalTimer *time.Timer + inhibitPickerUpdates bool + updateUnconditionally bool + numAddrsEjected int // For fast calculations of percentage of addrs ejected + + scUpdateCh *buffer.Unbounded + pickerUpdateCh *buffer.Unbounded +} + +// noopConfig returns whether this balancer is configured with a logical no-op +// configuration or not. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) noopConfig() bool { + return b.cfg.SuccessRateEjection == nil && b.cfg.FailurePercentageEjection == nil +} + +// onIntervalConfig handles logic required specifically on the receipt of a +// configuration which specifies to count RPC's and periodically perform passive +// health checking based on heuristics defined in configuration every configured +// interval. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) onIntervalConfig() { + var interval time.Duration + if b.timerStartTime.IsZero() { + b.timerStartTime = time.Now() + for _, addrInfo := range b.addrs { + addrInfo.callCounter.clear() + } + interval = b.cfg.Interval + } else { + interval = b.cfg.Interval - now().Sub(b.timerStartTime) + if interval < 0 { + interval = 0 + } + } + b.intervalTimer = afterFunc(interval, b.intervalTimerAlgorithm) +} + +// onNoopConfig handles logic required specifically on the receipt of a +// configuration which specifies the balancer to be a noop. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) onNoopConfig() { + // "If a config is provided with both the `success_rate_ejection` and + // `failure_percentage_ejection` fields unset, skip starting the timer and + // do the following:" + // "Unset the timer start timestamp." + b.timerStartTime = time.Time{} + for _, addrInfo := range b.addrs { + // "Uneject all currently ejected addresses." + if !addrInfo.latestEjectionTimestamp.IsZero() { + b.unejectAddress(addrInfo) + } + // "Reset each address's ejection time multiplier to 0." + addrInfo.ejectionTimeMultiplier = 0 + } +} + +func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + lbCfg, ok := s.BalancerConfig.(*LBConfig) + if !ok { + b.logger.Errorf("received config with unexpected type %T: %v", s.BalancerConfig, s.BalancerConfig) + return balancer.ErrBadResolverState + } + + // Reject whole config if child policy doesn't exist, don't persist it for + // later. + bb := balancer.Get(lbCfg.ChildPolicy.Name) + if bb == nil { + return fmt.Errorf("outlier detection: child balancer %q not registered", lbCfg.ChildPolicy.Name) + } + + // It is safe to read b.cfg here without holding the mutex, as the only + // write to b.cfg happens later in this function. This function is part of + // the balancer.Balancer API, so it is guaranteed to be called in a + // synchronous manner, so it cannot race with this read. + if b.cfg == nil || b.cfg.ChildPolicy.Name != lbCfg.ChildPolicy.Name { + b.childMu.Lock() + err := b.child.SwitchTo(bb) + if err != nil { + b.childMu.Unlock() + return fmt.Errorf("outlier detection: error switching to child of type %q: %v", lbCfg.ChildPolicy.Name, err) + } + b.childMu.Unlock() + } + + b.mu.Lock() + // Inhibit child picker updates until this UpdateClientConnState() call + // completes. 
If needed, a picker update containing the no-op config bit + // determined from this config and most recent state from the child will be + // sent synchronously upward at the end of this UpdateClientConnState() + // call. + b.inhibitPickerUpdates = true + b.updateUnconditionally = false + b.cfg = lbCfg + + addrs := make(map[string]bool, len(s.ResolverState.Addresses)) + for _, addr := range s.ResolverState.Addresses { + addrs[addr.Addr] = true + if _, ok := b.addrs[addr.Addr]; !ok { + b.addrs[addr.Addr] = newAddressInfo() + } + } + for addr := range b.addrs { + if !addrs[addr] { + delete(b.addrs, addr) + } + } + + if b.intervalTimer != nil { + b.intervalTimer.Stop() + } + + if b.noopConfig() { + b.onNoopConfig() + } else { + b.onIntervalConfig() + } + b.mu.Unlock() + + b.childMu.Lock() + err := b.child.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: s.ResolverState, + BalancerConfig: b.cfg.ChildPolicy.Config, + }) + b.childMu.Unlock() + + done := make(chan struct{}) + b.pickerUpdateCh.Put(lbCfgUpdate{ + lbCfg: lbCfg, + done: done, + }) + <-done + + return err +} + +func (b *outlierDetectionBalancer) ResolverError(err error) { + b.childMu.Lock() + defer b.childMu.Unlock() + b.child.ResolverError(err) +} + +func (b *outlierDetectionBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + b.mu.Lock() + defer b.mu.Unlock() + scw, ok := b.scWrappers[sc] + if !ok { + // Shouldn't happen if passed down a SubConnWrapper to child on SubConn + // creation. + b.logger.Errorf("UpdateSubConnState called with SubConn that has no corresponding SubConnWrapper") + return + } + if state.ConnectivityState == connectivity.Shutdown { + delete(b.scWrappers, scw.SubConn) + } + b.scUpdateCh.Put(&scUpdate{ + scw: scw, + state: state, + }) +} + +func (b *outlierDetectionBalancer) Close() { + b.closed.Fire() + <-b.done.Done() + b.childMu.Lock() + b.child.Close() + b.childMu.Unlock() + + b.mu.Lock() + defer b.mu.Unlock() + if b.intervalTimer != nil { + b.intervalTimer.Stop() + } +} + +func (b *outlierDetectionBalancer) ExitIdle() { + b.childMu.Lock() + defer b.childMu.Unlock() + b.child.ExitIdle() +} + +// wrappedPicker delegates to the child policy's picker, and when the request +// finishes, it increments the corresponding counter in the map entry referenced +// by the subConnWrapper that was picked. If both the `success_rate_ejection` +// and `failure_percentage_ejection` fields are unset in the configuration, this +// picker will not count. +type wrappedPicker struct { + childPicker balancer.Picker + noopPicker bool +} + +func (wp *wrappedPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + pr, err := wp.childPicker.Pick(info) + if err != nil { + return balancer.PickResult{}, err + } + + done := func(di balancer.DoneInfo) { + if !wp.noopPicker { + incrementCounter(pr.SubConn, di) + } + if pr.Done != nil { + pr.Done(di) + } + } + scw, ok := pr.SubConn.(*subConnWrapper) + if !ok { + // This can never happen, but check is present for defensive + // programming. 
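+		// incrementCounter tolerates a non-wrapper SubConn by skipping the
+		// count, so the pick below is still usable.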
+ logger.Errorf("Picked SubConn from child picker is not a SubConnWrapper") + return balancer.PickResult{ + SubConn: pr.SubConn, + Done: done, + }, nil + } + return balancer.PickResult{ + SubConn: scw.SubConn, + Done: done, + }, nil +} + +func incrementCounter(sc balancer.SubConn, info balancer.DoneInfo) { + scw, ok := sc.(*subConnWrapper) + if !ok { + // Shouldn't happen, as comes from child + return + } + + // scw.addressInfo and callCounter.activeBucket can be written to + // concurrently (the pointers themselves). Thus, protect the reads here with + // atomics to prevent data corruption. There exists a race in which you read + // the addressInfo or active bucket pointer and then that pointer points to + // deprecated memory. If this goroutine yields the processor, in between + // reading the addressInfo pointer and writing to the active bucket, + // UpdateAddresses can switch the addressInfo the scw points to. Writing to + // an outdated addresses is a very small race and tolerable. After reading + // callCounter.activeBucket in this picker a swap call can concurrently + // change what activeBucket points to. A50 says to swap the pointer, which + // will cause this race to write to deprecated memory the interval timer + // algorithm will never read, which makes this race alright. + addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo)) + if addrInfo == nil { + return + } + ab := (*bucket)(atomic.LoadPointer(&addrInfo.callCounter.activeBucket)) + + if info.Err == nil { + atomic.AddUint32(&ab.numSuccesses, 1) + } else { + atomic.AddUint32(&ab.numFailures, 1) + } +} + +func (b *outlierDetectionBalancer) UpdateState(s balancer.State) { + b.pickerUpdateCh.Put(s) +} + +func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + sc, err := b.cc.NewSubConn(addrs, opts) + if err != nil { + return nil, err + } + scw := &subConnWrapper{ + SubConn: sc, + addresses: addrs, + scUpdateCh: b.scUpdateCh, + } + b.mu.Lock() + defer b.mu.Unlock() + b.scWrappers[sc] = scw + if len(addrs) != 1 { + return scw, nil + } + addrInfo, ok := b.addrs[addrs[0].Addr] + if !ok { + return scw, nil + } + addrInfo.sws = append(addrInfo.sws, scw) + atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo)) + if !addrInfo.latestEjectionTimestamp.IsZero() { + scw.eject() + } + return scw, nil +} + +func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) { + scw, ok := sc.(*subConnWrapper) + if !ok { // Shouldn't happen + return + } + // Remove the wrapped SubConn from the parent Client Conn. We don't remove + // from map entry until we get a Shutdown state for the SubConn, as we need + // that data to forward that state down. + b.cc.RemoveSubConn(scw.SubConn) +} + +// appendIfPresent appends the scw to the address, if the address is present in +// the Outlier Detection balancers address map. Returns nil if not present, and +// the map entry if present. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *addressInfo { + addrInfo, ok := b.addrs[addr] + if !ok { + return nil + } + + addrInfo.sws = append(addrInfo.sws, scw) + atomic.StorePointer(&scw.addressInfo, unsafe.Pointer(addrInfo)) + return addrInfo +} + +// removeSubConnFromAddressesMapEntry removes the scw from its map entry if +// present. +// +// Caller must hold b.mu. 
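+//
+// It is used by UpdateAddresses when a SubConn moves away from the single
+// address it was being tracked under.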
+func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subConnWrapper) { + addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo)) + if addrInfo == nil { + return + } + for i, sw := range addrInfo.sws { + if scw == sw { + addrInfo.sws = append(addrInfo.sws[:i], addrInfo.sws[i+1:]...) + return + } + } +} + +func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + scw, ok := sc.(*subConnWrapper) + if !ok { + // Return, shouldn't happen if passed up scw + return + } + + b.cc.UpdateAddresses(scw.SubConn, addrs) + b.mu.Lock() + defer b.mu.Unlock() + + // Note that 0 addresses is a valid update/state for a SubConn to be in. + // This is correctly handled by this algorithm (handled as part of a non singular + // old address/new address). + switch { + case len(scw.addresses) == 1 && len(addrs) == 1: // single address to single address + // If the updated address is the same, then there is nothing to do + // past this point. + if scw.addresses[0].Addr == addrs[0].Addr { + return + } + b.removeSubConnFromAddressesMapEntry(scw) + addrInfo := b.appendIfPresent(addrs[0].Addr, scw) + if addrInfo == nil { // uneject unconditionally because could have come from an ejected address + scw.uneject() + break + } + if addrInfo.latestEjectionTimestamp.IsZero() { // relay new updated subconn state + scw.uneject() + } else { + scw.eject() + } + case len(scw.addresses) == 1: // single address to multiple/no addresses + b.removeSubConnFromAddressesMapEntry(scw) + addrInfo := (*addressInfo)(atomic.LoadPointer(&scw.addressInfo)) + if addrInfo != nil { + addrInfo.callCounter.clear() + } + scw.uneject() + case len(addrs) == 1: // multiple/no addresses to single address + addrInfo := b.appendIfPresent(addrs[0].Addr, scw) + if addrInfo != nil && !addrInfo.latestEjectionTimestamp.IsZero() { + scw.eject() + } + } // otherwise multiple/no addresses to multiple/no addresses; ignore + + scw.addresses = addrs +} + +func (b *outlierDetectionBalancer) ResolveNow(opts resolver.ResolveNowOptions) { + b.cc.ResolveNow(opts) +} + +func (b *outlierDetectionBalancer) Target() string { + return b.cc.Target() +} + +func max(x, y int64) int64 { + if x < y { + return y + } + return x +} + +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} + +// handleSubConnUpdate stores the recent state and forward the update +// if the SubConn is not ejected. +func (b *outlierDetectionBalancer) handleSubConnUpdate(u *scUpdate) { + scw := u.scw + scw.latestState = u.state + if !scw.ejected { + b.childMu.Lock() + b.child.UpdateSubConnState(scw, u.state) + b.childMu.Unlock() + } +} + +// handleEjectedUpdate handles any SubConns that get ejected/unejected, and +// forwards the appropriate corresponding subConnState to the child policy. +func (b *outlierDetectionBalancer) handleEjectedUpdate(u *ejectionUpdate) { + scw := u.scw + scw.ejected = u.isEjected + // If scw.latestState has never been written to will default to connectivity + // IDLE, which is fine. + stateToUpdate := scw.latestState + if u.isEjected { + stateToUpdate = balancer.SubConnState{ + ConnectivityState: connectivity.TransientFailure, + } + } + b.childMu.Lock() + b.child.UpdateSubConnState(scw, stateToUpdate) + b.childMu.Unlock() +} + +// handleChildStateUpdate forwards the picker update wrapped in a wrapped picker +// with the noop picker bit present. 
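+// A noop picker still delegates picks to the child's picker but does not
+// count RPC outcomes, matching a configuration in which both ejection
+// algorithms are unset.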
+func (b *outlierDetectionBalancer) handleChildStateUpdate(u balancer.State) { + b.childState = u + b.mu.Lock() + if b.inhibitPickerUpdates { + // If a child's state is updated during the suppression of child + // updates, the synchronous handleLBConfigUpdate function with respect + // to UpdateClientConnState should return a picker unconditionally. + b.updateUnconditionally = true + b.mu.Unlock() + return + } + noopCfg := b.noopConfig() + b.mu.Unlock() + b.recentPickerNoop = noopCfg + b.cc.UpdateState(balancer.State{ + ConnectivityState: b.childState.ConnectivityState, + Picker: &wrappedPicker{ + childPicker: b.childState.Picker, + noopPicker: noopCfg, + }, + }) +} + +// handleLBConfigUpdate compares whether the new config is a noop config or not, +// to the noop bit in the picker if present. It updates the picker if this bit +// changed compared to the picker currently in use. +func (b *outlierDetectionBalancer) handleLBConfigUpdate(u lbCfgUpdate) { + lbCfg := u.lbCfg + noopCfg := lbCfg.SuccessRateEjection == nil && lbCfg.FailurePercentageEjection == nil + // If the child has sent it's first update and this config flips the noop + // bit compared to the most recent picker update sent upward, then a new + // picker with this updated bit needs to be forwarded upward. If a child + // update was received during the suppression of child updates within + // UpdateClientConnState(), then a new picker needs to be forwarded with + // this updated state, irregardless of whether this new configuration flips + // the bit. + if b.childState.Picker != nil && noopCfg != b.recentPickerNoop || b.updateUnconditionally { + b.recentPickerNoop = noopCfg + b.cc.UpdateState(balancer.State{ + ConnectivityState: b.childState.ConnectivityState, + Picker: &wrappedPicker{ + childPicker: b.childState.Picker, + noopPicker: noopCfg, + }, + }) + } + b.inhibitPickerUpdates = false + b.updateUnconditionally = false + close(u.done) +} + +func (b *outlierDetectionBalancer) run() { + defer b.done.Fire() + for { + select { + case update := <-b.scUpdateCh.Get(): + b.scUpdateCh.Load() + if b.closed.HasFired() { // don't send SubConn updates to child after the balancer has been closed + return + } + switch u := update.(type) { + case *scUpdate: + b.handleSubConnUpdate(u) + case *ejectionUpdate: + b.handleEjectedUpdate(u) + } + case update := <-b.pickerUpdateCh.Get(): + b.pickerUpdateCh.Load() + if b.closed.HasFired() { // don't send picker updates to grpc after the balancer has been closed + return + } + switch u := update.(type) { + case balancer.State: + b.handleChildStateUpdate(u) + case lbCfgUpdate: + b.handleLBConfigUpdate(u) + } + case <-b.closed.Done(): + return + } + } +} + +// intervalTimerAlgorithm ejects and unejects addresses based on the Outlier +// Detection configuration and data about each address from the previous +// interval. +func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { + b.mu.Lock() + defer b.mu.Unlock() + b.timerStartTime = time.Now() + + for _, addrInfo := range b.addrs { + addrInfo.callCounter.swap() + } + + if b.cfg.SuccessRateEjection != nil { + b.successRateAlgorithm() + } + + if b.cfg.FailurePercentageEjection != nil { + b.failurePercentageAlgorithm() + } + + for _, addrInfo := range b.addrs { + if addrInfo.latestEjectionTimestamp.IsZero() && addrInfo.ejectionTimeMultiplier > 0 { + addrInfo.ejectionTimeMultiplier-- + continue + } + if addrInfo.latestEjectionTimestamp.IsZero() { + // Address is already not ejected, so no need to check for whether + // to uneject the address below. 
+ continue + } + et := b.cfg.BaseEjectionTime.Nanoseconds() * addrInfo.ejectionTimeMultiplier + met := max(b.cfg.BaseEjectionTime.Nanoseconds(), b.cfg.MaxEjectionTime.Nanoseconds()) + curTimeAfterEt := now().After(addrInfo.latestEjectionTimestamp.Add(time.Duration(min(et, met)))) + if curTimeAfterEt { + b.unejectAddress(addrInfo) + } + } + + // This conditional only for testing (since the interval timer algorithm is + // called manually), will never hit in production. + if b.intervalTimer != nil { + b.intervalTimer.Stop() + } + b.intervalTimer = afterFunc(b.cfg.Interval, b.intervalTimerAlgorithm) +} + +// addrsWithAtLeastRequestVolume returns a slice of address information of all +// addresses with at least request volume passed in. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) addrsWithAtLeastRequestVolume(requestVolume uint32) []*addressInfo { + var addrs []*addressInfo + for _, addrInfo := range b.addrs { + bucket := addrInfo.callCounter.inactiveBucket + rv := bucket.numSuccesses + bucket.numFailures + if rv >= requestVolume { + addrs = append(addrs, addrInfo) + } + } + return addrs +} + +// meanAndStdDev returns the mean and std dev of the fractions of successful +// requests of the addresses passed in. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) meanAndStdDev(addrs []*addressInfo) (float64, float64) { + var totalFractionOfSuccessfulRequests float64 + var mean float64 + for _, addrInfo := range addrs { + bucket := addrInfo.callCounter.inactiveBucket + rv := bucket.numSuccesses + bucket.numFailures + totalFractionOfSuccessfulRequests += float64(bucket.numSuccesses) / float64(rv) + } + mean = totalFractionOfSuccessfulRequests / float64(len(addrs)) + var sumOfSquares float64 + for _, addrInfo := range addrs { + bucket := addrInfo.callCounter.inactiveBucket + rv := bucket.numSuccesses + bucket.numFailures + devFromMean := (float64(bucket.numSuccesses) / float64(rv)) - mean + sumOfSquares += devFromMean * devFromMean + } + variance := sumOfSquares / float64(len(addrs)) + return mean, math.Sqrt(variance) +} + +// successRateAlgorithm ejects any addresses where the success rate falls below +// the other addresses according to mean and standard deviation, and if overall +// applicable from other set heuristics. +// +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) successRateAlgorithm() { + addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.SuccessRateEjection.RequestVolume) + if len(addrsToConsider) < int(b.cfg.SuccessRateEjection.MinimumHosts) { + return + } + mean, stddev := b.meanAndStdDev(addrsToConsider) + for _, addrInfo := range addrsToConsider { + bucket := addrInfo.callCounter.inactiveBucket + ejectionCfg := b.cfg.SuccessRateEjection + if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) { + return + } + successRate := float64(bucket.numSuccesses) / float64(bucket.numSuccesses+bucket.numFailures) + if successRate < (mean - stddev*(float64(ejectionCfg.StdevFactor)/1000)) { + if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage { + b.ejectAddress(addrInfo) + } + } + } +} + +// failurePercentageAlgorithm ejects any addresses where the failure percentage +// rate exceeds a set enforcement percentage, if overall applicable from other +// set heuristics. +// +// Caller must hold b.mu. 
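+//
+// For example, with Threshold set to 50 and an address whose inactive bucket
+// holds 2 successes and 3 failures, the failure percentage is 3/5*100 = 60,
+// which exceeds 50, so the address is ejected (subject to
+// EnforcementPercentage and MaxEjectionPercent).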
+func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { + addrsToConsider := b.addrsWithAtLeastRequestVolume(b.cfg.FailurePercentageEjection.RequestVolume) + if len(addrsToConsider) < int(b.cfg.FailurePercentageEjection.MinimumHosts) { + return + } + + for _, addrInfo := range addrsToConsider { + bucket := addrInfo.callCounter.inactiveBucket + ejectionCfg := b.cfg.FailurePercentageEjection + if float64(b.numAddrsEjected)/float64(len(b.addrs))*100 >= float64(b.cfg.MaxEjectionPercent) { + return + } + failurePercentage := (float64(bucket.numFailures) / float64(bucket.numSuccesses+bucket.numFailures)) * 100 + if failurePercentage > float64(b.cfg.FailurePercentageEjection.Threshold) { + if uint32(grpcrand.Int31n(100)) < ejectionCfg.EnforcementPercentage { + b.ejectAddress(addrInfo) + } + } + } +} + +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) ejectAddress(addrInfo *addressInfo) { + b.numAddrsEjected++ + addrInfo.latestEjectionTimestamp = b.timerStartTime + addrInfo.ejectionTimeMultiplier++ + for _, sbw := range addrInfo.sws { + sbw.eject() + } +} + +// Caller must hold b.mu. +func (b *outlierDetectionBalancer) unejectAddress(addrInfo *addressInfo) { + b.numAddrsEjected-- + addrInfo.latestEjectionTimestamp = time.Time{} + for _, sbw := range addrInfo.sws { + sbw.uneject() + } +} + +// addressInfo contains the runtime information about an address that pertains +// to Outlier Detection. This struct and all of its fields is protected by +// outlierDetectionBalancer.mu in the case where it is accessed through the +// address map. In the case of Picker callbacks, the writes to the activeBucket +// of callCounter are protected by atomically loading and storing +// unsafe.Pointers (see further explanation in incrementCounter()). +type addressInfo struct { + // The call result counter object. + callCounter *callCounter + + // The latest ejection timestamp, or zero if the address is currently not + // ejected. + latestEjectionTimestamp time.Time + + // The current ejection time multiplier, starting at 0. + ejectionTimeMultiplier int64 + + // A list of subchannel wrapper objects that correspond to this address. 
+ sws []*subConnWrapper +} + +func newAddressInfo() *addressInfo { + return &addressInfo{ + callCounter: newCallCounter(), + } +} diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 106e2b64dbc..151684ac7fd 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -19,20 +19,36 @@ package outlierdetection import ( + "context" "encoding/json" "errors" + "fmt" + "math" "strings" + "sync" "testing" "time" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/balancer/stub" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/grpctest" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" + "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/clusterimpl" ) +var ( + defaultTestTimeout = 5 * time.Second + defaultTestShortTimeout = 10 * time.Millisecond +) + type s struct { grpctest.Tester } @@ -44,6 +60,13 @@ func Test(t *testing.T) { // TestParseConfig verifies the ParseConfig() method in the Outlier Detection // Balancer. func (s) TestParseConfig(t *testing.T) { + const errParseConfigName = "errParseConfigBalancer" + stub.Register(errParseConfigName, stub.BalancerFuncs{ + ParseConfig: func(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return nil, errors.New("some error") + }, + }) + parser := bb{} tests := []struct { @@ -65,7 +88,7 @@ func (s) TestParseConfig(t *testing.T) { ] }`, wantCfg: &LBConfig{ - Interval: 1<<63 - 1, + Interval: math.MaxInt64, ChildPolicy: &internalserviceconfig.BalancerConfig{ Name: "xds_cluster_impl_experimental", Config: &clusterimpl.LBConfig{ @@ -270,20 +293,1046 @@ func (lbc *LBConfig) Equal(lbc2 *LBConfig) bool { return cmp.Equal(lbc.ChildPolicy, lbc2.ChildPolicy) } -func init() { - balancer.Register(errParseConfigBuilder{}) +type subConnWithState struct { + sc balancer.SubConn + state balancer.SubConnState +} + +func setup(t *testing.T) (*outlierDetectionBalancer, *testutils.TestClientConn, func()) { + t.Helper() + internal.RegisterOutlierDetectionBalancerForTesting() + builder := balancer.Get(Name) + if builder == nil { + t.Fatalf("balancer.Get(%q) returned nil", Name) + } + tcc := testutils.NewTestClientConn(t) + odB := builder.Build(tcc, balancer.BuildOptions{}) + return odB.(*outlierDetectionBalancer), tcc, func() { + odB.Close() + internal.UnregisterOutlierDetectionBalancerForTesting() + } +} + +type emptyChildConfig struct { + serviceconfig.LoadBalancingConfig } -type errParseConfigBuilder struct{} +// TestChildBasicOperations tests basic operations of the Outlier Detection +// Balancer and it's interaction with it's child. The following scenarios are +// tested, in a step by step fashion: +// 1. The Outlier Detection Balancer receives it's first good configuration. The +// balancer is expected to create a child and sent the child it's configuration. +// 2. The Outlier Detection Balancer receives new configuration that specifies a +// child's type, and the new type immediately reports READY inline. The first +// child balancer should be closed and the second child balancer should receive +// a config update. +// 3. The Outlier Detection Balancer is closed. 
The second child balancer should +// be closed. +func (s) TestChildBasicOperations(t *testing.T) { + bc := emptyChildConfig{} + + ccsCh := testutils.NewChannel() + closeCh := testutils.NewChannel() + + stub.Register(t.Name()+"child1", stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + ccsCh.Send(ccs.BalancerConfig) + return nil + }, + Close: func(bd *stub.BalancerData) { + closeCh.Send(nil) + }, + }) + + stub.Register(t.Name()+"child2", stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + // UpdateState inline to READY to complete graceful switch process + // synchronously from any UpdateClientConnState call. + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &testutils.TestConstPicker{}, + }) + ccsCh.Send(nil) + return nil + }, + Close: func(bd *stub.BalancerData) { + closeCh.Send(nil) + }, + }) + + od, tcc, _ := setup(t) + defer internal.UnregisterOutlierDetectionBalancerForTesting() + + // This first config update should cause a child to be built and forwarded + // it's first update. + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: math.MaxInt64, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name() + "child1", + Config: bc, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + cr, err := ccsCh.Receive(ctx) + if err != nil { + t.Fatalf("timed out waiting for UpdateClientConnState on the first child balancer: %v", err) + } + if _, ok := cr.(emptyChildConfig); !ok { + t.Fatalf("Received child policy config of type %T, want %T", cr, emptyChildConfig{}) + } + + // This Update Client Conn State call should cause the first child balancer + // to close, and a new child to be created and also forwarded it's first + // config update. + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: math.MaxInt64, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name() + "child2", + Config: emptyChildConfig{}, + }, + }, + }) -func (errParseConfigBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + // Verify inline UpdateState() call from the new child eventually makes it's + // way to the Test Client Conn. + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case state := <-tcc.NewStateCh: + if state != connectivity.Ready { + t.Fatalf("ClientConn received connectivity state %v, want %v", state, connectivity.Ready) + } + } + + // Verify the first child balancer closed. + if _, err = closeCh.Receive(ctx); err != nil { + t.Fatalf("timed out waiting for the first child balancer to be closed: %v", err) + } + // Verify the second child balancer received it's first config update. + if _, err = ccsCh.Receive(ctx); err != nil { + t.Fatalf("timed out waiting for UpdateClientConnState on the second child balancer: %v", err) + } + // Closing the Outlier Detection Balancer should close the newly created + // child. + od.Close() + if _, err = closeCh.Receive(ctx); err != nil { + t.Fatalf("timed out waiting for the second child balancer to be closed: %v", err) + } +} + +// TestUpdateAddresses tests the functionality of UpdateAddresses and any +// changes in the addresses/plurality of those addresses for a SubConn. 
The +// Balancer is set up with two upstreams, with one of the upstreams being +// ejected. Initially, there is one SubConn for each address. The following +// scenarios are tested, in a step by step fashion: +// 1. The SubConn not currently ejected switches addresses to the address that +// is ejected. This should cause the SubConn to get ejected. +// 2. Update this same SubConn to multiple addresses. This should cause the +// SubConn to get unejected, as it is no longer being tracked by Outlier +// Detection at that point. +// 3. Update this same SubConn to different addresses, still multiple. This +// should be a noop, as the SubConn is still no longer being tracked by Outlier +// Detection. +// 4. Update this same SubConn to the a single address which is ejected. This +// should cause the SubConn to be ejected. +func (s) TestUpdateAddresses(t *testing.T) { + scsCh := testutils.NewChannel() + var scw1, scw2 balancer.SubConn + var err error + stub.Register(t.Name(), stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2}, + }, + }) + return nil + }, + UpdateSubConnState: func(_ *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + scsCh.Send(subConnWithState{ + sc: sc, + state: state, + }) + }}) + + od, tcc, cleanup := setup(t) + defer cleanup() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + {Addr: "address1"}, + {Addr: "address2"}, + }, + }, + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 2, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Setup the system to where one address is ejected and one address + // isn't. + select { + case <-ctx.Done(): + t.Fatal("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + // Simulate 5 successful RPC calls on the first SubConn (the first call + // to picker.Pick). + for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{}) + } + pi, err = picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + // Simulate 5 failed RPC calls on the second SubConn (the second call to + // picker.Pick). Thus, when the interval timer algorithm is run, the + // second SubConn's address should be ejected, which will allow us to + // further test UpdateAddresses() logic. 
+ for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + } + od.intervalTimerAlgorithm() + // verify UpdateSubConnState() got called with TRANSIENT_FAILURE for + // child with address that was ejected. + gotSCWS, err := scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw2, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + } + + // Update scw1 to another address that is currently ejected. This should + // cause scw1 to get ejected. + od.UpdateAddresses(scw1, []resolver.Address{{Addr: "address2"}}) + + // Verify that update addresses gets forwarded to ClientConn. + select { + case <-ctx.Done(): + t.Fatal("timeout while waiting for a UpdateState call on the ClientConn") + case <-tcc.UpdateAddressesAddrsCh: + } + // Verify scw1 got ejected (UpdateSubConnState called with TRANSIENT + // FAILURE). + gotSCWS, err := scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw1, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + + // Update scw1 to multiple addresses. This should cause scw1 to get + // unejected, as is it no longer being tracked for Outlier Detection. + od.UpdateAddresses(scw1, []resolver.Address{ + {Addr: "address1"}, + {Addr: "address2"}, + }) + // Verify scw1 got unejected (UpdateSubConnState called with recent state). + gotSCWS, err = scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw1, + state: balancer.SubConnState{ConnectivityState: connectivity.Idle}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + + // Update scw1 to a different multiple addresses list. A change of addresses + // in which the plurality goes from multiple to multiple should be a no-op, + // as the address continues to be ignored by outlier detection. + od.UpdateAddresses(scw1, []resolver.Address{ + {Addr: "address2"}, + {Addr: "address3"}, + }) + // Verify no downstream effects. + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("no SubConn update should have been sent (no SubConn got ejected/unejected)") + } + + // Update scw1 back to a single address, which is ejected. This should cause + // the SubConn to be re-ejected. + od.UpdateAddresses(scw1, []resolver.Address{{Addr: "address2"}}) + // Verify scw1 got ejected (UpdateSubConnState called with TRANSIENT FAILURE). 
+ gotSCWS, err = scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw1, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } +} + +func scwsEqual(gotSCWS subConnWithState, wantSCWS subConnWithState) error { + if !cmp.Equal(gotSCWS, wantSCWS, cmp.AllowUnexported(subConnWithState{}, testutils.TestSubConn{}, subConnWrapper{}, addressInfo{}), cmpopts.IgnoreFields(subConnWrapper{}, "scUpdateCh")) { + return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCWS, wantSCWS) + } return nil } -func (errParseConfigBuilder) Name() string { - return "errParseConfigBalancer" +type rrPicker struct { + scs []balancer.SubConn + next int +} + +func (rrp *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + sc := rrp.scs[rrp.next] + rrp.next = (rrp.next + 1) % len(rrp.scs) + return balancer.PickResult{SubConn: sc}, nil +} + +// TestDurationOfInterval tests the configured interval timer. +// The following scenarios are tested: +// 1. The Outlier Detection Balancer receives it's first config. The balancer +// should configure the timer with whatever is directly specified on the config. +// 2. The Outlier Detection Balancer receives a subsequent config. The balancer +// should configure with whatever interval is configured minus the difference +// between the current time and the previous start timestamp. +// 3. The Outlier Detection Balancer receives a no-op configuration. The +// balancer should not configure a timer at all. +func (s) TestDurationOfInterval(t *testing.T) { + stub.Register(t.Name(), stub.BalancerFuncs{}) + + od, _, cleanup := setup(t) + defer func(af func(d time.Duration, f func()) *time.Timer) { + cleanup() + afterFunc = af + }(afterFunc) + + durationChan := testutils.NewChannel() + afterFunc = func(dur time.Duration, _ func()) *time.Timer { + durationChan.Send(dur) + return time.NewTimer(math.MaxInt64) + } + + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 8 * time.Second, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + d, err := durationChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving duration from afterFunc() call: %v", err) + } + dur := d.(time.Duration) + // The configured duration should be 8 seconds - what the balancer was + // configured with. + if dur != 8*time.Second { + t.Fatalf("configured duration should have been 8 seconds to start timer") + } + + // Override time.Now to time.Now() + 5 seconds. This will represent 5 + // seconds already passing for the next check in UpdateClientConnState. + defer func(n func() time.Time) { + now = n + }(now) + now = func() time.Time { + return time.Now().Add(time.Second * 5) + } + + // UpdateClientConnState with an interval of 9 seconds. Due to 5 seconds + // already passing (from overridden time.Now function), this should start an + // interval timer of ~4 seconds. 
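+	// (onIntervalConfig computes cfg.Interval - now().Sub(timerStartTime),
+	// i.e. 9s - 5s = 4s.)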
+ od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 9 * time.Second, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 1900, + EnforcementPercentage: 100, + MinimumHosts: 5, + RequestVolume: 100, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + + d, err = durationChan.Receive(ctx) + if err != nil { + t.Fatalf("Error receiving duration from afterFunc() call: %v", err) + } + dur = d.(time.Duration) + if dur.Seconds() < 3.5 || 4.5 < dur.Seconds() { + t.Fatalf("configured duration should have been around 4 seconds to start timer") + } + + // UpdateClientConnState with a no-op config. This shouldn't configure the + // interval timer at all due to it being a no-op. + od.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &LBConfig{ + Interval: 10 * time.Second, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + + ctx, cancel = context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + // No timer should have been started. + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err = durationChan.Receive(sCtx); err == nil { + t.Fatal("No timer should have started.") + } +} + +// TestEjectUnejectSuccessRate tests the functionality of the interval timer +// algorithm when configured with SuccessRateEjection. The Outlier Detection +// Balancer will be set up with 3 SubConns, each with a different address. +// It tests the following scenarios, in a step by step fashion: +// 1. The three addresses each have 5 successes. The interval timer algorithm should +// not eject any of the addresses. +// 2. Two of the addresses have 5 successes, the third has five failures. The +// interval timer algorithm should eject the third address with five failures. +// 3. The interval timer algorithm is run at a later time past max ejection +// time. The interval timer algorithm should uneject the third address. 
+func (s) TestEjectUnejectSuccessRate(t *testing.T) { + scsCh := testutils.NewChannel() + var scw1, scw2, scw3 balancer.SubConn + var err error + stub.Register(t.Name(), stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2, scw3}, + }, + }) + return nil + }, + UpdateSubConnState: func(_ *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + scsCh.Send(subConnWithState{ + sc: sc, + state: state, + }) + }, + }) + + od, tcc, cleanup := setup(t) + defer func() { + cleanup() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + {Addr: "address1"}, + {Addr: "address2"}, + {Addr: "address3"}, + }, + }, + BalancerConfig: &LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + FailurePercentageEjection: &FailurePercentageEjection{ + Threshold: 50, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + // Set each of the three upstream addresses to have five successes each. + // This should cause none of the addresses to be ejected as none of them + // are outliers according to the success rate algorithm. + for i := 0; i < 3; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{}) + } + } + + od.intervalTimerAlgorithm() + + // verify no UpdateSubConnState() call on the child, as no addresses got + // ejected (ejected address will cause an UpdateSubConnState call). + sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)") + } + + // Since no addresses are ejected, a SubConn update should forward down + // to the child. 
+ od.UpdateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{ + ConnectivityState: connectivity.Connecting, + }) + + gotSCWS, err := scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw1, + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + + // Set two of the upstream addresses to have five successes each, and + // one of the upstream addresses to have five failures. This should + // cause the address which has five failures to be ejected according to + // the SuccessRateAlgorithm. + for i := 0; i < 2; i++ { + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{}) + } + } + pi, err := picker.Pick(balancer.PickInfo{}) + if err != nil { + t.Fatalf("picker.Pick failed with error: %v", err) + } + for c := 0; c < 5; c++ { + pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) + } + + // should eject address that always errored. + od.intervalTimerAlgorithm() + // Due to the address being ejected, the SubConn with that address + // should be ejected, meaning a TRANSIENT_FAILURE connectivity state + // gets reported to the child. + gotSCWS, err = scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw3, + state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + // Only one address should be ejected. + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)") + } + + // Now that an address is ejected, SubConn updates for SubConns using + // that address should not be forwarded downward. These SubConn updates + // will be cached to update the child sometime in the future when the + // address gets unejected. + od.UpdateSubConnState(pi.SubConn, balancer.SubConnState{ + ConnectivityState: connectivity.Connecting, + }) + sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout) + defer cancel() + if _, err := scsCh.Receive(sCtx); err == nil { + t.Fatalf("SubConn update should not have been forwarded (the SubConn is ejected)") + } + + // Override now to cause the interval timer algorithm to always uneject + // the ejected address. This will always uneject the ejected address + // because this time is set way past the max ejection time set in the + // configuration, which will make the next interval timer algorithm run + // uneject any ejected addresses. + defer func(n func() time.Time) { + now = n + }(now) + now = func() time.Time { + return time.Now().Add(time.Second * 1000) + } + od.intervalTimerAlgorithm() + + // unejected SubConn should report latest persisted state - which is + // connecting from earlier. 
+ gotSCWS, err = scsCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for Sub Conn update: %v", err) + } + if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{ + sc: scw3, + state: balancer.SubConnState{ConnectivityState: connectivity.Connecting}, + }); err != nil { + t.Fatalf("Error in Sub Conn update: %v", err) + } + } +} + +// TestEjectFailureRate tests the functionality of the interval timer algorithm +// when configured with FailurePercentageEjection, and also the functionality of +// noop configuration. The Outlier Detection Balancer will be set up with 3 +// SubConns, each with a different address. It tests the following scenarios, in +// a step by step fashion: +// 1. The three addresses each have 5 successes. The interval timer algorithm +// should not eject any of the addresses. +// 2. Two of the addresses have 5 successes, the third has five failures. The +// interval timer algorithm should eject the third address with five failures. +// 3. The Outlier Detection Balancer receives a subsequent noop config update. +// The balancer should uneject all ejected addresses. +func (s) TestEjectFailureRate(t *testing.T) { + scsCh := testutils.NewChannel() + var scw1, scw2, scw3 balancer.SubConn + var err error + stub.Register(t.Name(), stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error { + if scw1 != nil { // UpdateClientConnState was already called, no need to recreate SubConns. + return nil + } + scw1, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + scw2, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + scw3, err = bd.ClientConn.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{}) + if err != nil { + t.Errorf("error in od.NewSubConn call: %v", err) + } + return nil + }, + UpdateSubConnState: func(_ *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + scsCh.Send(subConnWithState{ + sc: sc, + state: state, + }) + }, + }) + + od, tcc, cleanup := setup(t) + defer func() { + cleanup() + }() + + od.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + {Addr: "address1"}, + {Addr: "address2"}, + {Addr: "address3"}, + }, + }, + BalancerConfig: &LBConfig{ + Interval: math.MaxInt64, // so the interval will never run unless called manually in test. + BaseEjectionTime: 30 * time.Second, + MaxEjectionTime: 300 * time.Second, + MaxEjectionPercent: 10, + SuccessRateEjection: &SuccessRateEjection{ + StdevFactor: 500, + EnforcementPercentage: 100, + MinimumHosts: 3, + RequestVolume: 3, + }, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw1, scw2, scw3}, + }, + }) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + select { + case <-ctx.Done(): + t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn") + case picker := <-tcc.NewPickerCh: + // Set each upstream address to have five successes each. 
This should
+		// cause none of the addresses to be ejected, as none of them are
+		// considered outliers by the configured ejection algorithm.
+		for i := 0; i < 3; i++ {
+			pi, err := picker.Pick(balancer.PickInfo{})
+			if err != nil {
+				t.Fatalf("picker.Pick failed with error: %v", err)
+			}
+			for c := 0; c < 5; c++ {
+				pi.Done(balancer.DoneInfo{})
+			}
+		}
+
+		od.intervalTimerAlgorithm()
+		sCtx, cancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+		defer cancel()
+		if _, err := scsCh.Receive(sCtx); err == nil {
+			t.Fatalf("no SubConn update should have been sent (no SubConn got ejected)")
+		}
+
+		// Set two upstream addresses to have five successes each, and one
+		// upstream address to have five failures. This should cause the address
+		// with five failures to be ejected when the interval timer algorithm
+		// next runs.
+		for i := 0; i < 2; i++ {
+			pi, err := picker.Pick(balancer.PickInfo{})
+			if err != nil {
+				t.Fatalf("picker.Pick failed with error: %v", err)
+			}
+			for c := 0; c < 5; c++ {
+				pi.Done(balancer.DoneInfo{})
+			}
+		}
+		pi, err := picker.Pick(balancer.PickInfo{})
+		if err != nil {
+			t.Fatalf("picker.Pick failed with error: %v", err)
+		}
+		for c := 0; c < 5; c++ {
+			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+		}
+
+		// Should eject the address that always errored.
+		od.intervalTimerAlgorithm()
+
+		// Verify that the child received an UpdateSubConnState() call with
+		// TRANSIENT_FAILURE for the SubConn whose address was ejected.
+		gotSCWS, err := scsCh.Receive(ctx)
+		if err != nil {
+			t.Fatalf("Error waiting for Sub Conn update: %v", err)
+		}
+		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
+			sc:    scw3,
+			state: balancer.SubConnState{ConnectivityState: connectivity.TransientFailure},
+		}); err != nil {
+			t.Fatalf("Error in Sub Conn update: %v", err)
+		}
+
+		// Verify that only one address got ejected.
+		sCtx, cancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
+		defer cancel()
+		if _, err := scsCh.Receive(sCtx); err == nil {
+			t.Fatalf("Only one SubConn update should have been sent (only one SubConn got ejected)")
+		}
+
+		// Upon the Outlier Detection balancer being reconfigured with a noop
+		// configuration, every ejected SubConn should be unejected.
+		od.UpdateClientConnState(balancer.ClientConnState{
+			ResolverState: resolver.State{
+				Addresses: []resolver.Address{
+					{Addr: "address1"},
+					{Addr: "address2"},
+					{Addr: "address3"},
+				},
+			},
+			BalancerConfig: &LBConfig{
+				Interval:           math.MaxInt64,
+				BaseEjectionTime:   30 * time.Second,
+				MaxEjectionTime:    300 * time.Second,
+				MaxEjectionPercent: 10,
+				ChildPolicy: &internalserviceconfig.BalancerConfig{
+					Name:   t.Name(),
+					Config: emptyChildConfig{},
+				},
+			},
+		})
+		gotSCWS, err = scsCh.Receive(ctx)
+		if err != nil {
+			t.Fatalf("Error waiting for Sub Conn update: %v", err)
+		}
+		if err = scwsEqual(gotSCWS.(subConnWithState), subConnWithState{
+			sc:    scw3,
+			state: balancer.SubConnState{ConnectivityState: connectivity.Idle},
+		}); err != nil {
+			t.Fatalf("Error in Sub Conn update: %v", err)
+		}
+	}
 }
-func (errParseConfigBuilder) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
-	return nil, errors.New("some error")
+// TestConcurrentOperations calls different operations on the balancer in
+// separate goroutines to test for any race conditions and deadlocks. It also
+// uses a child balancer which verifies that no operations on the child get
+// called after the child balancer is closed.
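+//
+// The structure below mirrors the balancer API contract (this is a summary of
+// the test, not new behavior): balancer.Balancer methods
+// (UpdateClientConnState, UpdateSubConnState, ResolverError, ExitIdle, Close)
+// are invoked synchronously from the test goroutine, while balancer.ClientConn
+// methods (UpdateState, NewSubConn, RemoveSubConn, UpdateAddresses), the
+// picker, and the interval timer algorithm are exercised concurrently from
+// separate goroutines.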
+func (s) TestConcurrentOperations(t *testing.T) {
+	closed := grpcsync.NewEvent()
+	stub.Register(t.Name(), stub.BalancerFuncs{
+		UpdateClientConnState: func(*stub.BalancerData, balancer.ClientConnState) error {
+			if closed.HasFired() {
+				t.Error("UpdateClientConnState was called after Close(), which breaks the balancer API")
+			}
+			return nil
+		},
+		ResolverError: func(*stub.BalancerData, error) {
+			if closed.HasFired() {
+				t.Error("ResolverError was called after Close(), which breaks the balancer API")
+			}
+		},
+		UpdateSubConnState: func(*stub.BalancerData, balancer.SubConn, balancer.SubConnState) {
+			if closed.HasFired() {
+				t.Error("UpdateSubConnState was called after Close(), which breaks the balancer API")
+			}
+		},
+		Close: func(*stub.BalancerData) {
+			closed.Fire()
+		},
+		ExitIdle: func(*stub.BalancerData) {
+			if closed.HasFired() {
+				t.Error("ExitIdle was called after Close(), which breaks the balancer API")
+			}
+		},
+	})
+
+	od, tcc, cleanup := setup(t)
+	defer func() {
+		cleanup()
+	}()
+
+	od.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{
+			Addresses: []resolver.Address{
+				{Addr: "address1"},
+				{Addr: "address2"},
+				{Addr: "address3"},
+			},
+		},
+		BalancerConfig: &LBConfig{
+			Interval:           math.MaxInt64, // so the interval will never run unless called manually in test.
+			BaseEjectionTime:   30 * time.Second,
+			MaxEjectionTime:    300 * time.Second,
+			MaxEjectionPercent: 10,
+			SuccessRateEjection: &SuccessRateEjection{ // Have both Success Rate and Failure Percentage to step through all the interval timer code
+				StdevFactor:           500,
+				EnforcementPercentage: 100,
+				MinimumHosts:          3,
+				RequestVolume:         3,
+			},
+			FailurePercentageEjection: &FailurePercentageEjection{
+				Threshold:             50,
+				EnforcementPercentage: 100,
+				MinimumHosts:          3,
+				RequestVolume:         3,
+			},
+			ChildPolicy: &internalserviceconfig.BalancerConfig{
+				Name:   t.Name(),
+				Config: emptyChildConfig{},
+			},
+		},
+	})
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	scw1, err := od.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
+	if err != nil {
+		t.Fatalf("error in od.NewSubConn call: %v", err)
+	}
+
+	scw2, err := od.NewSubConn([]resolver.Address{{Addr: "address2"}}, balancer.NewSubConnOptions{})
+	if err != nil {
+		t.Fatalf("error in od.NewSubConn call: %v", err)
+	}
+
+	scw3, err := od.NewSubConn([]resolver.Address{{Addr: "address3"}}, balancer.NewSubConnOptions{})
+	if err != nil {
+		t.Fatalf("error in od.NewSubConn call: %v", err)
+	}
+
+	od.UpdateState(balancer.State{
+		ConnectivityState: connectivity.Ready,
+		Picker: &rrPicker{
+			scs: []balancer.SubConn{scw2, scw3},
+		},
+	})
+
+	var picker balancer.Picker
+	select {
+	case <-ctx.Done():
+		t.Fatalf("timeout while waiting for a UpdateState call on the ClientConn")
+	case picker = <-tcc.NewPickerCh:
+	}
+
+	finished := make(chan struct{})
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-finished:
+				return
+			default:
+			}
+			pi, err := picker.Pick(balancer.PickInfo{})
+			if err != nil {
+				continue
+			}
+			pi.Done(balancer.DoneInfo{})
+			pi.Done(balancer.DoneInfo{Err: errors.New("some error")})
+			time.Sleep(1 * time.Nanosecond)
+		}
+	}()
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-finished:
+				return
+			default:
+			}
+			od.intervalTimerAlgorithm()
+		}
+	}()
+
+	// Call Outlier Detection's balancer.ClientConn operations asynchronously.
+ // balancer.ClientConn operations have no guarantee from the API to be + // called synchronously. + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-finished: + return + default: + } + od.UpdateState(balancer.State{ + ConnectivityState: connectivity.Ready, + Picker: &rrPicker{ + scs: []balancer.SubConn{scw2, scw3}, + }, + }) + time.Sleep(1 * time.Nanosecond) + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + od.NewSubConn([]resolver.Address{{Addr: "address4"}}, balancer.NewSubConnOptions{}) + }() + + wg.Add(1) + go func() { + defer wg.Done() + od.RemoveSubConn(scw1) + }() + + wg.Add(1) + go func() { + defer wg.Done() + od.UpdateAddresses(scw2, []resolver.Address{{Addr: "address3"}}) + }() + + // Call balancer.Balancers synchronously in this goroutine, upholding the + // balancer.Balancer API guarantee of synchronous calls. + od.UpdateClientConnState(balancer.ClientConnState{ // This will delete addresses and flip to no op + ResolverState: resolver.State{ + Addresses: []resolver.Address{{Addr: "address1"}}, + }, + BalancerConfig: &LBConfig{ + Interval: math.MaxInt64, + ChildPolicy: &internalserviceconfig.BalancerConfig{ + Name: t.Name(), + Config: emptyChildConfig{}, + }, + }, + }) + + // Call balancer.Balancers synchronously in this goroutine, upholding the + // balancer.Balancer API guarantee. + od.UpdateSubConnState(scw1.(*subConnWrapper).SubConn, balancer.SubConnState{ + ConnectivityState: connectivity.Connecting, + }) + od.ResolverError(errors.New("some error")) + od.ExitIdle() + od.Close() + close(finished) + wg.Wait() } diff --git a/xds/internal/balancer/outlierdetection/callcounter.go b/xds/internal/balancer/outlierdetection/callcounter.go new file mode 100644 index 00000000000..4597f727b6e --- /dev/null +++ b/xds/internal/balancer/outlierdetection/callcounter.go @@ -0,0 +1,66 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package outlierdetection + +import ( + "sync/atomic" + "unsafe" +) + +type bucket struct { + numSuccesses uint32 + numFailures uint32 +} + +func newCallCounter() *callCounter { + return &callCounter{ + activeBucket: unsafe.Pointer(&bucket{}), + inactiveBucket: &bucket{}, + } +} + +// callCounter has two buckets, which each count successful and failing RPC's. +// The activeBucket is used to actively count any finished RPC's, and the +// inactiveBucket is populated with this activeBucket's data every interval for +// use by the Outlier Detection algorithm. +type callCounter struct { + // activeBucket updates every time a call finishes (from picker passed to + // Client Conn), so protect pointer read with atomic load of unsafe.Pointer + // so picker does not have to grab a mutex per RPC, the critical path. 
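+	//
+	// As a rough, illustrative sketch only (the real update lives in the
+	// picker wrapper elsewhere in this patch), a finished RPC bumps the
+	// active bucket of some *callCounter cc along these lines:
+	//
+	//	ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket))
+	//	atomic.AddUint32(&ab.numSuccesses, 1) // or &ab.numFailures on error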
+ activeBucket unsafe.Pointer // bucket + inactiveBucket *bucket +} + +func (cc *callCounter) clear() { + atomic.StorePointer(&cc.activeBucket, unsafe.Pointer(&bucket{})) + cc.inactiveBucket = &bucket{} +} + +// "When the timer triggers, the inactive bucket is zeroed and swapped with the +// active bucket. Then the inactive bucket contains the number of successes and +// failures since the last time the timer triggered. Those numbers are used to +// evaluate the ejection criteria." - A50. +func (cc *callCounter) swap() { + ib := cc.inactiveBucket + *ib = bucket{} + ab := (*bucket)(atomic.SwapPointer(&cc.activeBucket, unsafe.Pointer(ib))) + cc.inactiveBucket = &bucket{ + numSuccesses: atomic.LoadUint32(&ab.numSuccesses), + numFailures: atomic.LoadUint32(&ab.numFailures), + } +} diff --git a/xds/internal/balancer/outlierdetection/callcounter_test.go b/xds/internal/balancer/outlierdetection/callcounter_test.go new file mode 100644 index 00000000000..8e4f5f29b5f --- /dev/null +++ b/xds/internal/balancer/outlierdetection/callcounter_test.go @@ -0,0 +1,94 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package outlierdetection + +import ( + "sync/atomic" + "testing" + "unsafe" + + "github.com/google/go-cmp/cmp" +) + +func (b1 *bucket) Equal(b2 *bucket) bool { + if b1 == nil && b2 == nil { + return true + } + if (b1 != nil) != (b2 != nil) { + return false + } + if b1.numSuccesses != b2.numSuccesses { + return false + } + return b1.numFailures == b2.numFailures +} + +func (cc1 *callCounter) Equal(cc2 *callCounter) bool { + if cc1 == nil && cc2 == nil { + return true + } + if (cc1 != nil) != (cc2 != nil) { + return false + } + ab1 := (*bucket)(atomic.LoadPointer(&cc1.activeBucket)) + ab2 := (*bucket)(atomic.LoadPointer(&cc2.activeBucket)) + if !ab1.Equal(ab2) { + return false + } + return cc1.inactiveBucket.Equal(cc2.inactiveBucket) +} + +// TestClear tests that clear on the call counter clears (everything set to 0) +// the active and inactive buckets. +func (s) TestClear(t *testing.T) { + cc := newCallCounter() + ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket)) + ab.numSuccesses = 1 + ab.numFailures = 2 + cc.inactiveBucket.numSuccesses = 4 + cc.inactiveBucket.numFailures = 5 + cc.clear() + // Both the active and inactive buckets should be cleared. + ccWant := newCallCounter() + if diff := cmp.Diff(cc, ccWant); diff != "" { + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } +} + +// TestSwap tests that swap() on the callCounter successfully has the desired +// end result of inactive bucket containing the previous active buckets data, +// and the active bucket being cleared. 
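+//
+// As an illustrative walk-through of the expected behavior (matching the
+// values used below): with an active bucket of {numSuccesses: 1, numFailures: 2}
+// and an inactive bucket of {numSuccesses: 4, numFailures: 5}, after swap()
+// the inactive bucket should hold {1, 2} and the active bucket should point at
+// a zeroed bucket.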
+func (s) TestSwap(t *testing.T) { + cc := newCallCounter() + ab := (*bucket)(atomic.LoadPointer(&cc.activeBucket)) + ab.numSuccesses = 1 + ab.numFailures = 2 + cc.inactiveBucket.numSuccesses = 4 + cc.inactiveBucket.numFailures = 5 + ib := cc.inactiveBucket + cc.swap() + // Inactive should pick up active's data, active should be swapped to zeroed + // inactive. + ccWant := newCallCounter() + ccWant.inactiveBucket.numSuccesses = 1 + ccWant.inactiveBucket.numFailures = 2 + atomic.StorePointer(&ccWant.activeBucket, unsafe.Pointer(ib)) + if diff := cmp.Diff(cc, ccWant); diff != "" { + t.Fatalf("callCounter is different than expected, diff (-got +want): %v", diff) + } +} diff --git a/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go new file mode 100644 index 00000000000..a1987bf98a9 --- /dev/null +++ b/xds/internal/balancer/outlierdetection/e2e_test/outlierdetection_test.go @@ -0,0 +1,369 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package e2e_test contains e2e test cases for the Outlier Detection LB Policy. +package e2e_test + +import ( + "context" + "errors" + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/internal/grpctest" + "google.golang.org/grpc/internal/stubserver" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" + testpb "google.golang.org/grpc/test/grpc_testing" + _ "google.golang.org/grpc/xds/internal/balancer/outlierdetection" // To register helper functions which register/unregister Outlier Detection LB Policy. +) + +var defaultTestTimeout = 5 * time.Second + +type s struct { + grpctest.Tester +} + +func Test(t *testing.T) { + grpctest.RunSubTests(t, s{}) +} + +// Setup spins up three test backends, each listening on a port on localhost. +// Two of the backends are configured to always reply with an empty response and +// no error and one is configured to always return an error. +func setupBackends(t *testing.T) ([]string, func()) { + t.Helper() + + backends := make([]*stubserver.StubServer, 3) + addresses := make([]string, 3) + // Construct and start 2 working backends. + for i := 0; i < 2; i++ { + backend := &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + return &testpb.Empty{}, nil + }, + } + if err := backend.StartServer(); err != nil { + t.Fatalf("Failed to start backend: %v", err) + } + t.Logf("Started good TestService backend at: %q", backend.Address) + backends[i] = backend + addresses[i] = backend.Address + } + + // Construct and start a failing backend. 
+	backend := &stubserver.StubServer{
+		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+			return nil, errors.New("some error")
+		},
+	}
+	if err := backend.StartServer(); err != nil {
+		t.Fatalf("Failed to start backend: %v", err)
+	}
+	t.Logf("Started bad TestService backend at: %q", backend.Address)
+	backends[2] = backend
+	addresses[2] = backend.Address
+	cancel := func() {
+		for _, backend := range backends {
+			backend.Stop()
+		}
+	}
+	return addresses, cancel
+}
+
+// checkRoundRobinRPCs verifies that EmptyCall RPCs on the given ClientConn,
+// connected to a server exposing the test.grpc_testing.TestService, are
+// roundrobined across the given backend addresses.
+//
+// Returns a non-nil error if context deadline expires before RPCs start to get
+// roundrobined across the given backends.
+func checkRoundRobinRPCs(ctx context.Context, client testpb.TestServiceClient, addrs []resolver.Address) error {
+	wantAddrCount := make(map[string]int)
+	for _, addr := range addrs {
+		wantAddrCount[addr.Addr]++
+	}
+	for ; ctx.Err() == nil; <-time.After(time.Millisecond) {
+		// Perform 3 iterations.
+		var iterations [][]string
+		for i := 0; i < 3; i++ {
+			iteration := make([]string, len(addrs))
+			for c := 0; c < len(addrs); c++ {
+				var peer peer.Peer
+				client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&peer))
+				if peer.Addr != nil {
+					iteration[c] = peer.Addr.String()
+				}
+			}
+			iterations = append(iterations, iteration)
+		}
+		// Ensure that the first iteration contains all addresses in addrs.
+		gotAddrCount := make(map[string]int)
+		for _, addr := range iterations[0] {
+			gotAddrCount[addr]++
+		}
+		if diff := cmp.Diff(gotAddrCount, wantAddrCount); diff != "" {
+			continue
+		}
+		// Ensure all three iterations contain the same addresses.
+		if !cmp.Equal(iterations[0], iterations[1]) || !cmp.Equal(iterations[0], iterations[2]) {
+			continue
+		}
+		return nil
+	}
+	return fmt.Errorf("timeout when waiting for roundrobin distribution of RPCs across addresses: %v", addrs)
+}
+
+// TestOutlierDetectionAlgorithmsE2E tests the Outlier Detection Success Rate
+// and Failure Percentage algorithms in an e2e fashion. The Outlier Detection
+// Balancer is configured as the top level LB Policy of the channel with a Round
+// Robin child, and connects to three upstreams. Two of the upstreams are
+// healthy and one is unhealthy. Each algorithm should at some point eject the
+// failing upstream, causing RPC's to not be routed to it and to only be round
+// robined across the two healthy upstreams. Outside of the intervals during
+// which the unhealthy upstream is ejected, RPC's should regularly round robin
+// across all three upstreams.
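+//
+// A note on the duration fields in the service configs below: assuming they
+// unmarshal as Go time.Duration values (i.e. integer nanoseconds), "interval":
+// 50000000 is 50ms, "baseEjectionTime": 100000000 is 100ms, and
+// "maxEjectionTime": 300000000000 is 300s.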
+func (s) TestOutlierDetectionAlgorithmsE2E(t *testing.T) { + tests := []struct { + name string + odscJSON string + }{ + { + name: "Success Rate Algorithm", + odscJSON: ` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": 50000000, + "baseEjectionTime": 100000000, + "maxEjectionTime": 300000000000, + "maxEjectionPercent": 33, + "successRateEjection": { + "stdevFactor": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"round_robin": {}}] + } + } + ] +}`, + }, + { + name: "Failure Percentage Algorithm", + odscJSON: ` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": 50000000, + "baseEjectionTime": 100000000, + "maxEjectionTime": 300000000000, + "maxEjectionPercent": 33, + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"round_robin": {}} + ] + } + } + ] +}`, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + defer internal.UnregisterOutlierDetectionBalancerForTesting() + addresses, cancel := setupBackends(t) + defer cancel() + + mr := manual.NewBuilderWithScheme("od-e2e") + defer mr.Close() + + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(test.odscJSON) + // The full list of addresses. + fullAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + {Addr: addresses[2]}, + } + mr.InitialState(resolver.State{ + Addresses: fullAddresses, + ServiceConfig: sc, + }) + + cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testpb.NewTestServiceClient(cc) + + // At first, due to no statistics on each of the backends, the 3 + // upstreams should all be round robined across. + if err = checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + + // The addresses which don't return errors. + okAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + } + // After calling the three upstreams, one of them constantly error + // and should eventually be ejected for a period of time. This + // period of time should cause the RPC's to be round robined only + // across the two that are healthy. + if err = checkRoundRobinRPCs(ctx, testServiceClient, okAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + + // The failing upstream isn't ejected indefinitely, and eventually + // should be unejected in subsequent iterations of the interval + // algorithm as per the spec for the two specific algorithms. + if err = checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + }) + } +} + +// TestNoopConfiguration tests the Outlier Detection Balancer configured with a +// noop configuration. The noop configuration should cause the Outlier Detection +// Balancer to not count RPC's, and thus never eject any upstreams and continue +// to route to every upstream connected to, even if they continuously error. 
+// Once the Outlier Detection Balancer gets reconfigured with configuration +// requiring counting RPC's, the Outlier Detection Balancer should start +// ejecting any upstreams as specified in the configuration. +func (s) TestNoopConfiguration(t *testing.T) { + internal.RegisterOutlierDetectionBalancerForTesting() + defer internal.UnregisterOutlierDetectionBalancerForTesting() + addresses, cancel := setupBackends(t) + defer cancel() + + mr := manual.NewBuilderWithScheme("od-e2e") + defer mr.Close() + + noopODServiceConfigJSON := ` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": 50000000, + "baseEjectionTime": 100000000, + "maxEjectionTime": 300000000000, + "maxEjectionPercent": 33, + "childPolicy": [{"round_robin": {}}] + } + } + ] +}` + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(noopODServiceConfigJSON) + // The full list of addresses. + fullAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + {Addr: addresses[2]}, + } + mr.InitialState(resolver.State{ + Addresses: fullAddresses, + ServiceConfig: sc, + }) + cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testpb.NewTestServiceClient(cc) + + for i := 0; i < 2; i++ { + // Since the Outlier Detection Balancer starts with a noop + // configuration, it shouldn't count RPCs or eject any upstreams. Thus, + // even though an upstream it connects to constantly errors, it should + // continue to Round Robin across every upstream. + if err := checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + } + + // Reconfigure the Outlier Detection Balancer with a configuration that + // specifies to count RPC's and eject upstreams. Due to the balancer no + // longer being a noop, it should eject any unhealthy addresses as specified + // by the failure percentage portion of the configuration. + countingODServiceConfigJSON := ` +{ + "loadBalancingConfig": [ + { + "outlier_detection_experimental": { + "interval": 50000000, + "baseEjectionTime": 100000000, + "maxEjectionTime": 300000000000, + "maxEjectionPercent": 33, + "failurePercentageEjection": { + "threshold": 50, + "enforcementPercentage": 100, + "minimumHosts": 3, + "requestVolume": 5 + }, + "childPolicy": [{"round_robin": {}}] + } + } + ] +}` + sc = internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(countingODServiceConfigJSON) + + mr.UpdateState(resolver.State{ + Addresses: fullAddresses, + ServiceConfig: sc, + }) + + // At first on the reconfigured balancer, the balancer has no stats + // collected about upstreams. Thus, it should at first route across the full + // upstream list. + if err = checkRoundRobinRPCs(ctx, testServiceClient, fullAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } + + // The addresses which don't return errors. + okAddresses := []resolver.Address{ + {Addr: addresses[0]}, + {Addr: addresses[1]}, + } + // Now that the reconfigured balancer has data about the failing upstream, + // it should eject the upstream and only route across the two healthy + // upstreams. 
+ if err = checkRoundRobinRPCs(ctx, testServiceClient, okAddresses); err != nil { + t.Fatalf("error in expected round robin: %v", err) + } +} diff --git a/xds/internal/balancer/outlierdetection/logging.go b/xds/internal/balancer/outlierdetection/logging.go new file mode 100644 index 00000000000..705b0cb6918 --- /dev/null +++ b/xds/internal/balancer/outlierdetection/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package outlierdetection + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[outlier-detection-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *outlierDetectionBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/balancer/outlierdetection/subconn_wrapper.go b/xds/internal/balancer/outlierdetection/subconn_wrapper.go new file mode 100644 index 00000000000..8e25eb788b1 --- /dev/null +++ b/xds/internal/balancer/outlierdetection/subconn_wrapper.go @@ -0,0 +1,68 @@ +/* + * + * Copyright 2022 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package outlierdetection + +import ( + "unsafe" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal/buffer" + "google.golang.org/grpc/resolver" +) + +// subConnWrapper wraps every created SubConn in the Outlier Detection Balancer, +// to help track the latest state update from the underlying SubConn, and also +// whether or not this SubConn is ejected. +type subConnWrapper struct { + balancer.SubConn + + // addressInfo is a pointer to the subConnWrapper's corresponding address + // map entry, if the map entry exists. + addressInfo unsafe.Pointer // *addressInfo + // These two pieces of state will reach eventual consistency due to sync in + // run(), and child will always have the correctly updated SubConnState. + // latestState is the latest state update from the underlying SubConn. This + // is used whenever a SubConn gets unejected. 
+	latestState balancer.SubConnState
+	ejected     bool
+
+	scUpdateCh *buffer.Unbounded
+
+	// addresses is the list of address(es) this SubConn was created with,
+	// kept to support any subsequent change in address(es).
+	addresses []resolver.Address
+}
+
+// eject causes the wrapper to report a state update with the TRANSIENT_FAILURE
+// state, and to stop passing along updates from the underlying subchannel.
+func (scw *subConnWrapper) eject() {
+	scw.scUpdateCh.Put(&ejectionUpdate{
+		scw:       scw,
+		isEjected: true,
+	})
+}
+
+// uneject causes the wrapper to report a state update with the latest update
+// from the underlying subchannel, and resume passing along updates from the
+// underlying subchannel.
+func (scw *subConnWrapper) uneject() {
+	scw.scUpdateCh.Put(&ejectionUpdate{
+		scw:       scw,
+		isEjected: false,
+	})
+}
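+
+// The ejectionUpdates enqueued by eject() and uneject() above are consumed by
+// the balancer's run() goroutine (see balancer.go in this patch). Roughly, and
+// only as a non-normative sketch of that consumer side (where update is the
+// received *ejectionUpdate and child is the wrapped child balancer), the
+// handling looks like:
+//
+//	scw := update.scw
+//	scw.ejected = update.isEjected
+//	stateToUpdate := scw.latestState // on uneject: replay the latest persisted state
+//	if update.isEjected {
+//		stateToUpdate = balancer.SubConnState{ConnectivityState: connectivity.TransientFailure}
+//	}
+//	child.UpdateSubConnState(scw.SubConn, stateToUpdate)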