From 6393ab4847526351053c1b8f76c69c21ac9bb60f Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 17 Nov 2021 14:37:45 -0500 Subject: [PATCH] Responded to Doug's comments and added tests --- .../balancer/cdsbalancer/cdsbalancer.go | 6 +- xds/internal/resolver/serviceconfig.go | 9 +- xds/internal/resolver/xds_resolver.go | 7 +- xds/internal/resolver/xds_resolver_test.go | 138 ++++++++++++++---- 4 files changed, 126 insertions(+), 34 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 5f898c87918..7ef7f2f5e90 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" @@ -42,7 +43,8 @@ import ( ) const ( - cdsName = "cds_experimental" + cdsName = "cds_experimental" + clusterPrefix = "cluster:" ) var ( @@ -179,7 +181,7 @@ func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) { b.handleErrorFromUpdate(err, true) return } - b.clusterHandler.updateRootCluster(update.clusterName) + b.clusterHandler.updateRootCluster(strings.TrimPrefix(update.clusterName, clusterPrefix)) } // handleSecurityConfig processes the security configuration received from the diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index c65ce8c7b7c..d2b8e645bc1 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -92,7 +92,7 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo, clusterSpecifierP for cluster := range activeClusters { // Look into cluster specifier plugins, which hasn't had any prefix attached to it's cluster specifier plugin names, // to determine the LB Config if the cluster is a CSP. - cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, "cluster_specifier_plugin:")] + cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, clusterSpecifierPluginPrefix)] if ok { children[cluster] = xdsChildConfig{ ChildPolicy: balancerConfig(cspCfg), @@ -366,9 +366,10 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride, retryConfig: su.virtualHost.RetryConfig, }, - routes: make([]route, len(su.virtualHost.Routes)), - clusters: make(map[string]*clusterInfo), - httpFilterConfig: su.ldsConfig.httpFilterConfig, + routes: make([]route, len(su.virtualHost.Routes)), + clusters: make(map[string]*clusterInfo), + clusterSpecifierPlugins: su.clusterSpecifierPlugins, + httpFilterConfig: su.ldsConfig.httpFilterConfig, } for i, rt := range su.virtualHost.Routes { diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 7e29e57057c..212c1cd23b0 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -206,7 +207,11 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { } // Produce the service config. - sc, err := serviceConfigJSON(r.activeClusters, cs.clusterSpecifierPlugins) + csps := make(map[string]clusterspecifier.BalancerConfig) + if cs != nil { + csps = cs.clusterSpecifierPlugins + } + sc, err := serviceConfigJSON(r.activeClusters, csps) if err != nil { // JSON marshal error; should never happen. r.logger.Errorf("%v", err) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index df4f4780371..962ad5ec9c4 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -29,6 +29,7 @@ import ( xxhash "github.com/cespare/xxhash/v2" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" @@ -46,6 +47,7 @@ import ( _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/testutils/fakeclient" @@ -473,12 +475,12 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] } } }}]}`, - wantClusters: map[string]bool{"test-cluster-1": true}, + wantClusters: map[string]bool{"cluster:test-cluster-1": true}, }, { routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{ @@ -491,18 +493,18 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] }, - "cluster_1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + "cluster:cluster_1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}] }, - "cluster_2":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + "cluster:cluster_2":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}] } } }}]}`, - wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, + wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, { routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{ @@ -515,15 +517,15 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "cluster_1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + "cluster:cluster_1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}] }, - "cluster_2":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + "cluster:cluster_2":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}] } } }}]}`, - wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, + wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, } { // Invoke the watchAPI callback with a good service update and wait for the @@ -725,8 +727,8 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] } } }}]}` @@ -857,12 +859,94 @@ func (s) TestXDSResolverWRR(t *testing.T) { picks[clustermanager.GetPickedClusterForTesting(res.Context)]++ res.OnCommitted() } - want := map[string]int{"A": 10, "B": 20} + want := map[string]int{"cluster:A": 10, "cluster:B": 20} if !reflect.DeepEqual(picks, want) { t.Errorf("picked clusters = %v; want %v", picks, want) } } +func init() { + balancer.Register(cspB{}) +} + +type cspB struct{} + +func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return nil +} + +func (cspB) Name() string { + return "csp_experimental" +} + +type cspConfig struct { + ArbitraryField string `json:"arbitrary_field"` +} + +func TestXDSResolverClusterSpecifierPlugin(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } +} + func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() @@ -987,8 +1071,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] } } }}]}` @@ -1009,7 +1093,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } cluster := clustermanager.GetPickedClusterForTesting(res.Context) - if cluster != "test-cluster-1" { + if cluster != "cluster:test-cluster-1" { t.Fatalf("") } // delay res.OnCommitted() @@ -1046,11 +1130,11 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON2 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] }, - "NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] + "cluster:NEW":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}] } } }}]}` @@ -1084,8 +1168,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON3 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] + "cluster:NEW":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}] } } }}]}`