Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rls: delegate pick to child policy as long as it is not in TransientFailure #5656

Merged
merged 2 commits into from Sep 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 11 additions & 33 deletions balancer/rls/picker.go
Expand Up @@ -162,21 +162,20 @@ func (p *rlsPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {

// delegateToChildPolicies is a helper function which iterates through the list
// of child policy wrappers in a cache entry and attempts to find a child policy
// to which this RPC can be routed to. If there is no child policy in READY
// state, we delegate to the first child policy arbitrarily.
// to which this RPC can be routed to. If all child policies are in
// TRANSIENT_FAILURE, we delegate to the last child policy arbitrarily.
//
// Caller must hold at least a read-lock on p.lb.cacheMu.
func (p *rlsPicker) delegateToChildPolicies(dcEntry *cacheEntry, info balancer.PickInfo) (balancer.PickResult, error) {
for _, cpw := range dcEntry.childPolicyWrappers {
ok, res, err := p.pickIfFeasible(cpw, info)
if ok {
return res, err
for i, cpw := range dcEntry.childPolicyWrappers {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
// Delegate to the child policy if it is not in TRANSIENT_FAILURE, or if
// it the last one (which handles the case of delegating to the last
// child picker if all child polcies are in TRANSIENT_FAILURE).
if state.ConnectivityState != connectivity.TransientFailure || i == len(dcEntry.childPolicyWrappers)-1 {
return state.Picker.Pick(info)
}
}
if len(dcEntry.childPolicyWrappers) != 0 {
state := (*balancer.State)(atomic.LoadPointer(&dcEntry.childPolicyWrappers[0].state))
return state.Picker.Pick(info)
}
// In the unlikely event that we have a cache entry with no targets, we end up
// queueing the RPC.
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
Expand Down Expand Up @@ -249,8 +248,8 @@ func (p *rlsPicker) sendRequestAndReturnPick(cacheKey cacheKey, bs *backoffState
// target if one is configured, or fails the pick with the given error.
func (p *rlsPicker) useDefaultPickIfPossible(info balancer.PickInfo, errOnNoDefault error) (balancer.PickResult, error) {
if p.defaultPolicy != nil {
_, res, err := p.pickIfFeasible(p.defaultPolicy, info)
return res, err
state := (*balancer.State)(atomic.LoadPointer(&p.defaultPolicy.state))
return state.Picker.Pick(info)
}
return balancer.PickResult{}, errOnNoDefault
}
Expand All @@ -275,27 +274,6 @@ func (p *rlsPicker) sendRouteLookupRequest(cacheKey cacheKey, bs *backoffState,
return throttled
}

// pickIfFeasible determines if a pick can be delegated to child policy based on
// its connectivity state.
// - If state is CONNECTING, the pick is to be queued
// - If state is IDLE, the child policy is instructed to exit idle, and the pick
// is to be queued
// - If state is READY, pick it delegated to the child policy's picker
func (p *rlsPicker) pickIfFeasible(cpw *childPolicyWrapper, info balancer.PickInfo) (bool, balancer.PickResult, error) {
state := (*balancer.State)(atomic.LoadPointer(&cpw.state))
switch state.ConnectivityState {
case connectivity.Connecting:
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case connectivity.Idle:
p.bg.ExitIdleOne(cpw.target)
return true, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
case connectivity.Ready:
r, e := state.Picker.Pick(info)
return true, r, e
}
return false, balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

// handleRouteLookupResponse is the callback invoked by the control channel upon
// receipt of an RLS response. Modifies the data cache and pending requests map
// and sends a new picker.
Expand Down
51 changes: 47 additions & 4 deletions internal/testutils/xds/e2e/clientresources.go
Expand Up @@ -305,8 +305,44 @@ func DefaultRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.Rou

// DefaultCluster returns a basic xds Cluster resource.
func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel) *v3clusterpb.Cluster {
return ClusterResourceWithOptions(&ClusterOptions{
ClusterName: clusterName,
ServiceName: edsServiceName,
Policy: LoadBalancingPolicyRoundRobin,
SecurityLevel: secLevel,
})
}

// LoadBalancingPolicy determines the policy used for balancing load across
// endpoints in the Cluster.
type LoadBalancingPolicy int

const (
// LoadBalancingPolicyRoundRobin results in the use of the weighted_target
// LB policy to balance load across localities and endpoints in the cluster.
LoadBalancingPolicyRoundRobin LoadBalancingPolicy = iota
// LoadBalancingPolicyRingHash results in the use of the ring_hash LB policy
// as the leaf policy.
LoadBalancingPolicyRingHash
)

// ClusterOptions contains options to configure a Cluster resource.
type ClusterOptions struct {
// ClusterName is the name of the Cluster resource.
ClusterName string
// ServiceName is the EDS service name of the Cluster.
ServiceName string
// Policy is the LB policy to be used.
Policy LoadBalancingPolicy
// SecurityLevel determines the security configuration for the Cluster.
SecurityLevel SecurityLevel
}

// ClusterResourceWithOptions returns an xDS Cluster resource configured with
// the provided options.
func ClusterResourceWithOptions(opts *ClusterOptions) *v3clusterpb.Cluster {
var tlsContext *v3tlspb.UpstreamTlsContext
switch secLevel {
switch opts.SecurityLevel {
case SecurityLevelNone:
case SecurityLevelTLS:
tlsContext = &v3tlspb.UpstreamTlsContext{
Expand All @@ -333,18 +369,25 @@ func DefaultCluster(clusterName, edsServiceName string, secLevel SecurityLevel)
}
}

var lbPolicy v3clusterpb.Cluster_LbPolicy
switch opts.Policy {
case LoadBalancingPolicyRoundRobin:
lbPolicy = v3clusterpb.Cluster_ROUND_ROBIN
case LoadBalancingPolicyRingHash:
lbPolicy = v3clusterpb.Cluster_RING_HASH
}
cluster := &v3clusterpb.Cluster{
Name: clusterName,
Name: opts.ClusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: edsServiceName,
ServiceName: opts.ServiceName,
},
LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN,
LbPolicy: lbPolicy,
}
if tlsContext != nil {
cluster.TransportSocket = &v3corepb.TransportSocket{
Expand Down
32 changes: 29 additions & 3 deletions test/xds/xds_rls_clusterspecifier_plugin_test.go
Expand Up @@ -46,15 +46,20 @@ import (

// defaultClientResourcesWithRLSCSP returns a set of resources (LDS, RDS, CDS, EDS) for a
// client to connect to a server with a RLS Load Balancer as a child of Cluster Manager.
func defaultClientResourcesWithRLSCSP(params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
func defaultClientResourcesWithRLSCSP(lb e2e.LoadBalancingPolicy, params e2e.ResourceParams, rlsProto *rlspb.RouteLookupConfig) e2e.UpdateOptions {
routeConfigName := "route-" + params.DialTarget
clusterName := "cluster-" + params.DialTarget
endpointsName := "endpoints-" + params.DialTarget
return e2e.UpdateOptions{
NodeID: params.NodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(params.DialTarget, routeConfigName)},
Routes: []*v3routepb.RouteConfiguration{defaultRouteConfigWithRLSCSP(routeConfigName, params.DialTarget, rlsProto)},
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, endpointsName, params.SecLevel)},
Clusters: []*v3clusterpb.Cluster{e2e.ClusterResourceWithOptions(&e2e.ClusterOptions{
ClusterName: clusterName,
ServiceName: endpointsName,
Policy: lb,
SecurityLevel: params.SecLevel,
})},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(endpointsName, params.Host, []uint32{params.Port})},
}
}
Expand Down Expand Up @@ -93,6 +98,27 @@ func defaultRouteConfigWithRLSCSP(routeName, ldsTarget string, rlsProto *rlspb.R
// target corresponding to this test service. This test asserts an RPC proceeds
// as normal with the RLS Balancer as part of system.
func (s) TestRLSinxDS(t *testing.T) {
tests := []struct {
name string
lbPolicy e2e.LoadBalancingPolicy
}{
{
name: "roundrobin",
lbPolicy: e2e.LoadBalancingPolicyRoundRobin,
},
{
name: "ringhash",
lbPolicy: e2e.LoadBalancingPolicyRingHash,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testRLSinxDS(t, test.lbPolicy)
})
}
}

func testRLSinxDS(t *testing.T, lbPolicy e2e.LoadBalancingPolicy) {
oldRLS := envconfig.XDSRLS
envconfig.XDSRLS = true
internal.RegisterRLSClusterSpecifierPluginForTesting()
Expand All @@ -119,7 +145,7 @@ func (s) TestRLSinxDS(t *testing.T) {
}

const serviceName = "my-service-client-side-xds"
resources := defaultClientResourcesWithRLSCSP(e2e.ResourceParams{
resources := defaultClientResourcesWithRLSCSP(lbPolicy, e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Expand Down