From 44160e9d4c67de2756f93a0f4e83da8293df8f80 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 15 Nov 2021 17:34:18 -0500 Subject: [PATCH 1/7] Added support for cluster specifier plugins in xds resolver --- xds/internal/resolver/serviceconfig.go | 90 +++++++++++++++++++------- xds/internal/resolver/watch_service.go | 5 ++ xds/internal/resolver/xds_resolver.go | 2 +- 3 files changed, 71 insertions(+), 26 deletions(-) diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 77287309210..aef8cee15af 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -83,12 +84,23 @@ func (r *xdsResolver) pruneActiveClusters() { // serviceConfigJSON produces a service config in JSON format representing all // the clusters referenced in activeClusters. This includes clusters with zero // references, so they must be pruned first. -func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) { +func serviceConfigJSON(activeClusters map[string]*clusterInfo, clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig) ([]byte, error) { // Generate children (all entries in activeClusters). children := make(map[string]xdsChildConfig) for cluster := range activeClusters { - children[cluster] = xdsChildConfig{ - ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), + // Look into cluster specifier plugins, which hasn't had any prefix attached to it's cluster specifier plugin names, + // to determine the LB Config if the cluster is a CSP. + cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, "cluster_specifier_plugin:")] + if ok { + children[cluster] = xdsChildConfig{ + ChildPolicy: balancerConfig(cspCfg), + } + } else { + // Will now have "cluster:" prefixing the cluster name...CDS policy + // will now have to trim this off when it queries CDS. + children[cluster] = xdsChildConfig{ + ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), + } } } @@ -121,7 +133,11 @@ type routeCluster struct { type route struct { m *xdsresource.CompositeMatcher // converted from route matchers + + // Exactly one of clusterSpecifierPlugin or clusters will be set. + clusterSpecifierPlugin string clusters wrr.WRR // holds *routeCluster entries + maxStreamDuration time.Duration // map from filter name to its config httpFilterConfigOverride map[string]httpfilter.FilterConfig @@ -134,11 +150,15 @@ func (r route) String() string { } type configSelector struct { - r *xdsResolver - virtualHost virtualHost - routes []route - clusters map[string]*clusterInfo - httpFilterConfig []xdsresource.HTTPFilter + r *xdsResolver + virtualHost virtualHost + routes []route + clusters map[string]*clusterInfo + httpFilterConfig []xdsresource.HTTPFilter + clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig + // Will be used for: + // a. serviceConfigJSON (this will build out the service config...but you still need to keep ref + // counts in active clusters), this will be used to get the LB Configurations. } var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") @@ -158,10 +178,19 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP if rt == nil || rt.clusters == nil { return nil, errNoMatchedRouteFound } - cluster, ok := rt.clusters.Next().(*routeCluster) - if !ok { - return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) + + var cluster *routeCluster + if rt.clusterSpecifierPlugin != "" { + cluster.name = rt.clusterSpecifierPlugin + // cluster.httpFilterConfigOverride = /*Do cluster specifier plugins even have http filter config overrides*/ + } else { + cluster, ok := rt.clusters.Next().(*routeCluster) + if !ok { + return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) + } } + + // Add a ref to the selected cluster, as this RPC needs this cluster until // it is committed. ref := &cs.clusters[cluster.name].refCount @@ -352,24 +381,35 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } for i, rt := range su.virtualHost.Routes { - clusters := newWRR() - for cluster, wc := range rt.WeightedClusters { - clusters.Add(&routeCluster{ - name: cluster, - httpFilterConfigOverride: wc.HTTPFilterConfigOverride, - }, int64(wc.Weight)) - - // Initialize entries in cs.clusters map, creating entries in - // r.activeClusters as necessary. Set to zero as they will be - // incremented by incRefs. - ci := r.activeClusters[cluster] + if rt.ClusterSpecifierPlugin != "" { + ci := r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] if ci == nil { ci = &clusterInfo{refCount: 0} - r.activeClusters[cluster] = ci + r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci + } + cs.clusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci + + cs.routes[i].clusterSpecifierPlugin = "cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin + } else { + clusters := newWRR() + for cluster, wc := range rt.WeightedClusters { + clusters.Add(&routeCluster{ + name: "cluster:" + cluster, + httpFilterConfigOverride: wc.HTTPFilterConfigOverride, + }, int64(wc.Weight)) + + // Initialize entries in cs.clusters map, creating entries in + // r.activeClusters as necessary. Set to zero as they will be + // incremented by incRefs. + ci := r.activeClusters["cluster:" + cluster] + if ci == nil { + ci = &clusterInfo{refCount: 0} + r.activeClusters["cluster:" + cluster] = ci + } + cs.clusters["cluster:" + cluster] = ci } - cs.clusters[cluster] = ci + cs.routes[i].clusters = clusters } - cs.routes[i].clusters = clusters var err error cs.routes[i].m, err = xdsresource.RouteToMatcher(rt) diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 30f65727d08..06897265785 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -25,6 +25,7 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -35,6 +36,9 @@ import ( type serviceUpdate struct { // virtualHost contains routes and other configuration to route RPCs. virtualHost *xdsresource.VirtualHost + // clusterSpecifierPlugins contain the configurations for any cluster + // specifier plugins emitted by the xdsclient. + clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig // ldsConfig contains configuration that applies to all routes. ldsConfig ldsConfig } @@ -160,6 +164,7 @@ func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsresource.Rout } w.lastUpdate.virtualHost = matchVh + w.lastUpdate.clusterSpecifierPlugins = update.ClusterSpecifierPlugins w.serviceCb(w.lastUpdate, nil) } diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 2192051ae2f..7e29e57057c 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -206,7 +206,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { } // Produce the service config. - sc, err := serviceConfigJSON(r.activeClusters) + sc, err := serviceConfigJSON(r.activeClusters, cs.clusterSpecifierPlugins) if err != nil { // JSON marshal error; should never happen. r.logger.Errorf("%v", err) From 8a6bf559c5b09c9f01c9b51256ebcf7dd09370a9 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 16 Nov 2021 15:42:02 -0500 Subject: [PATCH 2/7] Responded to Doug's comments --- xds/internal/resolver/serviceconfig.go | 26 +++++++++----------------- xds/internal/resolver/watch_service.go | 6 +++--- 2 files changed, 12 insertions(+), 20 deletions(-) diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index aef8cee15af..6ae1f42169a 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -133,11 +133,7 @@ type routeCluster struct { type route struct { m *xdsresource.CompositeMatcher // converted from route matchers - - // Exactly one of clusterSpecifierPlugin or clusters will be set. - clusterSpecifierPlugin string clusters wrr.WRR // holds *routeCluster entries - maxStreamDuration time.Duration // map from filter name to its config httpFilterConfigOverride map[string]httpfilter.FilterConfig @@ -179,15 +175,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP return nil, errNoMatchedRouteFound } - var cluster *routeCluster - if rt.clusterSpecifierPlugin != "" { - cluster.name = rt.clusterSpecifierPlugin - // cluster.httpFilterConfigOverride = /*Do cluster specifier plugins even have http filter config overrides*/ - } else { - cluster, ok := rt.clusters.Next().(*routeCluster) - if !ok { - return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) - } + cluster, ok := rt.clusters.Next().(*routeCluster) + if !ok { + return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } @@ -381,17 +371,19 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } for i, rt := range su.virtualHost.Routes { + clusters := newWRR() if rt.ClusterSpecifierPlugin != "" { + clusters.Add(&routeCluster{ + name: "cluster:" + rt.ClusterSpecifierPlugin, + }, 1) + ci := r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] if ci == nil { ci = &clusterInfo{refCount: 0} r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci } cs.clusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci - - cs.routes[i].clusterSpecifierPlugin = "cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin } else { - clusters := newWRR() for cluster, wc := range rt.WeightedClusters { clusters.Add(&routeCluster{ name: "cluster:" + cluster, @@ -408,8 +400,8 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } cs.clusters["cluster:" + cluster] = ci } - cs.routes[i].clusters = clusters } + cs.routes[i].clusters = clusters var err error cs.routes[i].m, err = xdsresource.RouteToMatcher(rt) diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index 06897265785..f7aff2fbd24 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -124,7 +124,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, } // Handle the inline RDS update as if it's from an RDS watch. - w.updateVirtualHostsFromRDS(*update.InlineRouteConfig) + w.applyRouteConfigUpdate(*update.InlineRouteConfig) return } @@ -155,7 +155,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate, w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp) } -func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsresource.RouteConfigUpdate) { +func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) { matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts) if matchVh == nil { // No matching virtual host found. @@ -184,7 +184,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdat w.serviceCb(serviceUpdate{}, err) return } - w.updateVirtualHostsFromRDS(update) + w.applyRouteConfigUpdate(update) } func (w *serviceUpdateWatcher) close() { From a6165293e4812139f76ed7b1d71f7054e486b70d Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 16 Nov 2021 18:19:11 -0500 Subject: [PATCH 3/7] Responded to Doug's comments --- xds/internal/resolver/serviceconfig.go | 44 +++++++++++++------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 6ae1f42169a..c65ce8c7b7c 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -45,8 +45,10 @@ import ( ) const ( - cdsName = "cds_experimental" - xdsClusterManagerName = "xds_cluster_manager_experimental" + cdsName = "cds_experimental" + xdsClusterManagerName = "xds_cluster_manager_experimental" + clusterPrefix = "cluster:" + clusterSpecifierPluginPrefix = "cluster_specifier_plugin:" ) type serviceConfig struct { @@ -180,7 +182,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } - // Add a ref to the selected cluster, as this RPC needs this cluster until // it is committed. ref := &cs.clusters[cluster.name].refCount @@ -373,32 +374,19 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro for i, rt := range su.virtualHost.Routes { clusters := newWRR() if rt.ClusterSpecifierPlugin != "" { + clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{ - name: "cluster:" + rt.ClusterSpecifierPlugin, + name: clusterName, }, 1) - - ci := r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] - if ci == nil { - ci = &clusterInfo{refCount: 0} - r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci - } - cs.clusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci + r.initializeCluster(clusterName, cs) } else { for cluster, wc := range rt.WeightedClusters { + clusterName := clusterPrefix + cluster clusters.Add(&routeCluster{ - name: "cluster:" + cluster, + name: clusterName, httpFilterConfigOverride: wc.HTTPFilterConfigOverride, }, int64(wc.Weight)) - - // Initialize entries in cs.clusters map, creating entries in - // r.activeClusters as necessary. Set to zero as they will be - // incremented by incRefs. - ci := r.activeClusters["cluster:" + cluster] - if ci == nil { - ci = &clusterInfo{refCount: 0} - r.activeClusters["cluster:" + cluster] = ci - } - cs.clusters["cluster:" + cluster] = ci + r.initializeCluster(clusterName, cs) } } cs.routes[i].clusters = clusters @@ -429,6 +417,18 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro return cs, nil } +// initializeCluster initializes entries in cs.clusters map, creating entries in +// r.activeClusters as necessary. Any created entries will be set to zero as +// they will be incremented by incRefs. +func (r *xdsResolver) initializeCluster(clusterName string, cs *configSelector) { + ci := r.activeClusters[clusterName] + if ci == nil { + ci = &clusterInfo{refCount: 0} + r.activeClusters[clusterName] = ci + } + cs.clusters[clusterName] = ci +} + type clusterInfo struct { // number of references to this cluster; accessed atomically refCount int32 From 6393ab4847526351053c1b8f76c69c21ac9bb60f Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Wed, 17 Nov 2021 14:37:45 -0500 Subject: [PATCH 4/7] Responded to Doug's comments and added tests --- .../balancer/cdsbalancer/cdsbalancer.go | 6 +- xds/internal/resolver/serviceconfig.go | 9 +- xds/internal/resolver/xds_resolver.go | 7 +- xds/internal/resolver/xds_resolver_test.go | 138 ++++++++++++++---- 4 files changed, 126 insertions(+), 34 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 5f898c87918..7ef7f2f5e90 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "strings" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" @@ -42,7 +43,8 @@ import ( ) const ( - cdsName = "cds_experimental" + cdsName = "cds_experimental" + clusterPrefix = "cluster:" ) var ( @@ -179,7 +181,7 @@ func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) { b.handleErrorFromUpdate(err, true) return } - b.clusterHandler.updateRootCluster(update.clusterName) + b.clusterHandler.updateRootCluster(strings.TrimPrefix(update.clusterName, clusterPrefix)) } // handleSecurityConfig processes the security configuration received from the diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index c65ce8c7b7c..d2b8e645bc1 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -92,7 +92,7 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo, clusterSpecifierP for cluster := range activeClusters { // Look into cluster specifier plugins, which hasn't had any prefix attached to it's cluster specifier plugin names, // to determine the LB Config if the cluster is a CSP. - cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, "cluster_specifier_plugin:")] + cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, clusterSpecifierPluginPrefix)] if ok { children[cluster] = xdsChildConfig{ ChildPolicy: balancerConfig(cspCfg), @@ -366,9 +366,10 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride, retryConfig: su.virtualHost.RetryConfig, }, - routes: make([]route, len(su.virtualHost.Routes)), - clusters: make(map[string]*clusterInfo), - httpFilterConfig: su.ldsConfig.httpFilterConfig, + routes: make([]route, len(su.virtualHost.Routes)), + clusters: make(map[string]*clusterInfo), + clusterSpecifierPlugins: su.clusterSpecifierPlugins, + httpFilterConfig: su.ldsConfig.httpFilterConfig, } for i, rt := range su.virtualHost.Routes { diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 7e29e57057c..212c1cd23b0 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/resolver" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -206,7 +207,11 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { } // Produce the service config. - sc, err := serviceConfigJSON(r.activeClusters, cs.clusterSpecifierPlugins) + csps := make(map[string]clusterspecifier.BalancerConfig) + if cs != nil { + csps = cs.clusterSpecifierPlugins + } + sc, err := serviceConfigJSON(r.activeClusters, csps) if err != nil { // JSON marshal error; should never happen. r.logger.Errorf("%v", err) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index df4f4780371..962ad5ec9c4 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -29,6 +29,7 @@ import ( xxhash "github.com/cespare/xxhash/v2" "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" @@ -46,6 +47,7 @@ import ( _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" + "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/testutils/fakeclient" @@ -473,12 +475,12 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] } } }}]}`, - wantClusters: map[string]bool{"test-cluster-1": true}, + wantClusters: map[string]bool{"cluster:test-cluster-1": true}, }, { routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{ @@ -491,18 +493,18 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] }, - "cluster_1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + "cluster:cluster_1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}] }, - "cluster_2":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + "cluster:cluster_2":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}] } } }}]}`, - wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, + wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, { routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{ @@ -515,15 +517,15 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { wantJSON: `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "cluster_1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] + "cluster:cluster_1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}] }, - "cluster_2":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] + "cluster:cluster_2":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}] } } }}]}`, - wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true}, + wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true}, }, } { // Invoke the watchAPI callback with a good service update and wait for the @@ -725,8 +727,8 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] } } }}]}` @@ -857,12 +859,94 @@ func (s) TestXDSResolverWRR(t *testing.T) { picks[clustermanager.GetPickedClusterForTesting(res.Context)]++ res.OnCommitted() } - want := map[string]int{"A": 10, "B": 20} + want := map[string]int{"cluster:A": 10, "cluster:B": 20} if !reflect.DeepEqual(picks, want) { t.Errorf("picked clusters = %v; want %v", picks, want) } } +func init() { + balancer.Register(cspB{}) +} + +type cspB struct{} + +func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return nil +} + +func (cspB) Name() string { + return "csp_experimental" +} + +type cspConfig struct { + ArbitraryField string `json:"arbitrary_field"` +} + +func TestXDSResolverClusterSpecifierPlugin(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } +} + func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() @@ -987,8 +1071,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] } } }}]}` @@ -1009,7 +1093,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) } cluster := clustermanager.GetPickedClusterForTesting(res.Context) - if cluster != "test-cluster-1" { + if cluster != "cluster:test-cluster-1" { t.Fatalf("") } // delay res.OnCommitted() @@ -1046,11 +1130,11 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON2 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] + "cluster:test-cluster-1":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] }, - "NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] + "cluster:NEW":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}] } } }}]}` @@ -1084,8 +1168,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { wantJSON3 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] + "cluster:NEW":{ + "childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}] } } }}]}` From af713af498a3ad20cd790883e32be8ca73d75138 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 23 Nov 2021 14:04:04 -0500 Subject: [PATCH 5/7] Responded to Doug's comments --- .../balancer/cdsbalancer/cdsbalancer.go | 6 +- xds/internal/resolver/serviceconfig.go | 40 ++-- xds/internal/resolver/xds_resolver.go | 8 +- xds/internal/resolver/xds_resolver_test.go | 182 ++++++++++++++++-- 4 files changed, 191 insertions(+), 45 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 7ef7f2f5e90..5f898c87918 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -21,7 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" @@ -43,8 +42,7 @@ import ( ) const ( - cdsName = "cds_experimental" - clusterPrefix = "cluster:" + cdsName = "cds_experimental" ) var ( @@ -181,7 +179,7 @@ func (b *cdsBalancer) handleClientConnUpdate(update *ccUpdate) { b.handleErrorFromUpdate(err, true) return } - b.clusterHandler.updateRootCluster(strings.TrimPrefix(update.clusterName, clusterPrefix)) + b.clusterHandler.updateRootCluster(update.clusterName) } // handleSecurityConfig processes the security configuration received from the diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index d2b8e645bc1..55060ad1abe 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -86,22 +86,17 @@ func (r *xdsResolver) pruneActiveClusters() { // serviceConfigJSON produces a service config in JSON format representing all // the clusters referenced in activeClusters. This includes clusters with zero // references, so they must be pruned first. -func serviceConfigJSON(activeClusters map[string]*clusterInfo, clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig) ([]byte, error) { +func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) { // Generate children (all entries in activeClusters). children := make(map[string]xdsChildConfig) - for cluster := range activeClusters { - // Look into cluster specifier plugins, which hasn't had any prefix attached to it's cluster specifier plugin names, - // to determine the LB Config if the cluster is a CSP. - cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, clusterSpecifierPluginPrefix)] - if ok { + for cluster, ci := range activeClusters { + if ci.cspCfg != nil { children[cluster] = xdsChildConfig{ - ChildPolicy: balancerConfig(cspCfg), + ChildPolicy: balancerConfig(ci.cspCfg), } } else { - // Will now have "cluster:" prefixing the cluster name...CDS policy - // will now have to trim this off when it queries CDS. children[cluster] = xdsChildConfig{ - ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), + ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: strings.TrimPrefix(cluster, clusterPrefix)}), } } } @@ -148,15 +143,11 @@ func (r route) String() string { } type configSelector struct { - r *xdsResolver - virtualHost virtualHost - routes []route - clusters map[string]*clusterInfo - httpFilterConfig []xdsresource.HTTPFilter - clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig - // Will be used for: - // a. serviceConfigJSON (this will build out the service config...but you still need to keep ref - // counts in active clusters), this will be used to get the LB Configurations. + r *xdsResolver + virtualHost virtualHost + routes []route + clusters map[string]*clusterInfo + httpFilterConfig []xdsresource.HTTPFilter } var errNoMatchedRouteFound = status.Errorf(codes.Unavailable, "no matched route was found") @@ -366,10 +357,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride, retryConfig: su.virtualHost.RetryConfig, }, - routes: make([]route, len(su.virtualHost.Routes)), - clusters: make(map[string]*clusterInfo), - clusterSpecifierPlugins: su.clusterSpecifierPlugins, - httpFilterConfig: su.ldsConfig.httpFilterConfig, + routes: make([]route, len(su.virtualHost.Routes)), + clusters: make(map[string]*clusterInfo), + httpFilterConfig: su.ldsConfig.httpFilterConfig, } for i, rt := range su.virtualHost.Routes { @@ -380,6 +370,7 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro name: clusterName, }, 1) r.initializeCluster(clusterName, cs) + r.activeClusters[clusterName].cspCfg = su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin] } else { for cluster, wc := range rt.WeightedClusters { clusterName := clusterPrefix + cluster @@ -433,6 +424,9 @@ func (r *xdsResolver) initializeCluster(clusterName string, cs *configSelector) type clusterInfo struct { // number of references to this cluster; accessed atomically refCount int32 + // cspCfg is the configuration for this cluster if the cluster is a cluster + // specifier plugin. This will be nil otherwise. + cspCfg clusterspecifier.BalancerConfig } type interceptorList struct { diff --git a/xds/internal/resolver/xds_resolver.go b/xds/internal/resolver/xds_resolver.go index 212c1cd23b0..6788090e29c 100644 --- a/xds/internal/resolver/xds_resolver.go +++ b/xds/internal/resolver/xds_resolver.go @@ -30,7 +30,6 @@ import ( "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/resolver" - "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -206,12 +205,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool { return true } - // Produce the service config. - csps := make(map[string]clusterspecifier.BalancerConfig) - if cs != nil { - csps = cs.clusterSpecifierPlugins - } - sc, err := serviceConfigJSON(r.activeClusters, csps) + sc, err := serviceConfigJSON(r.activeClusters) if err != nil { // JSON marshal error; should never happen. r.logger.Errorf("%v", err) diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 962ad5ec9c4..0786102025e 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -476,7 +476,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } }}]}`, @@ -494,13 +494,13 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] }, "cluster:cluster_1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] }, "cluster:cluster_2":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}] + "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] } } }}]}`, @@ -518,10 +518,10 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:cluster_1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}] }, "cluster:cluster_2":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}] + "childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}] } } }}]}`, @@ -728,7 +728,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } }}]}` @@ -1072,7 +1072,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] } } }}]}` @@ -1131,10 +1131,10 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:test-cluster-1":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}] + "childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}] }, "cluster:NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}] + "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] } } }}]}` @@ -1169,7 +1169,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { "xds_cluster_manager_experimental":{ "children":{ "cluster:NEW":{ - "childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}] + "childPolicy":[{"cds_experimental":{"cluster":"NEW"}}] } } }}]}` @@ -1181,6 +1181,166 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { } } +// TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and +// their corresponding configurations remain in service config if RPCs are in +// flight. +func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } + // delay res.OnCommitted() + + // Perform TWO updates to ensure the old config selector does not hold a reference to cspA + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + tcc.stateCh.Receive(ctx) // Ignore the first update. + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON2 := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] + }, + "cluster_specifier_plugin:cspB":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] + } + } + }}]}` + + wantSCParsed2 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON2) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config)) + } + + // Invoke OnCommitted; should lead to a service config update that deletes + // cspA. + res.OnCommitted() + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON3 := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspB":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] + } + } + }}]}` + + wantSCParsed3 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON3) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config)) + } +} + // TestXDSResolverUpdates tests the cases where the resolver gets a good update // after an error, and an error after the good update. func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { From d219ac69d7e82f1c068ca44f73f6a15bc32aef1d Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Tue, 30 Nov 2021 14:17:20 -0500 Subject: [PATCH 6/7] Responded to Doug's comments --- .../resolver/cluster_specifier_plugin_test.go | 368 ++++++++++++++++++ xds/internal/resolver/serviceconfig.go | 37 +- xds/internal/resolver/watch_service.go | 2 +- xds/internal/resolver/xds_resolver_test.go | 244 ------------ 4 files changed, 385 insertions(+), 266 deletions(-) create mode 100644 xds/internal/resolver/cluster_specifier_plugin_test.go diff --git a/xds/internal/resolver/cluster_specifier_plugin_test.go b/xds/internal/resolver/cluster_specifier_plugin_test.go new file mode 100644 index 00000000000..156c21ccd54 --- /dev/null +++ b/xds/internal/resolver/cluster_specifier_plugin_test.go @@ -0,0 +1,368 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package resolver + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/internal" + iresolver "google.golang.org/grpc/internal/resolver" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" + "google.golang.org/grpc/xds/internal/balancer/clustermanager" + "google.golang.org/grpc/xds/internal/clusterspecifier" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" +) + +func init() { + balancer.Register(cspB{}) +} + +type cspB struct{} + +func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return nil +} + +func (cspB) Name() string { + return "csp_experimental" +} + +type cspConfig struct { + ArbitraryField string `json:"arbitrary_field"` +} + +// TestXDSResolverClusterSpecifierPlugin tests that cluster specifier plugins +// produce the correct service config, and that the config selector routes to a +// cluster specifier plugin supported by this service config (i.e. prefixed with +// a cluster specifier plugin prefix). +func (s) TestXDSResolverClusterSpecifierPlugin(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } +} + +// TestXDSResolverClusterSpecifierPluginConfigUpdate tests that cluster +// specifier plugins produce the correct service config, and that on an update +// to the CSP Configuration, the new config is accounted for in the output +// service config. +func (s) TestXDSResolverClusterSpecifierPluginConfigUpdate(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "changed"}}}}, + }, nil) + + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON = `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"changed"}}] + } + } + }}]}` + + wantSCParsed = internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } +} + +// TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and +// their corresponding configurations remain in service config if RPCs are in +// flight. +func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { + xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) + defer xdsR.Close() + defer cancel() + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + waitForWatchListener(ctx, t, xdsC, targetStr) + xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) + waitForWatchRouteConfig(ctx, t, xdsC, routeStr) + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, + }, + }, + // Top level csp config here - the value of cspA should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}}, + }, nil) + + gotState, err := tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState := gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] + } + } + }}]}` + + wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) + } + + cs := iresolver.GetConfigSelector(rState) + if cs == nil { + t.Fatal("received nil config selector") + } + + res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) + if err != nil { + t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) + } + + cluster := clustermanager.GetPickedClusterForTesting(res.Context) + clusterWant := clusterSpecifierPluginPrefix + "cspA" + if cluster != clusterWant { + t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) + } + // delay res.OnCommitted() + + // Perform TWO updates to ensure the old config selector does not hold a reference to cspA + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + tcc.stateCh.Receive(ctx) // Ignore the first update. + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON2 := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspA":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] + }, + "cluster_specifier_plugin:cspB":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] + } + } + }}]}` + + wantSCParsed2 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON2) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config)) + } + + // Invoke OnCommitted; should lead to a service config update that deletes + // cspA. + res.OnCommitted() + + xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ + VirtualHosts: []*xdsresource.VirtualHost{ + { + Domains: []string{targetStr}, + Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, + }, + }, + // Top level csp config here - the value of cspB should get directly + // placed as a child policy of xds cluster manager. + ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, + }, nil) + gotState, err = tcc.stateCh.Receive(ctx) + if err != nil { + t.Fatalf("Error waiting for UpdateState to be called: %v", err) + } + rState = gotState.(resolver.State) + if err := rState.ServiceConfig.Err; err != nil { + t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) + } + wantJSON3 := `{"loadBalancingConfig":[{ + "xds_cluster_manager_experimental":{ + "children":{ + "cluster_specifier_plugin:cspB":{ + "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] + } + } + }}]}` + + wantSCParsed3 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON3) + if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) { + t.Errorf("ClientConn.UpdateState received different service config") + t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) + t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config)) + } +} diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 55060ad1abe..fd75af21045 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -38,7 +38,6 @@ import ( "google.golang.org/grpc/status" "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" - "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" @@ -90,15 +89,7 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo) ([]byte, error) { // Generate children (all entries in activeClusters). children := make(map[string]xdsChildConfig) for cluster, ci := range activeClusters { - if ci.cspCfg != nil { - children[cluster] = xdsChildConfig{ - ChildPolicy: balancerConfig(ci.cspCfg), - } - } else { - children[cluster] = xdsChildConfig{ - ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: strings.TrimPrefix(cluster, clusterPrefix)}), - } - } + children[cluster] = ci.cfg } sc := serviceConfig{ @@ -369,8 +360,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro clusters.Add(&routeCluster{ name: clusterName, }, 1) - r.initializeCluster(clusterName, cs) - r.activeClusters[clusterName].cspCfg = su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin] + cs.initializeCluster(clusterName, xdsChildConfig{ + ChildPolicy: balancerConfig(su.clusterSpecifierPlugins[rt.ClusterSpecifierPlugin]), + }) } else { for cluster, wc := range rt.WeightedClusters { clusterName := clusterPrefix + cluster @@ -378,7 +370,9 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro name: clusterName, httpFilterConfigOverride: wc.HTTPFilterConfigOverride, }, int64(wc.Weight)) - r.initializeCluster(clusterName, cs) + cs.initializeCluster(clusterName, xdsChildConfig{ + ChildPolicy: newBalancerConfig(cdsName, cdsBalancerConfig{Cluster: cluster}), + }) } } cs.routes[i].clusters = clusters @@ -410,23 +404,24 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro } // initializeCluster initializes entries in cs.clusters map, creating entries in -// r.activeClusters as necessary. Any created entries will be set to zero as -// they will be incremented by incRefs. -func (r *xdsResolver) initializeCluster(clusterName string, cs *configSelector) { - ci := r.activeClusters[clusterName] +// r.activeClusters as necessary. Any created entries will have a ref count set +// to zero as their ref count will be incremented by incRefs. +func (cs *configSelector) initializeCluster(clusterName string, cfg xdsChildConfig) { + ci := cs.r.activeClusters[clusterName] if ci == nil { ci = &clusterInfo{refCount: 0} - r.activeClusters[clusterName] = ci + cs.r.activeClusters[clusterName] = ci } cs.clusters[clusterName] = ci + cs.clusters[clusterName].cfg = cfg } type clusterInfo struct { // number of references to this cluster; accessed atomically refCount int32 - // cspCfg is the configuration for this cluster if the cluster is a cluster - // specifier plugin. This will be nil otherwise. - cspCfg clusterspecifier.BalancerConfig + // cfg is the child configuration for this cluster, containing either the + // csp config or the cds cluster config. + cfg xdsChildConfig } type interceptorList struct { diff --git a/xds/internal/resolver/watch_service.go b/xds/internal/resolver/watch_service.go index f7aff2fbd24..3db9be1cac0 100644 --- a/xds/internal/resolver/watch_service.go +++ b/xds/internal/resolver/watch_service.go @@ -36,7 +36,7 @@ import ( type serviceUpdate struct { // virtualHost contains routes and other configuration to route RPCs. virtualHost *xdsresource.VirtualHost - // clusterSpecifierPlugins contain the configurations for any cluster + // clusterSpecifierPlugins contains the configurations for any cluster // specifier plugins emitted by the xdsclient. clusterSpecifierPlugins map[string]clusterspecifier.BalancerConfig // ldsConfig contains configuration that applies to all routes. diff --git a/xds/internal/resolver/xds_resolver_test.go b/xds/internal/resolver/xds_resolver_test.go index 0786102025e..c5fa3b8f749 100644 --- a/xds/internal/resolver/xds_resolver_test.go +++ b/xds/internal/resolver/xds_resolver_test.go @@ -29,7 +29,6 @@ import ( xxhash "github.com/cespare/xxhash/v2" "github.com/google/go-cmp/cmp" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" xdscreds "google.golang.org/grpc/credentials/xds" @@ -47,7 +46,6 @@ import ( _ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config "google.golang.org/grpc/xds/internal/balancer/clustermanager" "google.golang.org/grpc/xds/internal/balancer/ringhash" - "google.golang.org/grpc/xds/internal/clusterspecifier" "google.golang.org/grpc/xds/internal/httpfilter" "google.golang.org/grpc/xds/internal/httpfilter/router" "google.golang.org/grpc/xds/internal/testutils/fakeclient" @@ -865,88 +863,6 @@ func (s) TestXDSResolverWRR(t *testing.T) { } } -func init() { - balancer.Register(cspB{}) -} - -type cspB struct{} - -func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { - return nil -} - -func (cspB) Name() string { - return "csp_experimental" -} - -type cspConfig struct { - ArbitraryField string `json:"arbitrary_field"` -} - -func TestXDSResolverClusterSpecifierPlugin(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, - }, - }, - // Top level csp config here - the value of cspA should get directly - // placed as a child policy of xds cluster manager. - ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}}, - }, nil) - - gotState, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState := gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) - } - wantJSON := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster_specifier_plugin:cspA":{ - "childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}] - } - } - }}]}` - - wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) - } - - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("received nil config selector") - } - - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) - if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) - } - - cluster := clustermanager.GetPickedClusterForTesting(res.Context) - clusterWant := clusterSpecifierPluginPrefix + "cspA" - if cluster != clusterWant { - t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) - } -} - func (s) TestXDSResolverMaxStreamDuration(t *testing.T) { xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) defer xdsR.Close() @@ -1181,166 +1097,6 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) { } } -// TestXDSResolverDelayedOnCommittedCSP tests that cluster specifier plugins and -// their corresponding configurations remain in service config if RPCs are in -// flight. -func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { - xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target}) - defer xdsR.Close() - defer cancel() - - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - waitForWatchListener(ctx, t, xdsC, targetStr) - xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil) - waitForWatchRouteConfig(ctx, t, xdsC, routeStr) - - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}}, - }, - }, - // Top level csp config here - the value of cspA should get directly - // placed as a child policy of xds cluster manager. - ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingA"}}}}, - }, nil) - - gotState, err := tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState := gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) - } - wantJSON := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster_specifier_plugin:cspA":{ - "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] - } - } - }}]}` - - wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config)) - } - - cs := iresolver.GetConfigSelector(rState) - if cs == nil { - t.Fatal("received nil config selector") - } - - res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()}) - if err != nil { - t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err) - } - - cluster := clustermanager.GetPickedClusterForTesting(res.Context) - clusterWant := clusterSpecifierPluginPrefix + "cspA" - if cluster != clusterWant { - t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant) - } - // delay res.OnCommitted() - - // Perform TWO updates to ensure the old config selector does not hold a reference to cspA - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, - }, - }, - // Top level csp config here - the value of cspB should get directly - // placed as a child policy of xds cluster manager. - ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, - }, nil) - tcc.stateCh.Receive(ctx) // Ignore the first update. - - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, - }, - }, - // Top level csp config here - the value of cspB should get directly - // placed as a child policy of xds cluster manager. - ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, - }, nil) - - gotState, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState = gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) - } - wantJSON2 := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster_specifier_plugin:cspA":{ - "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] - }, - "cluster_specifier_plugin:cspB":{ - "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] - } - } - }}]}` - - wantSCParsed2 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON2) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed2.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed2.Config)) - } - - // Invoke OnCommitted; should lead to a service config update that deletes - // cspA. - res.OnCommitted() - - xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{ - VirtualHosts: []*xdsresource.VirtualHost{ - { - Domains: []string{targetStr}, - Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspB"}}, - }, - }, - // Top level csp config here - the value of cspB should get directly - // placed as a child policy of xds cluster manager. - ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspB": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anythingB"}}}}, - }, nil) - gotState, err = tcc.stateCh.Receive(ctx) - if err != nil { - t.Fatalf("Error waiting for UpdateState to be called: %v", err) - } - rState = gotState.(resolver.State) - if err := rState.ServiceConfig.Err; err != nil { - t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err) - } - wantJSON3 := `{"loadBalancingConfig":[{ - "xds_cluster_manager_experimental":{ - "children":{ - "cluster_specifier_plugin:cspB":{ - "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] - } - } - }}]}` - - wantSCParsed3 := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON3) - if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed3.Config) { - t.Errorf("ClientConn.UpdateState received different service config") - t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config)) - t.Fatal("want: ", cmp.Diff(nil, wantSCParsed3.Config)) - } -} - // TestXDSResolverUpdates tests the cases where the resolver gets a good update // after an error, and an error after the good update. func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) { From 5b4f327a153de39a92327bb097e8339db65c1379 Mon Sep 17 00:00:00 2001 From: Zach Reyes Date: Mon, 6 Dec 2021 13:18:10 -0500 Subject: [PATCH 7/7] Formatting --- xds/internal/resolver/cluster_specifier_plugin_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xds/internal/resolver/cluster_specifier_plugin_test.go b/xds/internal/resolver/cluster_specifier_plugin_test.go index 156c21ccd54..d432ad3c489 100644 --- a/xds/internal/resolver/cluster_specifier_plugin_test.go +++ b/xds/internal/resolver/cluster_specifier_plugin_test.go @@ -314,7 +314,7 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { "cluster_specifier_plugin:cspA":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingA"}}] }, - "cluster_specifier_plugin:cspB":{ + "cluster_specifier_plugin:cspB":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] } } @@ -353,7 +353,7 @@ func (s) TestXDSResolverDelayedOnCommittedCSP(t *testing.T) { wantJSON3 := `{"loadBalancingConfig":[{ "xds_cluster_manager_experimental":{ "children":{ - "cluster_specifier_plugin:cspB":{ + "cluster_specifier_plugin:cspB":{ "childPolicy":[{"csp_experimental":{"arbitrary_field":"anythingB"}}] } }