Skip to content

Commit

Permalink
Responded to Doug's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq committed Nov 16, 2021
1 parent 8a6bf55 commit a616529
Showing 1 changed file with 22 additions and 22 deletions.
44 changes: 22 additions & 22 deletions xds/internal/resolver/serviceconfig.go
Expand Up @@ -45,8 +45,10 @@ import (
)

const (
cdsName = "cds_experimental"
xdsClusterManagerName = "xds_cluster_manager_experimental"
cdsName = "cds_experimental"
xdsClusterManagerName = "xds_cluster_manager_experimental"
clusterPrefix = "cluster:"
clusterSpecifierPluginPrefix = "cluster_specifier_plugin:"
)

type serviceConfig struct {
Expand Down Expand Up @@ -180,7 +182,6 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}


// Add a ref to the selected cluster, as this RPC needs this cluster until
// it is committed.
ref := &cs.clusters[cluster.name].refCount
Expand Down Expand Up @@ -373,32 +374,19 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
for i, rt := range su.virtualHost.Routes {
clusters := newWRR()
if rt.ClusterSpecifierPlugin != "" {
clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin
clusters.Add(&routeCluster{
name: "cluster:" + rt.ClusterSpecifierPlugin,
name: clusterName,
}, 1)

ci := r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin]
if ci == nil {
ci = &clusterInfo{refCount: 0}
r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci
}
cs.clusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci
r.initializeCluster(clusterName, cs)
} else {
for cluster, wc := range rt.WeightedClusters {
clusterName := clusterPrefix + cluster
clusters.Add(&routeCluster{
name: "cluster:" + cluster,
name: clusterName,
httpFilterConfigOverride: wc.HTTPFilterConfigOverride,
}, int64(wc.Weight))

// Initialize entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Set to zero as they will be
// incremented by incRefs.
ci := r.activeClusters["cluster:" + cluster]
if ci == nil {
ci = &clusterInfo{refCount: 0}
r.activeClusters["cluster:" + cluster] = ci
}
cs.clusters["cluster:" + cluster] = ci
r.initializeCluster(clusterName, cs)
}
}
cs.routes[i].clusters = clusters
Expand Down Expand Up @@ -429,6 +417,18 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
return cs, nil
}

// initializeCluster initializes entries in cs.clusters map, creating entries in
// r.activeClusters as necessary. Any created entries will be set to zero as
// they will be incremented by incRefs.
func (r *xdsResolver) initializeCluster(clusterName string, cs *configSelector) {
ci := r.activeClusters[clusterName]
if ci == nil {
ci = &clusterInfo{refCount: 0}
r.activeClusters[clusterName] = ci
}
cs.clusters[clusterName] = ci
}

type clusterInfo struct {
// number of references to this cluster; accessed atomically
refCount int32
Expand Down

0 comments on commit a616529

Please sign in to comment.