From aeb3c1946ab80f7c0ac65c6827f6eea5dd545a37 Mon Sep 17 00:00:00 2001
From: Zach Reyes
Date: Tue, 28 Jun 2022 18:05:29 -0400
Subject: [PATCH] e2e test and bug fix

---
 test/xds/xds_client_outlier_detection_test.go | 143 +++++++++++++++++-
 .../balancer/outlierdetection/balancer.go     | 143 +++++-------------
 .../outlierdetection/balancer_test.go         |  24 +--
 3 files changed, 179 insertions(+), 131 deletions(-)

diff --git a/test/xds/xds_client_outlier_detection_test.go b/test/xds/xds_client_outlier_detection_test.go
index d500de03c808..d304d35a31a0 100644
--- a/test/xds/xds_client_outlier_detection_test.go
+++ b/test/xds/xds_client_outlier_detection_test.go
@@ -20,19 +20,31 @@ package xds_test

 import (
 	"context"
+	"errors"
 	"fmt"
+	"strings"
 	"testing"
+	"time"

+	v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
+	v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+	v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
+	v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials/insecure"
 	"google.golang.org/grpc/internal"
 	"google.golang.org/grpc/internal/envconfig"
+	"google.golang.org/grpc/internal/stubserver"
 	"google.golang.org/grpc/internal/testutils/xds/e2e"
 	testgrpc "google.golang.org/grpc/test/grpc_testing"
 	testpb "google.golang.org/grpc/test/grpc_testing"
+	"google.golang.org/protobuf/types/known/durationpb"
+	"google.golang.org/protobuf/types/known/wrapperspb"
 )

+// TestOutlierDetection tests an xDS-configured ClientConn with an Outlier
+// Detection configuration present in the system that is a logical no-op. An
+// RPC should proceed as normal.
 func (s) TestOutlierDetection(t *testing.T) {
 	oldOD := envconfig.XDSOutlierDetection
 	envconfig.XDSOutlierDetection = true
@@ -74,3 +86,132 @@ func (s) TestOutlierDetection(t *testing.T) {
 		t.Fatalf("rpc EmptyCall() failed: %v", err)
 	}
 }
+
+// defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection returns
+// xDS resources corresponding to multiple upstreams, i.e. multiple backends
+// listening on different localhost:port combinations. The resources also
+// configure an Outlier Detection balancer set up with the Failure Percentage
+// algorithm, which ejects endpoints based on failure rate.
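+// The cluster resource (built by defaultClusterWithOutlierDetection below)
+// runs that algorithm every 500ms with a minimum host and request volume of
+// one, and its MaxEjectionPercent of 1 means that, with the three backends
+// used in the test below, at most one endpoint can be ejected at a time.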
+func defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection(params e2e.ResourceParams, ports []uint32) e2e.UpdateOptions {
+	routeConfigName := "route-" + params.DialTarget
+	clusterName := "cluster-" + params.DialTarget
+	endpointsName := "endpoints-" + params.DialTarget
+	return e2e.UpdateOptions{
+		NodeID:    params.NodeID,
+		Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
+		Routes:    []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, params.DialTarget, clusterName)},
+		Clusters:  []*v3clusterpb.Cluster{defaultClusterWithOutlierDetection(clusterName, endpointsName, params.SecLevel)},
+		Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, ports)},
+	}
+}
+
+func defaultClusterWithOutlierDetection(clusterName, edsServiceName string, secLevel e2e.SecurityLevel) *v3clusterpb.Cluster {
+	cluster := e2e.DefaultCluster(clusterName, edsServiceName, secLevel)
+	cluster.OutlierDetection = &v3clusterpb.OutlierDetection{
+		Interval: &durationpb.Duration{
+			Nanos: 500000000, // 500ms
+		},
+		BaseEjectionTime:               &durationpb.Duration{Seconds: 30},
+		MaxEjectionTime:                &durationpb.Duration{Seconds: 300},
+		MaxEjectionPercent:             &wrapperspb.UInt32Value{Value: 1},
+		FailurePercentageThreshold:     &wrapperspb.UInt32Value{Value: 50},
+		EnforcingFailurePercentage:     &wrapperspb.UInt32Value{Value: 100},
+		FailurePercentageRequestVolume: &wrapperspb.UInt32Value{Value: 1},
+		FailurePercentageMinimumHosts:  &wrapperspb.UInt32Value{Value: 1},
+	}
+	return cluster
+}
+
+// TestOutlierDetectionWithOutlier tests the Outlier Detection Balancer e2e. It
+// spins up three backends, one of which consistently errors, and configures
+// the ClientConn using xDS to connect to all three of those backends. The
+// Outlier Detection Balancer should eject the connection to the backend which
+// constantly errors, and thus RPCs should mainly go to backends 1 and 2.
+func (s) TestOutlierDetectionWithOutlier(t *testing.T) {
+	oldOD := envconfig.XDSOutlierDetection
+	envconfig.XDSOutlierDetection = true
+	internal.RegisterOutlierDetectionBalancerForTesting()
+	defer func() {
+		envconfig.XDSOutlierDetection = oldOD
+		internal.UnregisterOutlierDetectionBalancerForTesting()
+	}()
+
+	managementServer, nodeID, _, resolver, cleanup := e2e.SetupManagementServer(t)
+	defer cleanup()
+
+	// Counters for how many times each backend gets called.
+	var count1, count2, count3 int
+
+	// Working backend 1.
+	port1, cleanup1 := startTestService(t, &stubserver.StubServer{
+		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
+			count1++
+			return &testpb.Empty{}, nil
+		},
+		Address: "localhost:0",
+	})
+	defer cleanup1()
+
+	// Working backend 2.
+	port2, cleanup2 := startTestService(t, &stubserver.StubServer{
+		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
+			count2++
+			return &testpb.Empty{}, nil
+		},
+		Address: "localhost:0",
+	})
+	defer cleanup2()
+
+	// Backend 3 always returns an error, and should eventually be ejected.
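+	// Given the config above (50% failure percentage threshold, 100%
+	// enforcement, request volume of one), its 100% failure rate should
+	// trigger ejection on the first interval in which it receives traffic.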
+	port3, cleanup3 := startTestService(t, &stubserver.StubServer{
+		EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) {
+			count3++
+			return nil, errors.New("some error")
+		},
+		Address: "localhost:0",
+	})
+	defer cleanup3()
+
+	const serviceName = "my-service-client-side-xds"
+	resources := defaultClientResourcesSpecifyingMultipleBackendsAndOutlierDetection(e2e.ResourceParams{
+		DialTarget: serviceName,
+		NodeID:     nodeID,
+		Host:       "localhost",
+		SecLevel:   e2e.SecurityLevelNone,
+	}, []uint32{port1, port2, port3})
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*100)
+	defer cancel()
+	if err := managementServer.Update(ctx, resources); err != nil {
+		t.Fatal(err)
+	}
+
+	cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
+	if err != nil {
+		t.Fatalf("failed to dial local test server: %v", err)
+	}
+	defer cc.Close()
+
+	client := testgrpc.NewTestServiceClient(cc)
+	for i := 0; i < 2000; i++ {
+		// The RPC can either error or not depending on which backend it
+		// reaches.
+		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil && !strings.Contains(err.Error(), "some error") {
+			t.Fatalf("rpc EmptyCall() failed: %v", err)
+		}
+		time.Sleep(time.Millisecond)
+	}
+
+	// Backend 1 should've gotten more than 1/3rd of the load, as backend 3
+	// should get ejected, leaving only backends 1 and 2.
+	if count1 < 700 {
+		t.Fatalf("backend 1 should've gotten more than 1/3rd of the load")
+	}
+	// Backend 2 should've gotten more than 1/3rd of the load, as backend 3
+	// should get ejected, leaving only backends 1 and 2.
+	if count2 < 700 {
+		t.Fatalf("backend 2 should've gotten more than 1/3rd of the load")
+	}
+	// Backend 3 should've gotten less than 1/3rd of the load, since it gets
+	// ejected.
+	if count3 > 650 {
+		t.Fatalf("backend 3 should've gotten less than 1/3rd of the load")
+	}
+}
diff --git a/xds/internal/balancer/outlierdetection/balancer.go b/xds/internal/balancer/outlierdetection/balancer.go
index 6c99f94e049d..1331afed9e74 100644
--- a/xds/internal/balancer/outlierdetection/balancer.go
+++ b/xds/internal/balancer/outlierdetection/balancer.go
@@ -67,12 +67,11 @@ func init() {
 type bb struct{}

 func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
-	am := resolver.NewAddressMap()
 	b := &outlierDetectionBalancer{
 		cc:             cc,
 		bOpts:          bOpts,
 		closed:         grpcsync.NewEvent(),
-		odAddrs:        am,
+		odAddrs:        make(map[string]*object),
 		scWrappers:     make(map[balancer.SubConn]*subConnWrapper),
 		scUpdateCh:     buffer.NewUnbounded(),
 		pickerUpdateCh: buffer.NewUnbounded(),
@@ -174,7 +173,7 @@ type outlierDetectionBalancer struct {
 	// running the interval timer algorithm which uses odAddrs heavily. This
 	// will cause undefined behavior for the interval timer algorithm.
 	mu             sync.Mutex
-	odAddrs        *resolver.AddressMap
+	odAddrs        map[string]*object
 	odCfg          *LBConfig
 	scWrappers     map[balancer.SubConn]*subConnWrapper
 	timerStartTime time.Time
@@ -218,14 +217,14 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt
 	// When the outlier_detection LB policy receives an address update, it will
 	// create a map entry for each subchannel address in the list, and remove
 	// each map entry for a subchannel address not in the list.
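+	// Entries are keyed on the address string alone, so resolver addresses
+	// differing only in ServerName or Attributes share a single map entry.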
- addrs := make(map[resolver.Address]bool) + addrs := make(map[string]bool, len(s.ResolverState.Addresses)) for _, addr := range s.ResolverState.Addresses { - addrs[addr] = true - b.odAddrs.Set(addr, newObject()) + addrs[addr.Addr] = true + b.odAddrs[addr.Addr] = newObject() } - for _, addr := range b.odAddrs.Keys() { + for addr := range b.odAddrs { if !addrs[addr] { - b.odAddrs.Delete(addr) + delete(b.odAddrs, addr) } } @@ -235,7 +234,7 @@ func (b *outlierDetectionBalancer) UpdateClientConnState(s balancer.ClientConnSt var interval time.Duration if b.timerStartTime.IsZero() { b.timerStartTime = time.Now() - for _, obj := range b.objects() { + for _, obj := range b.odAddrs { obj.callCounter.clear() } interval = b.odCfg.Interval @@ -433,13 +432,7 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal if len(addrs) != 1 { return scw, nil } - - val, ok := b.odAddrs.Get(addrs[0]) - if !ok { - return scw, nil - } - - obj, ok := val.(*object) + obj, ok := b.odAddrs[addrs[0].Addr] if !ok { return scw, nil } @@ -468,16 +461,12 @@ func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) { // appendIfPresent appends the scw to the address, if the address is present in // the Outlier Detection balancers address map. Returns nil if not present, and // the map entry if present. -func (b *outlierDetectionBalancer) appendIfPresent(addr resolver.Address, scw *subConnWrapper) *object { - val, ok := b.odAddrs.Get(addr) +func (b *outlierDetectionBalancer) appendIfPresent(addr string, scw *subConnWrapper) *object { + obj, ok := b.odAddrs[addr] if !ok { return nil } - obj, ok := val.(*object) - if !ok { - // shouldn't happen, logical no-op - return nil - } + obj.sws = append(obj.sws, scw) atomic.StorePointer(&scw.obj, unsafe.Pointer(obj)) return obj @@ -498,19 +487,6 @@ func (b *outlierDetectionBalancer) removeSubConnFromAddressesMapEntry(scw *subCo } } -// sameAddrForMap returns if two addresses are the same in regards to subchannel -// uniqueness/identity (i.e. what the addresses map is keyed on - address -// string, Server Name, and Attributes). -func sameAddrForMap(oldAddr resolver.Address, newAddr resolver.Address) bool { - if oldAddr.Addr != newAddr.Addr { - return false - } - if oldAddr.ServerName != newAddr.ServerName { - return false - } - return oldAddr.Attributes.Equal(newAddr.Attributes) -} - func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { scw, ok := sc.(*subConnWrapper) if !ok { @@ -527,16 +503,15 @@ func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs [] // old address/new address). if len(scw.addresses) == 1 { if len(addrs) == 1 { // single address to single address - // If everything we care for in regards to address specificity for a - // list of SubConn's (Addr, ServerName, Attributes) is the same, - // then there is nothing to do past this point. - if sameAddrForMap(scw.addresses[0], addrs[0]) { + // If the updated address is the same, then there is nothing to do + // past this point. + if scw.addresses[0].Addr == addrs[0].Addr { return } // 1. Remove Subchannel from Addresses map entry if present in Addresses map. b.removeSubConnFromAddressesMapEntry(scw) // 2. Add Subchannel to Addresses map entry if new address present in map. - obj := b.appendIfPresent(addrs[0], scw) + obj := b.appendIfPresent(addrs[0].Addr, scw) // 3. Relay state with eject() recalculated (using the corresponding // map entry to see if it's currently ejected). 
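+			// A SubConn moved to an address with no map entry is unejected
+			// unconditionally, since its previous address may have been
+			// ejected; one moved to a tracked address inherits that entry's
+			// current ejection state.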
if obj == nil { // uneject unconditionally because could have come from an ejected address @@ -562,7 +537,7 @@ func (b *outlierDetectionBalancer) UpdateAddresses(sc balancer.SubConn, addrs [] } else { if len(addrs) == 1 { // multiple addresses to single address // 1. Add Subchannel to Addresses map entry if new address present in map. - obj := b.appendIfPresent(addrs[0], scw) + obj := b.appendIfPresent(addrs[0].Addr, scw) if obj != nil && !obj.latestEjectionTimestamp.IsZero() { scw.eject() } @@ -580,23 +555,6 @@ func (b *outlierDetectionBalancer) Target() string { return b.cc.Target() } -// objects returns a list of objects corresponding to every address in the address map. -func (b *outlierDetectionBalancer) objects() []*object { - var objs []*object - for _, addr := range b.odAddrs.Keys() { - val, ok := b.odAddrs.Get(addr) - if !ok { // Shouldn't happen - continue - } - obj, ok := val.(*object) - if !ok { - continue - } - objs = append(objs, obj) - } - return objs -} - func max(x, y int64) int64 { if x < y { return y @@ -618,7 +576,7 @@ func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { // 2. For each address, swap the call counter's buckets in that address's // map entry. - for _, obj := range b.objects() { + for _, obj := range b.odAddrs { obj.callCounter.swap() } @@ -635,15 +593,7 @@ func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { } // 5. For each address in the map: - for _, addr := range b.odAddrs.Keys() { - val, ok := b.odAddrs.Get(addr) - if !ok { - continue - } - obj, ok := val.(*object) - if !ok { - continue - } + for addr, obj := range b.odAddrs { // If the address is not ejected and the multiplier is greater than 0, // decrease the multiplier by 1. if obj.latestEjectionTimestamp.IsZero() && obj.ejectionTimeMultiplier > 0 { @@ -658,6 +608,7 @@ func (b *outlierDetectionBalancer) intervalTimerAlgorithm() { b.unejectAddress(addr) } } + // This conditional only for testing (since the interval timer algorithm is // called manually), will never hit in production. if b.intervalTimer != nil { @@ -761,7 +712,7 @@ func (b *outlierDetectionBalancer) run() { // the map that have request volume of at least requestVolume. func (b *outlierDetectionBalancer) numAddrsWithAtLeastRequestVolume() uint32 { var numAddrs uint32 - for _, obj := range b.objects() { + for _, obj := range b.odAddrs { if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.odCfg.SuccessRateEjection.RequestVolume { numAddrs++ } @@ -777,21 +728,20 @@ func (b *outlierDetectionBalancer) meanAndStdDevOfSuccessesAtLeastRequestVolume( // success_rate_ejection.request_volume. 
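+	// As implemented below, only addresses meeting the request volume
+	// contribute successes to the sum, but both the mean and the variance
+	// are divided by the total number of addresses in the map:
+	//   mean   = sum(successes_i/volume_i) / len(odAddrs)
+	//   stddev = sqrt(sum((successes_i/volume_i - mean)^2) / len(odAddrs))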
var totalFractionOfSuccessfulRequests float64 var mean float64 - - for _, obj := range b.objects() { + for _, obj := range b.odAddrs { // "of at least success_rate_ejection.request_volume" if uint32(obj.callCounter.inactiveBucket.requestVolume) >= b.odCfg.SuccessRateEjection.RequestVolume { totalFractionOfSuccessfulRequests += float64(obj.callCounter.inactiveBucket.numSuccesses) / float64(obj.callCounter.inactiveBucket.requestVolume) } } - mean = totalFractionOfSuccessfulRequests / float64(b.odAddrs.Len()) + mean = totalFractionOfSuccessfulRequests / float64(len(b.odAddrs)) var sumOfSquares float64 - for _, obj := range b.objects() { + for _, obj := range b.odAddrs { devFromMean := (float64(obj.callCounter.inactiveBucket.numSuccesses) / float64(obj.callCounter.inactiveBucket.requestVolume)) - mean sumOfSquares += devFromMean * devFromMean } - variance := sumOfSquares / float64(b.odAddrs.Len()) + variance := sumOfSquares / float64(len(b.odAddrs)) return mean, math.Sqrt(variance) } @@ -810,20 +760,12 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { mean, stddev := b.meanAndStdDevOfSuccessesAtLeastRequestVolume() // 3. For each address: - for _, addr := range b.odAddrs.Keys() { - val, ok := b.odAddrs.Get(addr) - if !ok { - continue - } - obj, ok := val.(*object) - if !ok { - continue - } + for addr, obj := range b.odAddrs { ccb := obj.callCounter.inactiveBucket sre := b.odCfg.SuccessRateEjection // i. If the percentage of ejected addresses is greater than // max_ejection_percent, stop. - if float64(b.numAddrsEjected)/float64(b.odAddrs.Len())*100 > float64(b.odCfg.MaxEjectionPercent) { + if float64(b.numAddrsEjected)/float64(len(b.odAddrs))*100 > float64(b.odCfg.MaxEjectionPercent) { return } @@ -850,25 +792,17 @@ func (b *outlierDetectionBalancer) successRateAlgorithm() { func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { // 1. If the number of addresses is less than // failure_percentage_ejection.minimum_hosts, stop. - if uint32(b.odAddrs.Len()) < b.odCfg.FailurePercentageEjection.MinimumHosts { + if uint32(len(b.odAddrs)) < b.odCfg.FailurePercentageEjection.MinimumHosts { return } // 2. For each address: - for _, addr := range b.odAddrs.Keys() { - val, ok := b.odAddrs.Get(addr) - if !ok { - continue - } - obj, ok := val.(*object) - if !ok { - continue - } + for addr, obj := range b.odAddrs { ccb := obj.callCounter.inactiveBucket fpe := b.odCfg.FailurePercentageEjection // i. If the percentage of ejected addresses is greater than // max_ejection_percent, stop. - if float64(b.numAddrsEjected)/float64(b.odAddrs.Len())*100 > float64(b.odCfg.MaxEjectionPercent) { + if float64(b.numAddrsEjected)/float64(len(b.odAddrs))*100 > float64(b.odCfg.MaxEjectionPercent) { return } // ii. 
If the address's total request volume is less than @@ -891,16 +825,11 @@ func (b *outlierDetectionBalancer) failurePercentageAlgorithm() { } } -func (b *outlierDetectionBalancer) ejectAddress(addr resolver.Address) { - val, ok := b.odAddrs.Get(addr) - if !ok { // Shouldn't happen - return - } - obj, ok := val.(*object) +func (b *outlierDetectionBalancer) ejectAddress(addr string) { + obj, ok := b.odAddrs[addr] if !ok { // Shouldn't happen return } - b.numAddrsEjected++ // To eject an address, set the current ejection timestamp to the timestamp @@ -914,12 +843,8 @@ func (b *outlierDetectionBalancer) ejectAddress(addr resolver.Address) { } } -func (b *outlierDetectionBalancer) unejectAddress(addr resolver.Address) { - val, ok := b.odAddrs.Get(addr) - if !ok { // Shouldn't happen - return - } - obj, ok := val.(*object) +func (b *outlierDetectionBalancer) unejectAddress(addr string) { + obj, ok := b.odAddrs[addr] if !ok { // Shouldn't happen return } diff --git a/xds/internal/balancer/outlierdetection/balancer_test.go b/xds/internal/balancer/outlierdetection/balancer_test.go index 16ae32cdc83b..b84831eecad0 100644 --- a/xds/internal/balancer/outlierdetection/balancer_test.go +++ b/xds/internal/balancer/outlierdetection/balancer_test.go @@ -969,16 +969,10 @@ func (s) TestPicker(t *testing.T) { pi.Done(balancer.DoneInfo{}) pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) od.mu.Lock() - val, ok := od.odAddrs.Get(resolver.Address{ - Addr: "address1", - }) + obj, ok := od.odAddrs["address1"] if !ok { t.Fatal("map entry for address: address1 not present in map") } - obj, ok := val.(*object) - if !ok { - t.Fatal("map value isn't obj type") - } bucketWant := &bucket{ numSuccesses: 1, numFailures: 1, @@ -1045,16 +1039,10 @@ func (s) TestPicker(t *testing.T) { pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) od.mu.Lock() - val, ok := od.odAddrs.Get(resolver.Address{ - Addr: "address1", - }) + obj, ok := od.odAddrs["address1"] if !ok { t.Fatal("map entry for address: address1 not present in map") } - obj, ok := val.(*object) - if !ok { - t.Fatal("map value isn't obj type") - } // The active bucket should be cleared because the interval timer // algorithm didn't run in between ClientConn updates and the picker @@ -1100,16 +1088,10 @@ func (s) TestPicker(t *testing.T) { pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) pi.Done(balancer.DoneInfo{Err: errors.New("some error")}) od.mu.Lock() - val, ok := od.odAddrs.Get(resolver.Address{ - Addr: "address1", - }) + obj, ok := od.odAddrs["address1"] if !ok { t.Fatal("map entry for address: address1 not present in map") } - obj, ok := val.(*object) - if !ok { - t.Fatal("map value isn't obj type") - } // The active bucket should be cleared because the interval timer // algorithm didn't run in between ClientConn updates and the picker