Skip to content

Commit

Permalink
Responded to Doug's comment and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Nov 17, 2021
1 parent a616529 commit 179acf6
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 32 deletions.
9 changes: 5 additions & 4 deletions xds/internal/resolver/serviceconfig.go
Expand Up @@ -92,7 +92,7 @@ func serviceConfigJSON(activeClusters map[string]*clusterInfo, clusterSpecifierP
for cluster := range activeClusters {
// Look into cluster specifier plugins, which hasn't had any prefix attached to it's cluster specifier plugin names,
// to determine the LB Config if the cluster is a CSP.
cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, "cluster_specifier_plugin:")]
cspCfg, ok := clusterSpecifierPlugins[strings.TrimPrefix(cluster, clusterSpecifierPluginPrefix)]
if ok {
children[cluster] = xdsChildConfig{
ChildPolicy: balancerConfig(cspCfg),
Expand Down Expand Up @@ -366,9 +366,10 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
httpFilterConfigOverride: su.virtualHost.HTTPFilterConfigOverride,
retryConfig: su.virtualHost.RetryConfig,
},
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
httpFilterConfig: su.ldsConfig.httpFilterConfig,
routes: make([]route, len(su.virtualHost.Routes)),
clusters: make(map[string]*clusterInfo),
clusterSpecifierPlugins: su.clusterSpecifierPlugins,
httpFilterConfig: su.ldsConfig.httpFilterConfig,
}

for i, rt := range su.virtualHost.Routes {
Expand Down
7 changes: 6 additions & 1 deletion xds/internal/resolver/xds_resolver.go
Expand Up @@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/internal/pretty"
iresolver "google.golang.org/grpc/internal/resolver"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/grpc/xds/internal/xdsclient"
"google.golang.org/grpc/xds/internal/xdsclient/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
Expand Down Expand Up @@ -206,7 +207,11 @@ func (r *xdsResolver) sendNewServiceConfig(cs *configSelector) bool {
}

// Produce the service config.
sc, err := serviceConfigJSON(r.activeClusters, cs.clusterSpecifierPlugins)
csps := make(map[string]clusterspecifier.BalancerConfig)
if cs != nil {
csps = cs.clusterSpecifierPlugins
}
sc, err := serviceConfigJSON(r.activeClusters, csps)
if err != nil {
// JSON marshal error; should never happen.
r.logger.Errorf("%v", err)
Expand Down
138 changes: 111 additions & 27 deletions xds/internal/resolver/xds_resolver_test.go
Expand Up @@ -29,6 +29,7 @@ import (

xxhash "github.com/cespare/xxhash/v2"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
Expand All @@ -46,6 +47,7 @@ import (
_ "google.golang.org/grpc/xds/internal/balancer/cdsbalancer" // To parse LB config
"google.golang.org/grpc/xds/internal/balancer/clustermanager"
"google.golang.org/grpc/xds/internal/balancer/ringhash"
"google.golang.org/grpc/xds/internal/clusterspecifier"
"google.golang.org/grpc/xds/internal/httpfilter"
"google.golang.org/grpc/xds/internal/httpfilter/router"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
Expand Down Expand Up @@ -473,12 +475,12 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
wantJSON: `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}]
}
}
}}]}`,
wantClusters: map[string]bool{"test-cluster-1": true},
wantClusters: map[string]bool{"cluster:test-cluster-1": true},
},
{
routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
Expand All @@ -491,18 +493,18 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
wantJSON: `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}]
},
"cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
"cluster:cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}]
},
"cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
"cluster:cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}]
}
}
}}]}`,
wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true},
wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
},
{
routes: []*xdsresource.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsresource.WeightedCluster{
Expand All @@ -515,15 +517,15 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
wantJSON: `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_1"}}]
"cluster:cluster_1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_1"}}]
},
"cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster_2"}}]
"cluster:cluster_2":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:cluster_2"}}]
}
}
}}]}`,
wantClusters: map[string]bool{"cluster_1": true, "cluster_2": true},
wantClusters: map[string]bool{"cluster:cluster_1": true, "cluster:cluster_2": true},
},
} {
// Invoke the watchAPI callback with a good service update and wait for the
Expand Down Expand Up @@ -725,8 +727,8 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}]
}
}
}}]}`
Expand Down Expand Up @@ -857,12 +859,94 @@ func (s) TestXDSResolverWRR(t *testing.T) {
picks[clustermanager.GetPickedClusterForTesting(res.Context)]++
res.OnCommitted()
}
want := map[string]int{"A": 10, "B": 20}
want := map[string]int{"cluster:A": 10, "cluster:B": 20}
if !reflect.DeepEqual(picks, want) {
t.Errorf("picked clusters = %v; want %v", picks, want)
}
}

func init() {
balancer.Register(cspB{})
}

type cspB struct{}

func (cspB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return nil
}

func (cspB) Name() string {
return "csp_experimental"
}

type cspConfig struct {
ArbitraryField string `json:"arbitrary_field"`
}

func TestXDSResolverClusterSpecifierPlugin(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
defer cancel()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsresource.ListenerUpdate{RouteConfigName: routeStr, HTTPFilters: routerFilterList}, nil)

waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback("", xdsresource.RouteConfigUpdate{
VirtualHosts: []*xdsresource.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsresource.Route{{Prefix: newStringP(""), ClusterSpecifierPlugin: "cspA"}},
},
},
// Top level csp config here - the value of cspA should get directly
// placed as a child policy of xds cluster manager.
ClusterSpecifierPlugins: map[string]clusterspecifier.BalancerConfig{"cspA": []map[string]interface{}{{"csp_experimental": cspConfig{ArbitraryField: "anything"}}}},
}, nil)

gotState, err := tcc.stateCh.Receive(ctx)
if err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
}
rState := gotState.(resolver.State)
if err := rState.ServiceConfig.Err; err != nil {
t.Fatalf("ClientConn.UpdateState received error in service config: %v", rState.ServiceConfig.Err)
}
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"cluster_specifier_plugin:cspA":{
"childPolicy":[{"csp_experimental":{"arbitrary_field":"anything"}}]
}
}
}}]}`

wantSCParsed := internal.ParseServiceConfigForTesting.(func(string) *serviceconfig.ParseResult)(wantJSON)
if !internal.EqualServiceConfigForTesting(rState.ServiceConfig.Config, wantSCParsed.Config) {
t.Errorf("ClientConn.UpdateState received different service config")
t.Error("got: ", cmp.Diff(nil, rState.ServiceConfig.Config))
t.Fatal("want: ", cmp.Diff(nil, wantSCParsed.Config))
}

cs := iresolver.GetConfigSelector(rState)
if cs == nil {
t.Fatal("received nil config selector")
}

res, err := cs.SelectConfig(iresolver.RPCInfo{Context: context.Background()})
if err != nil {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}

cluster := clustermanager.GetPickedClusterForTesting(res.Context)
clusterWant := clusterSpecifierPluginPrefix + "cspA"
if cluster != clusterWant {
t.Fatalf("cluster: %+v, want: %+v", cluster, clusterWant)
}
}

func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
xdsR, xdsC, tcc, cancel := testSetup(t, setupOpts{target: target})
defer xdsR.Close()
Expand Down Expand Up @@ -987,8 +1071,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
wantJSON := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}]
}
}
}}]}`
Expand All @@ -1009,7 +1093,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
t.Fatalf("Unexpected error from cs.SelectConfig(_): %v", err)
}
cluster := clustermanager.GetPickedClusterForTesting(res.Context)
if cluster != "test-cluster-1" {
if cluster != "cluster:test-cluster-1" {
t.Fatalf("")
}
// delay res.OnCommitted()
Expand Down Expand Up @@ -1046,11 +1130,11 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
wantJSON2 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"test-cluster-1"}}]
"cluster:test-cluster-1":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:test-cluster-1"}}]
},
"NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"NEW"}}]
"cluster:NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}]
}
}
}}]}`
Expand Down Expand Up @@ -1084,8 +1168,8 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
wantJSON3 := `{"loadBalancingConfig":[{
"xds_cluster_manager_experimental":{
"children":{
"NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"NEW"}}]
"cluster:NEW":{
"childPolicy":[{"cds_experimental":{"cluster":"cluster:NEW"}}]
}
}
}}]}`
Expand Down

0 comments on commit 179acf6

Please sign in to comment.