Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds/resolver: Add support for cluster specifier plugins #4987

Merged
merged 7 commits into from Dec 6, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
26 changes: 9 additions & 17 deletions xds/internal/resolver/serviceconfig.go
Expand Up @@ -133,11 +133,7 @@ type routeCluster struct {

type route struct {
m *xdsresource.CompositeMatcher // converted from route matchers

// Exactly one of clusterSpecifierPlugin or clusters will be set.
clusterSpecifierPlugin string
clusters wrr.WRR // holds *routeCluster entries

maxStreamDuration time.Duration
// map from filter name to its config
httpFilterConfigOverride map[string]httpfilter.FilterConfig
Expand Down Expand Up @@ -179,15 +175,9 @@ func (cs *configSelector) SelectConfig(rpcInfo iresolver.RPCInfo) (*iresolver.RP
return nil, errNoMatchedRouteFound
}

var cluster *routeCluster
if rt.clusterSpecifierPlugin != "" {
cluster.name = rt.clusterSpecifierPlugin
// cluster.httpFilterConfigOverride = /*Do cluster specifier plugins even have http filter config overrides*/
} else {
cluster, ok := rt.clusters.Next().(*routeCluster)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}
cluster, ok := rt.clusters.Next().(*routeCluster)
if !ok {
return nil, status.Errorf(codes.Internal, "error retrieving cluster for match: %v (%T)", cluster, cluster)
}


Expand Down Expand Up @@ -381,17 +371,19 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}

for i, rt := range su.virtualHost.Routes {
clusters := newWRR()
if rt.ClusterSpecifierPlugin != "" {
clusters.Add(&routeCluster{
name: "cluster:" + rt.ClusterSpecifierPlugin,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be "cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin?

Please use a local to hold that instead of repeating it 4 times. Also, global consts for "cluster:" and "cluster_specifier_plugin:" would be a good idea to avoid any chance of a typo in one usage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good catch. Added global consts and also a local var for both branches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the comment isn't showing up on this PR, but I refactored the cluster initialization into another function. It made it much cleaner.

}, 1)

ci := r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin]
if ci == nil {
ci = &clusterInfo{refCount: 0}
r.activeClusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci
}
cs.clusters["cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin] = ci
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this bit be shared with the below section? Maybe factored into another function?


cs.routes[i].clusterSpecifierPlugin = "cluster_specifier_plugin:" + rt.ClusterSpecifierPlugin
} else {
clusters := newWRR()
for cluster, wc := range rt.WeightedClusters {
clusters.Add(&routeCluster{
name: "cluster:" + cluster,
Expand All @@ -408,8 +400,8 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
}
cs.clusters["cluster:" + cluster] = ci
}
cs.routes[i].clusters = clusters
}
cs.routes[i].clusters = clusters

var err error
cs.routes[i].m, err = xdsresource.RouteToMatcher(rt)
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/resolver/watch_service.go
Expand Up @@ -124,7 +124,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate,
}

// Handle the inline RDS update as if it's from an RDS watch.
w.updateVirtualHostsFromRDS(*update.InlineRouteConfig)
w.applyRouteConfigUpdate(*update.InlineRouteConfig)
return
}

Expand Down Expand Up @@ -155,7 +155,7 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsresource.ListenerUpdate,
w.rdsCancel = w.c.WatchRouteConfig(update.RouteConfigName, w.handleRDSResp)
}

func (w *serviceUpdateWatcher) updateVirtualHostsFromRDS(update xdsresource.RouteConfigUpdate) {
func (w *serviceUpdateWatcher) applyRouteConfigUpdate(update xdsresource.RouteConfigUpdate) {
matchVh := xdsresource.FindBestMatchingVirtualHost(w.serviceName, update.VirtualHosts)
if matchVh == nil {
// No matching virtual host found.
Expand Down Expand Up @@ -184,7 +184,7 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsresource.RouteConfigUpdat
w.serviceCb(serviceUpdate{}, err)
return
}
w.updateVirtualHostsFromRDS(update)
w.applyRouteConfigUpdate(update)
}

func (w *serviceUpdateWatcher) close() {
Expand Down