diff --git a/xds/internal/resolver/serviceconfig.go b/xds/internal/resolver/serviceconfig.go index 6ae1f42169a..c65ce8c7b7c 100644 --- a/xds/internal/resolver/serviceconfig.go +++ b/xds/internal/resolver/serviceconfig.go @@ -45,8 +45,10 @@ import ( ) const ( - cdsName = "cds_experimental" - xdsClusterManagerName = "xds_cluster_manager_experimental" + cdsName = "cds_experimental" + xdsClusterManagerName = "xds_cluster_manager_experimental" + clusterPrefix = "cluster:" + clusterSpecifierPluginPrefix = "cluster_specifier_plugin:" ) type serviceConfig struct { @@ -180,7 +182,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster) } - // Add a ref to the selected cluster, as this RPC needs this cluster until // it is committed. ref := &cs.clusters[cluster.name].refCount @@ -373,32 +374,19 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro for i, rt := range su.virtualHost.Routes { clusters := newWRR() if rt.ClusterSpecifierPlugin != "" { + clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{ - name: "cluster:" + rt.ClusterSpecifierPlugin, + name: clusterName, }, 1) - - ci := r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] - if ci == nil { - ci = &clusterInfo{refCount: 0} - r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci - } - cs.clusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci + r.initializeCluster(clusterName, cs) } else { for cluster, wc := range rt.WeightedClusters { + clusterName := clusterPrefix + cluster clusters.Add(&routeCluster{ - name: "cluster:" + cluster, + name: clusterName, httpFilterConfigOverride: wc.HTTPFilterConfigOverride, }, int64(wc.Weight)) - - // Initialize entries in cs.clusters map, creating entries in - // r.activeClusters as necessary. Set to zero as they will be - // incremented by incRefs. - ci := r.activeClusters["cluster:" + cluster] - if ci == nil { - ci = &clusterInfo{refCount: 0} - r.activeClusters["cluster:" + cluster] = ci - } - cs.clusters["cluster:" + cluster] = ci + r.initializeCluster(clusterName, cs) } } cs.routes[i].clusters = clusters @@ -429,6 +417,18 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro return cs, nil } +// initializeCluster initializes entries in cs.clusters map, creating entries in +// r.activeClusters as necessary. Any created entries will be set to zero as +// they will be incremented by incRefs. +func (r *xdsResolver) initializeCluster(clusterName string, cs *configSelector) { + ci := r.activeClusters[clusterName] + if ci == nil { + ci = &clusterInfo{refCount: 0} + r.activeClusters[clusterName] = ci + } + cs.clusters[clusterName] = ci +} + type clusterInfo struct { // number of references to this cluster; accessed atomically refCount int32