From 75d3e439886908067e1e60034ed894ba91abc578 Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Thu, 28 Apr 2022 09:14:56 -0700 Subject: [PATCH 1/6] Use new wrr_locality LB policy. Instead of providing round robin or least request configurations directly, ClientXdsClient now wraps them in a WRR locality config. ClusterResolverLoadBalancer passes this configuration directly to PriorityLoadBalancer to use as the endpoint LB policy it provides to ClusterImplLoadBalancer. A new ResolvedAddresses attribute is also set that has all the locality weights. This is needed by WrrLocalityLoadBalancer when it configures WeightedTargetLoadBalancer. --- .../grpc/xds/ClusterResolverLoadBalancer.java | 32 +----- .../xds/LegacyLoadBalancerConfigFactory.java | 16 ++- .../io/grpc/xds/ClientXdsClientDataTest.java | 6 +- .../io/grpc/xds/ClientXdsClientTestBase.java | 93 +++++++++++----- .../xds/ClusterResolverLoadBalancerTest.java | 100 +++++++++--------- .../LegacyLoadBalancerConfigFactoryTest.java | 29 +++-- 6 files changed, 163 insertions(+), 113 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 1642aba93d4..6d2f232cf38 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -19,7 +19,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; -import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import com.google.common.annotations.VisibleForTesting; import io.grpc.Attributes; @@ -49,8 +48,6 @@ import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; -import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; -import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsLogger.XdsLogLevel; @@ -155,6 +152,7 @@ private final class ClusterResolverLbState extends LoadBalancer { private final Helper helper; private final List clusters = new ArrayList<>(); private final Map clusterStates = new HashMap<>(); + private final Map localityWeights = new HashMap<>(); private PolicySelection endpointLbPolicy; private ResolvedAddresses resolvedAddresses; private LoadBalancer childLb; @@ -249,6 +247,8 @@ private void handleEndpointResourceUpdate() { resolvedAddresses.toBuilder() .setLoadBalancingPolicyConfig(childConfig) .setAddresses(Collections.unmodifiableList(addresses)) + .setAttributes(resolvedAddresses.getAttributes().toBuilder() + .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build()) .build()); } @@ -409,6 +409,7 @@ public void run() { "Discard locality {0} with 0 healthy endpoints", locality); continue; } + localityWeights.put(locality, localityLbInfo.localityWeight()); if (!prioritizedLocalityWeights.containsKey(priorityName)) { prioritizedLocalityWeights.put(priorityName, new HashMap()); } @@ -686,32 +687,9 @@ private static Map generateEdsBasedPriorityChildCon List dropOverloads) { Map configs = new HashMap<>(); for (String priority : prioritizedLocalityWeights.keySet()) { - PolicySelection leafPolicy = endpointLbPolicy; - // Depending on the endpoint-level load balancing policy, different LB hierarchy may be - // created. If the endpoint-level LB policy is round_robin or least_request_experimental, - // it creates a two-level LB hierarchy: a locality-level LB policy that balances load - // according to locality weights followed by an endpoint-level LB policy that balances load - // between endpoints within the locality. If the endpoint-level LB policy is - // ring_hash_experimental, it creates a unified LB policy that balances load by weighing the - // product of each endpoint's weight and the weight of the locality it belongs to. - if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin") - || endpointLbPolicy.getProvider().getPolicyName().equals("least_request_experimental")) { - Map localityWeights = prioritizedLocalityWeights.get(priority); - Map targets = new HashMap<>(); - for (Locality locality : localityWeights.keySet()) { - int weight = localityWeights.get(locality); - WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy); - targets.put(localityName(locality), target); - } - LoadBalancerProvider weightedTargetLbProvider = - lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME); - WeightedTargetConfig weightedTargetConfig = - new WeightedTargetConfig(Collections.unmodifiableMap(targets)); - leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig); - } ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, edsServiceName, lrsServerInfo, maxConcurrentRequests, - dropOverloads, leafPolicy, tlsContext); + dropOverloads, endpointLbPolicy, tlsContext); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); PolicySelection clusterImplPolicy = diff --git a/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java index 7f6fdfcd75d..3f6ff17345e 100644 --- a/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java +++ b/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java @@ -16,6 +16,7 @@ package io.grpc.xds; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.envoyproxy.envoy.config.cluster.v3.Cluster; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; @@ -38,6 +39,9 @@ abstract class LegacyLoadBalancerConfigFactory { static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental"; static final String CHOICE_COUNT_FIELD_NAME = "choiceCount"; + static final String WRR_LOCALITY_FIELD_NAME = "wrr_locality_experimental"; + static final String CHILD_POLICY_FIELD = "childPolicy"; + /** * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link * Cluster}. @@ -47,13 +51,13 @@ abstract class LegacyLoadBalancerConfigFactory { static ImmutableMap newConfig(Cluster cluster, boolean enableLeastRequest) throws ResourceInvalidException { switch (cluster.getLbPolicy()) { - case ROUND_ROBIN: - return newRoundRobinConfig(); case RING_HASH: return newRingHashConfig(cluster); + case ROUND_ROBIN: + return newWrrLocalityConfig(newRoundRobinConfig()); case LEAST_REQUEST: if (enableLeastRequest) { - return newLeastRequestConfig(cluster); + return newWrrLocalityConfig(newLeastRequestConfig(cluster)); } break; default: @@ -62,6 +66,12 @@ abstract class LegacyLoadBalancerConfigFactory { "Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy()); } + private static ImmutableMap newWrrLocalityConfig( + ImmutableMap childConfig) { + return ImmutableMap.builder().put(WRR_LOCALITY_FIELD_NAME, + ImmutableMap.of(CHILD_POLICY_FIELD, ImmutableList.of(childConfig))).build(); + } + // Builds an empty configuration for round robin (it is not configurable). private static ImmutableMap newRoundRobinConfig() { return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of()); diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index 2fabefa0c66..3ee84721dc4 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -1760,7 +1760,11 @@ public void parseCluster_leastRequestLbPolicy_defaultLbConfig() throws ResourceI cluster, new HashSet(), null, LRS_SERVER_INFO, LoadBalancerRegistry.getDefaultRegistry()); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig()); - assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental"); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); } @Test diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 5e57add5c13..12db1d233e6 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -1628,8 +1628,12 @@ public void cdsResourceFound() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1651,8 +1655,12 @@ public void wrappedCdsResource() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1680,8 +1688,12 @@ public void cdsResourceFound_leastRequestLbPolicy() { assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); - assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental"); - assertThat(lbConfig.getRawConfigValue().get("choiceCount")).isEqualTo(3); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); + assertThat(childConfigs.get(0).getRawConfigValue().get("choiceCount")).isEqualTo(3); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1736,8 +1748,12 @@ public void cdsResponseWithAggregateCluster() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); @@ -1758,8 +1774,12 @@ public void cdsResponseWithCircuitBreakers() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1903,8 +1923,12 @@ public void cachedCdsResource_data() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1929,6 +1953,7 @@ public void cachedCdsResource_absent() { } @Test + @SuppressWarnings("unchecked") public void cdsResourceUpdated() { DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); verifyResourceMetadataRequested(CDS, CDS_RESOURCE); @@ -1946,8 +1971,11 @@ public void cdsResourceUpdated() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1966,8 +1994,11 @@ public void cdsResourceUpdated() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2035,8 +2066,12 @@ public void cdsResourceDeleted() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2053,6 +2088,7 @@ public void cdsResourceDeleted() { } @Test + @SuppressWarnings("unchecked") public void multipleCdsWatchers() { String cdsResourceTwo = "cluster-bar.googleapis.com"; CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); @@ -2088,8 +2124,11 @@ public void multipleCdsWatchers() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); assertThat(cdsUpdate.dnsHostName()).isEqualTo(dnsHostAddr + ":" + dnsHostPort); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2098,8 +2137,11 @@ public void multipleCdsWatchers() { assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -2108,8 +2150,11 @@ public void multipleCdsWatchers() { assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()) - .getPolicyName()).isEqualTo("round_robin"); + lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 9b9df80f1ba..a1ab3b9bede 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -20,6 +20,7 @@ import static io.grpc.xds.XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; +import static io.grpc.xds.XdsLbPolicies.WRR_LOCALITY_POLICY_NAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -69,8 +70,7 @@ import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; -import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; -import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; +import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import java.net.SocketAddress; import java.net.URI; @@ -133,12 +133,15 @@ public void uncaughtException(Thread t, Throwable e) { private final FakeClock fakeClock = new FakeClock(); private final LoadBalancerRegistry lbRegistry = new LoadBalancerRegistry(); private final NameResolverRegistry nsRegistry = new NameResolverRegistry(); - private final PolicySelection roundRobin = - new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null); + private final PolicySelection roundRobin = new PolicySelection( + new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig( + new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null))); private final PolicySelection ringHash = new PolicySelection( new FakeLoadBalancerProvider("ring_hash_experimental"), new RingHashConfig(10L, 100L)); private final PolicySelection leastRequest = new PolicySelection( - new FakeLoadBalancerProvider("least_request_experimental"), new LeastRequestConfig(3)); + new FakeLoadBalancerProvider("wrr_locality_experimental"), new WrrLocalityConfig( + new PolicySelection(new FakeLoadBalancerProvider("least_request_experimental"), + new LeastRequestConfig(3)))); private final List childBalancers = new ArrayList<>(); private final List resolvers = new ArrayList<>(); private final FakeXdsClient xdsClient = new FakeXdsClient(); @@ -303,10 +306,16 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() { ClusterImplConfig clusterImplConfig = (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, - tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - WeightedTargetConfig weightedTargetConfig = - (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig(); - assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality1.toString()); + tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + WrrLocalityConfig wrrLocalityConfig = + (WrrLocalityConfig) clusterImplConfig.childPolicy.getConfig(); + assertThat(wrrLocalityConfig.childPolicy.getProvider().getPolicyName()).isEqualTo( + "least_request_experimental"); + + Map localityWeights = childBalancer.attributes.get( + InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); + assertThat(localityWeights).containsKey(locality1); + assertThat(localityWeights.get(locality1)).isEqualTo(100); } @Test @@ -366,13 +375,12 @@ public void onlyEdsClusters_receivedEndpoints() { ClusterImplConfig clusterImplConfig1 = (ClusterImplConfig) priorityChildConfig1.policySelection.getConfig(); assertClusterImplConfig(clusterImplConfig1, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, - tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - WeightedTargetConfig weightedTargetConfig1 = - (WeightedTargetConfig) clusterImplConfig1.childPolicy.getConfig(); - assertThat(weightedTargetConfig1.targets.keySet()).containsExactly(locality1.toString()); - WeightedPolicySelection target1 = weightedTargetConfig1.targets.get(locality1.toString()); - assertThat(target1.weight).isEqualTo(70); - assertThat(target1.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); + tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + assertThat(clusterImplConfig1.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class); + WrrLocalityConfig wrrLocalityConfig1 = + (WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig(); + assertThat(wrrLocalityConfig1.childPolicy.getProvider().getPolicyName()).isEqualTo( + "round_robin"); PriorityChildConfig priorityChildConfig2 = priorityLbConfig.childConfigs.get(priority2); assertThat(priorityChildConfig2.ignoreReresolution).isTrue(); @@ -381,21 +389,12 @@ public void onlyEdsClusters_receivedEndpoints() { ClusterImplConfig clusterImplConfig2 = (ClusterImplConfig) priorityChildConfig2.policySelection.getConfig(); assertClusterImplConfig(clusterImplConfig2, CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_INFO, 200L, - tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - WeightedTargetConfig weightedTargetConfig2 = - (WeightedTargetConfig) clusterImplConfig2.childPolicy.getConfig(); - assertThat(weightedTargetConfig2.targets.keySet()).containsExactly(locality3.toString()); - WeightedPolicySelection target2 = weightedTargetConfig2.targets.get(locality3.toString()); - assertThat(target2.weight).isEqualTo(20); - assertThat(target2.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); - List priorityAddrs1 = - AddressFilter.filter(childBalancer.addresses, priority1); - assertThat(priorityAddrs1).hasSize(2); - assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), priorityAddrs1); - List priorityAddrs2 = - AddressFilter.filter(childBalancer.addresses, priority2); - assertThat(priorityAddrs2).hasSize(1); - assertAddressesEqual(Collections.singletonList(endpoint4), priorityAddrs2); + tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + assertThat(clusterImplConfig2.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class); + WrrLocalityConfig wrrLocalityConfig2 = + (WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig(); + assertThat(wrrLocalityConfig2.childPolicy.getProvider().getPolicyName()).isEqualTo( + "round_robin"); PriorityChildConfig priorityChildConfig3 = priorityLbConfig.childConfigs.get(priority3); assertThat(priorityChildConfig3.ignoreReresolution).isTrue(); @@ -404,17 +403,21 @@ public void onlyEdsClusters_receivedEndpoints() { ClusterImplConfig clusterImplConfig3 = (ClusterImplConfig) priorityChildConfig3.policySelection.getConfig(); assertClusterImplConfig(clusterImplConfig3, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_INFO, 100L, - tlsContext, Collections.emptyList(), WEIGHTED_TARGET_POLICY_NAME); - WeightedTargetConfig weightedTargetConfig3 = - (WeightedTargetConfig) clusterImplConfig3.childPolicy.getConfig(); - assertThat(weightedTargetConfig3.targets.keySet()).containsExactly(locality2.toString()); - WeightedPolicySelection target3 = weightedTargetConfig3.targets.get(locality2.toString()); - assertThat(target3.weight).isEqualTo(10); - assertThat(target3.policySelection.getProvider().getPolicyName()).isEqualTo("round_robin"); - List priorityAddrs3 = - AddressFilter.filter(childBalancer.addresses, priority3); - assertThat(priorityAddrs3).hasSize(1); - assertAddressesEqual(Collections.singletonList(endpoint3), priorityAddrs3); + tlsContext, Collections.emptyList(), WRR_LOCALITY_POLICY_NAME); + assertThat(clusterImplConfig3.childPolicy.getConfig()).isInstanceOf(WrrLocalityConfig.class); + WrrLocalityConfig wrrLocalityConfig3 = + (WrrLocalityConfig) clusterImplConfig1.childPolicy.getConfig(); + assertThat(wrrLocalityConfig3.childPolicy.getProvider().getPolicyName()).isEqualTo( + "round_robin"); + + Map localityWeights = childBalancer.attributes.get( + InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); + assertThat(localityWeights).containsKey(locality1); + assertThat(localityWeights.get(locality1)).isEqualTo(70); + assertThat(localityWeights).containsKey(locality2); + assertThat(localityWeights.get(locality2)).isEqualTo(10); + assertThat(localityWeights).containsKey(locality3); + assertThat(localityWeights.get(locality3)).isEqualTo(20); } @Test @@ -510,19 +513,14 @@ public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { LocalityLbEndpoints.create( Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)), 10 /* localityWeight */, 1 /* priority */); - String priority = CLUSTER1 + "[priority1]"; xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; - PriorityChildConfig priorityChildConfig = priorityLbConfig.childConfigs.get(priority); - ClusterImplConfig clusterImplConfig = - (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); - WeightedTargetConfig weightedTargetConfig = - (WeightedTargetConfig) clusterImplConfig.childPolicy.getConfig(); - assertThat(weightedTargetConfig.targets.keySet()).containsExactly(locality2.toString()); + Map localityWeights = childBalancer.attributes.get( + InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS); + assertThat(localityWeights.keySet()).containsExactly(locality2); } @Test @@ -1142,6 +1140,7 @@ private final class FakeLoadBalancer extends LoadBalancer { private final Helper helper; private List addresses; private Object config; + private Attributes attributes; private Status upstreamError; private boolean shutdown; @@ -1154,6 +1153,7 @@ private final class FakeLoadBalancer extends LoadBalancer { public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { addresses = resolvedAddresses.getAddresses(); config = resolvedAddresses.getLoadBalancingPolicyConfig(); + attributes = resolvedAddresses.getAttributes(); } @Override diff --git a/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java index e72e177df53..a2809537a5a 100644 --- a/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java @@ -30,6 +30,8 @@ import io.grpc.internal.ServiceConfigUtil; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.xds.ClientXdsClient.ResourceInvalidException; +import java.util.List; +import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -46,9 +48,14 @@ public void roundRobin() throws ResourceInvalidException { LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); - - assertThat(lbConfig.getPolicyName()).isEqualTo("round_robin"); - assertThat(lbConfig.getRawConfigValue()).isEmpty(); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs).hasSize(1); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); + assertThat(childConfigs.get(0).getRawConfigValue()).isEmpty(); } @Test @@ -70,7 +77,7 @@ public void ringHash() throws ResourceInvalidException { } @Test - public void ringHash_invalidHash() throws ResourceInvalidException { + public void ringHash_invalidHash() { Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig( RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build(); @@ -96,14 +103,20 @@ public void leastRequest() throws ResourceInvalidException { LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); - - assertThat(lbConfig.getPolicyName()).isEqualTo("least_request_experimental"); - assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "choiceCount")).isEqualTo(10); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); + assertThat( + JsonUtil.getNumberAsLong(childConfigs.get(0).getRawConfigValue(), "choiceCount")).isEqualTo( + 10); } @Test - public void leastRequest_notEnabled() throws ResourceInvalidException { + public void leastRequest_notEnabled() { System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false"); Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build(); From 8a022149ab5e685d11289977f2ce1a1fb38e92be Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Mon, 2 May 2022 19:04:35 -0700 Subject: [PATCH 2/6] xds: Use load_balancing_policy if provided in Cluster. Renames the LegacyLoadBalancerConfigFactory to just LoadBalancerConfigFactory and gives it responsibility for both the legacy and the new LB config mechanism. The new configuration mechanism is explained in gRFC A52: https://github.com/grpc/proposal/pull/298 --- .../java/io/grpc/xds/ClientXdsClient.java | 4 +- .../xds/LegacyLoadBalancerConfigFactory.java | 115 ------ .../grpc/xds/LoadBalancerConfigFactory.java | 326 ++++++++++++++++ .../LegacyLoadBalancerConfigFactoryTest.java | 133 ------- .../xds/LoadBalancerConfigFactoryTest.java | 348 ++++++++++++++++++ 5 files changed, 675 insertions(+), 251 deletions(-) delete mode 100644 xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java create mode 100644 xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java delete mode 100644 xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java create mode 100644 xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 9e40eab6e7e..ff14a23ab05 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -1635,9 +1635,7 @@ static CdsUpdate processCluster(Cluster cluster, Set retainedEdsResource } CdsUpdate.Builder updateBuilder = structOrError.getStruct(); - // TODO: If load_balancing_policy is set in Cluster use it for LB config, otherwise fall back - // to using the legacy lb_policy field. - ImmutableMap lbPolicyConfig = LegacyLoadBalancerConfigFactory.newConfig(cluster, + ImmutableMap lbPolicyConfig = LoadBalancerConfigFactory.newConfig(cluster, enableLeastRequest); // Validate the LB config by trying to parse it with the corresponding LB provider. diff --git a/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java deleted file mode 100644 index 3f6ff17345e..00000000000 --- a/xds/src/main/java/io/grpc/xds/LegacyLoadBalancerConfigFactory.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; -import io.grpc.xds.ClientXdsClient.ResourceInvalidException; - -/** - * Builds a JSON LB configuration based on the old style of using the xDS Cluster proto message. The - * lb_policy field is used to select the policy and configuration is extracted from various policy - * specific fields in Cluster. - */ -abstract class LegacyLoadBalancerConfigFactory { - - static final String ROUND_ROBIN_FIELD_NAME = "round_robin"; - - static final String RING_HASH_FIELD_NAME = "ring_hash_experimental"; - static final String MIN_RING_SIZE_FIELD_NAME = "minRingSize"; - static final String MAX_RING_SIZE_FIELD_NAME = "maxRingSize"; - - static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental"; - static final String CHOICE_COUNT_FIELD_NAME = "choiceCount"; - - static final String WRR_LOCALITY_FIELD_NAME = "wrr_locality_experimental"; - static final String CHILD_POLICY_FIELD = "childPolicy"; - - /** - * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link - * Cluster}. - * - * @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration. - */ - static ImmutableMap newConfig(Cluster cluster, boolean enableLeastRequest) - throws ResourceInvalidException { - switch (cluster.getLbPolicy()) { - case RING_HASH: - return newRingHashConfig(cluster); - case ROUND_ROBIN: - return newWrrLocalityConfig(newRoundRobinConfig()); - case LEAST_REQUEST: - if (enableLeastRequest) { - return newWrrLocalityConfig(newLeastRequestConfig(cluster)); - } - break; - default: - } - throw new ResourceInvalidException( - "Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy()); - } - - private static ImmutableMap newWrrLocalityConfig( - ImmutableMap childConfig) { - return ImmutableMap.builder().put(WRR_LOCALITY_FIELD_NAME, - ImmutableMap.of(CHILD_POLICY_FIELD, ImmutableList.of(childConfig))).build(); - } - - // Builds an empty configuration for round robin (it is not configurable). - private static ImmutableMap newRoundRobinConfig() { - return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of()); - } - - // Builds a ring hash config and validates the hash function selection. - private static ImmutableMap newRingHashConfig(Cluster cluster) - throws ResourceInvalidException { - RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); - - // The hash function needs to be validated here as it is not exposed in the returned - // configuration for later validation. - if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) { - throw new ResourceInvalidException( - "Cluster " + cluster.getName() + ": invalid ring hash function: " + lbConfig); - } - - ImmutableMap.Builder configBuilder = ImmutableMap.builder(); - if (lbConfig.hasMinimumRingSize()) { - configBuilder.put(MIN_RING_SIZE_FIELD_NAME, - ((Long) lbConfig.getMinimumRingSize().getValue()).doubleValue()); - } - if (lbConfig.hasMaximumRingSize()) { - configBuilder.put(MAX_RING_SIZE_FIELD_NAME, - ((Long) lbConfig.getMaximumRingSize().getValue()).doubleValue()); - } - return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.buildOrThrow()); - } - - // Builds a new least request config. - private static ImmutableMap newLeastRequestConfig(Cluster cluster) { - LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig(); - - ImmutableMap.Builder configBuilder = ImmutableMap.builder(); - if (lbConfig.hasChoiceCount()) { - configBuilder.put(CHOICE_COUNT_FIELD_NAME, - ((Integer) lbConfig.getChoiceCount().getValue()).doubleValue()); - } - return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.buildOrThrow()); - } -} diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java new file mode 100644 index 00000000000..79f34dda1da --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java @@ -0,0 +1,326 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import com.github.xds.type.v3.TypedStruct; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy; +import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash; +import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin; +import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality; +import io.grpc.InternalLogId; +import io.grpc.LoadBalancerRegistry; +import io.grpc.internal.JsonParser; +import io.grpc.xds.ClientXdsClient.ResourceInvalidException; +import io.grpc.xds.LoadBalancerConfigFactory.LoadBalancingPolicyConverter.MaxRecursionReachedException; +import io.grpc.xds.XdsLogger.XdsLogLevel; +import java.io.IOException; +import java.util.Map; + +/** + * Creates service config JSON load balancer config objects for a given xDS Cluster message. + * Supports both the "legacy" configuration style and the new, more advanced one that utilizes the + * xDS "typed extension" mechanism. + * + *

Legacy configuration is done by setting the lb_policy enum field and any supporting + * configuration fields needed by the particular policy. + * + *

The new approach is to set the load_balancing_policy field that contains both the policy + * selection as well as any supporting configuration data. Providing a list of acceptable policies + * is also supported. Note that if this field is used, it will override any configuration set using + * the legacy approach. The new configuration approach is explained in detail in the Custom LB Policies + * gRFC + */ +class LoadBalancerConfigFactory { + + private static XdsLogger logger = XdsLogger.withLogId( + InternalLogId.allocate("xds-client-lbconfig-factory", null)); + + static final String ROUND_ROBIN_FIELD_NAME = "round_robin"; + + static final String RING_HASH_FIELD_NAME = "ring_hash_experimental"; + static final String MIN_RING_SIZE_FIELD_NAME = "minRingSize"; + static final String MAX_RING_SIZE_FIELD_NAME = "maxRingSize"; + + static final String LEAST_REQUEST_FIELD_NAME = "least_request_experimental"; + static final String CHOICE_COUNT_FIELD_NAME = "choiceCount"; + + static final String WRR_LOCALITY_FIELD_NAME = "wrr_locality_experimental"; + static final String CHILD_POLICY_FIELD = "childPolicy"; + + static final String TYPE_URL_PREFIX_REGEX = "type\\.googleapis\\.com/"; + + /** + * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link + * Cluster}. + * + * @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration. + */ + static ImmutableMap newConfig(Cluster cluster, boolean enableLeastRequest) + throws ResourceInvalidException { + // The new load_balancing_policy will always be used if it is set, but for backward + // compatibility we will fall back to using the old lb_policy field if the new field is not set. + if (cluster.hasLoadBalancingPolicy()) { + try { + return LoadBalancingPolicyConverter.convertToServiceConfig(cluster.getLoadBalancingPolicy(), + 0); + } catch (MaxRecursionReachedException e) { + throw new ResourceInvalidException("Maximum LB config recursion depth reached"); + } + } else { + return LegacyLoadBalancingPolicyConverter.convertToServiceConfig(cluster, enableLeastRequest); + } + } + + /** + * Builds a service config JSON object for the ring_hash load balancer config based on the given + * config values. + */ + private static ImmutableMap buildRingHashConfig(Long minRingSize, Long maxRingSize) { + ImmutableMap.Builder configBuilder = ImmutableMap.builder(); + if (minRingSize != null) { + configBuilder.put(MIN_RING_SIZE_FIELD_NAME, minRingSize.doubleValue()); + } + if (maxRingSize != null) { + configBuilder.put(MAX_RING_SIZE_FIELD_NAME, maxRingSize.doubleValue()); + } + return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.build()); + } + + /** + * Builds a service config JSON object for the least_request load balancer config based on the + * given config values.. + */ + private static ImmutableMap buildLeastRequestConfig(Integer choiceCount) { + ImmutableMap.Builder configBuilder = ImmutableMap.builder(); + if (choiceCount != null) { + configBuilder.put(CHOICE_COUNT_FIELD_NAME, choiceCount.doubleValue()); + } + return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.build()); + } + + /** + * Builds a service config JSON wrr_locality by wrapping another policy config. + */ + private static ImmutableMap buildWrrLocalityConfig( + ImmutableMap childConfig) { + return ImmutableMap.builder().put(WRR_LOCALITY_FIELD_NAME, + ImmutableMap.of(CHILD_POLICY_FIELD, ImmutableList.of(childConfig))).build(); + } + + /** + * Builds an empty service config JSON config object for round robin (it is not configurable). + */ + private static ImmutableMap buildRoundRobinConfig() { + return ImmutableMap.of(ROUND_ROBIN_FIELD_NAME, ImmutableMap.of()); + } + + /** + * Responsible for converting from a {@code envoy.config.cluster.v3.LoadBalancingPolicy} proto + * message to a gRPC service config format. + */ + static class LoadBalancingPolicyConverter { + + private static final int MAX_RECURSION = 16; + + /** + * Converts a {@link LoadBalancingPolicy} object to a service config JSON object. + */ + private static ImmutableMap convertToServiceConfig( + LoadBalancingPolicy loadBalancingPolicy, int recursionDepth) + throws ResourceInvalidException { + if (recursionDepth > MAX_RECURSION) { + throw new MaxRecursionReachedException(); + } + ImmutableMap serviceConfig = null; + + for (Policy policy : loadBalancingPolicy.getPoliciesList()) { + Any typedConfig = policy.getTypedExtensionConfig().getTypedConfig(); + try { + if (typedConfig.is(RingHash.class)) { + serviceConfig = convertRingHashConfig(typedConfig); + } else if (typedConfig.is(WrrLocality.class)) { + serviceConfig = convertWrrLocalityConfig(typedConfig, recursionDepth); + } else if (typedConfig.is(RoundRobin.class)) { + serviceConfig = convertRoundRobinConfig(); + } else if (typedConfig.is(TypedStruct.class)) { + serviceConfig = convertCustomConfig(typedConfig); + } + // TODO: support least_request once it is added to the envoy protos. + } catch (InvalidProtocolBufferException e) { + logger.log(XdsLogLevel.WARNING, "Invalid Any protobuf for policy {0}: {1}", + typedConfig.getTypeUrl(), e.getMessage()); + continue; + } catch (ResourceInvalidException e) { + logger.log(XdsLogLevel.WARNING, "Invalid configuration for policy {0}: {1}", + typedConfig.getTypeUrl(), e.getMessage()); + continue; + } + // The service config is expected to have a single root entry, where the name of that entry + // is the name of the policy. A Load balancer with this name must exist in the registry. + if (LoadBalancerRegistry.getDefaultRegistry() + .getProvider(Iterables.getOnlyElement(serviceConfig.keySet())) == null) { + logger.log(XdsLogLevel.WARNING, "Policy {0} not found in the LB registry, skipping", + typedConfig.getTypeUrl()); + continue; + } else { + return serviceConfig; + } + } + + // If we could not find a Policy that we could both convert as well as find a provider for + // then we have an invalid LB policy configuration. + throw new ResourceInvalidException("Invalid LoadBalancingPolicy: " + loadBalancingPolicy); + } + + /** + * Converts a ring_hash {@link Any} configuration to service config format. + */ + private static ImmutableMap convertRingHashConfig(Any config) + throws InvalidProtocolBufferException, ResourceInvalidException { + RingHash ringHash = config.unpack(RingHash.class); + + // The hash function needs to be validated here as it is not exposed in the returned + // configuration for later validation. + if (RingHash.HashFunction.XX_HASH != ringHash.getHashFunction()) { + throw new ResourceInvalidException( + "Invalid ring hash function: " + ringHash.getHashFunction()); + } + + return buildRingHashConfig( + ringHash.hasMinimumRingSize() ? ringHash.getMinimumRingSize().getValue() : null, + ringHash.hasMaximumRingSize() ? ringHash.getMaximumRingSize().getValue() : null); + } + + /** + * Converts a wrr_locality {@link Any} configuration to service config format. + */ + private static ImmutableMap convertWrrLocalityConfig(Any config, int recursionDepth) + throws InvalidProtocolBufferException, ResourceInvalidException { + WrrLocality wrrLocality = config.unpack(WrrLocality.class); + return buildWrrLocalityConfig( + convertToServiceConfig(wrrLocality.getEndpointPickingPolicy(), recursionDepth + 1)); + } + + /** + * "Converts" a round_robin configuration to service config format. + */ + private static ImmutableMap convertRoundRobinConfig() { + return buildRoundRobinConfig(); + } + + /** + * Converts a custom LB config {@link Any} configuration to service config format. + */ + @SuppressWarnings("unchecked") + private static ImmutableMap convertCustomConfig(Any config) + throws InvalidProtocolBufferException, ResourceInvalidException { + TypedStruct configTypedStruct = config.unpack(TypedStruct.class); + Object rawJsonConfig = null; + try { + rawJsonConfig = JsonParser.parse(JsonFormat.printer().print(configTypedStruct.getValue())); + } catch (IOException e) { + throw new ResourceInvalidException("Unable to parse custom LB config JSON", e); + } + + if (!(rawJsonConfig instanceof Map)) { + throw new ResourceInvalidException("Custom LB config does not contain a JSON object"); + } + + return ImmutableMap.of( + configTypedStruct.getTypeUrl().replaceFirst(TYPE_URL_PREFIX_REGEX, ""), + (Map) rawJsonConfig); + } + + // Used to signal that the LB config goes too deep. + static class MaxRecursionReachedException extends RuntimeException { + static final long serialVersionUID = 1L; + } + } + + /** + * Builds a JSON LB configuration based on the old style of using the xDS Cluster proto message. + * The lb_policy field is used to select the policy and configuration is extracted from various + * policy specific fields in Cluster. + */ + static class LegacyLoadBalancingPolicyConverter { + + /** + * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link + * Cluster}. + * + * @throws ResourceInvalidException If the {@link Cluster} has an invalid LB configuration. + */ + static ImmutableMap convertToServiceConfig(Cluster cluster, + boolean enableLeastRequest) throws ResourceInvalidException { + switch (cluster.getLbPolicy()) { + case RING_HASH: + return convertRingHashConfig(cluster); + case ROUND_ROBIN: + return buildWrrLocalityConfig(buildRoundRobinConfig()); + case LEAST_REQUEST: + if (enableLeastRequest) { + return buildWrrLocalityConfig(convertLeastRequestConfig(cluster)); + } + break; + default: + } + throw new ResourceInvalidException( + "Cluster " + cluster.getName() + ": unsupported lb policy: " + cluster.getLbPolicy()); + } + + /** + * Creates a new ring_hash service config JSON object based on the old {@link RingHashLbConfig} + * config message. + */ + private static ImmutableMap convertRingHashConfig(Cluster cluster) + throws ResourceInvalidException { + RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); + + // The hash function needs to be validated here as it is not exposed in the returned + // configuration for later validation. + if (lbConfig.getHashFunction() != RingHashLbConfig.HashFunction.XX_HASH) { + throw new ResourceInvalidException( + "Cluster " + cluster.getName() + ": invalid ring hash function: " + lbConfig); + } + + return buildRingHashConfig( + lbConfig.hasMinimumRingSize() ? (Long) lbConfig.getMinimumRingSize().getValue() : null, + lbConfig.hasMaximumRingSize() ? (Long) lbConfig.getMaximumRingSize().getValue() : null); + } + + /** + * Creates a new least_request service config JSON object based on the old {@link + * LeastRequestLbConfig} config message. + */ + private static ImmutableMap convertLeastRequestConfig(Cluster cluster) { + LeastRequestLbConfig lbConfig = cluster.getLeastRequestLbConfig(); + return buildLeastRequestConfig( + lbConfig.hasChoiceCount() ? (Integer) lbConfig.getChoiceCount().getValue() : null); + } + } +} diff --git a/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java deleted file mode 100644 index a2809537a5a..00000000000 --- a/xds/src/test/java/io/grpc/xds/LegacyLoadBalancerConfigFactoryTest.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Copyright 2022 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.xds; - -import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.fail; - -import com.google.protobuf.UInt32Value; -import com.google.protobuf.UInt64Value; -import io.envoyproxy.envoy.config.cluster.v3.Cluster; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; -import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction; -import io.grpc.internal.JsonUtil; -import io.grpc.internal.ServiceConfigUtil; -import io.grpc.internal.ServiceConfigUtil.LbConfig; -import io.grpc.xds.ClientXdsClient.ResourceInvalidException; -import java.util.List; -import java.util.Map; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit test for {@link LegacyLoadBalancerConfigFactory}. - */ -@RunWith(JUnit4.class) -public class LegacyLoadBalancerConfigFactoryTest { - - @Test - public void roundRobin() throws ResourceInvalidException { - Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.ROUND_ROBIN).build(); - - LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( - LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); - assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - - @SuppressWarnings("unchecked") - List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); - assertThat(childConfigs).hasSize(1); - assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); - assertThat(childConfigs.get(0).getRawConfigValue()).isEmpty(); - } - - @Test - public void ringHash() throws ResourceInvalidException { - Cluster cluster = Cluster.newBuilder() - .setLbPolicy(LbPolicy.RING_HASH) - .setRingHashLbConfig( - RingHashLbConfig.newBuilder() - .setMinimumRingSize(UInt64Value.of(1)) - .setMaximumRingSize(UInt64Value.of(2))) - .build(); - - LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( - LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); - - assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental"); - assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo(1); - assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo(2); - } - - @Test - public void ringHash_invalidHash() { - Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig( - RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build(); - - try { - ServiceConfigUtil.unwrapLoadBalancingConfig( - LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); - } catch (ResourceInvalidException e) { - assertThat(e).hasMessageThat().contains("invalid ring hash function"); - return; - } - fail("ResourceInvalidException not thrown"); - } - - @Test - public void leastRequest() throws ResourceInvalidException { - System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "true"); - - Cluster cluster = Cluster.newBuilder() - .setLbPolicy(LbPolicy.LEAST_REQUEST) - .setLeastRequestLbConfig( - LeastRequestLbConfig.newBuilder().setChoiceCount(UInt32Value.of(10))) - .build(); - - LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( - LegacyLoadBalancerConfigFactory.newConfig(cluster, true)); - assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - - @SuppressWarnings("unchecked") - List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); - assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); - assertThat( - JsonUtil.getNumberAsLong(childConfigs.get(0).getRawConfigValue(), "choiceCount")).isEqualTo( - 10); - } - - - @Test - public void leastRequest_notEnabled() { - System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false"); - - Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build(); - - try { - ServiceConfigUtil.unwrapLoadBalancingConfig( - LegacyLoadBalancerConfigFactory.newConfig(cluster, false)); - } catch (ResourceInvalidException e) { - assertThat(e).hasMessageThat().contains("unsupported lb policy"); - return; - } - fail("ResourceInvalidException not thrown"); - } -} diff --git a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java new file mode 100644 index 00000000000..a455eb4b41d --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java @@ -0,0 +1,348 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.fail; + +import com.github.xds.type.v3.TypedStruct; +import com.google.protobuf.Any; +import com.google.protobuf.Struct; +import com.google.protobuf.UInt32Value; +import com.google.protobuf.UInt64Value; +import com.google.protobuf.Value; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.LeastRequestLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.RingHashLbConfig.HashFunction; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy; +import io.envoyproxy.envoy.config.cluster.v3.LoadBalancingPolicy.Policy; +import io.envoyproxy.envoy.config.core.v3.TypedExtensionConfig; +import io.envoyproxy.envoy.extensions.load_balancing_policies.ring_hash.v3.RingHash; +import io.envoyproxy.envoy.extensions.load_balancing_policies.round_robin.v3.RoundRobin; +import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality; +import io.grpc.LoadBalancer; +import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancerProvider; +import io.grpc.LoadBalancerRegistry; +import io.grpc.internal.JsonUtil; +import io.grpc.internal.ServiceConfigUtil; +import io.grpc.internal.ServiceConfigUtil.LbConfig; +import io.grpc.xds.ClientXdsClient.ResourceInvalidException; +import java.util.List; +import java.util.Map; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit test for {@link LoadBalancerConfigFactory}. + */ +@RunWith(JUnit4.class) +public class LoadBalancerConfigFactoryTest { + + private static final Policy ROUND_ROBIN_POLICY = Policy.newBuilder().setTypedExtensionConfig( + TypedExtensionConfig.newBuilder().setTypedConfig( + Any.pack(RoundRobin.newBuilder().build()))).build(); + + private static final long RING_HASH_MIN_RING_SIZE = 1; + private static final long RING_HASH_MAX_RING_SIZE = 2; + private static final Policy RING_HASH_POLICY = Policy.newBuilder().setTypedExtensionConfig( + TypedExtensionConfig.newBuilder().setTypedConfig(Any.pack( + RingHash.newBuilder() + .setMinimumRingSize(UInt64Value.of(RING_HASH_MIN_RING_SIZE)) + .setMaximumRingSize(UInt64Value.of(RING_HASH_MAX_RING_SIZE)) + .setHashFunction(RingHash.HashFunction.XX_HASH).build()))).build(); + + private static final String CUSTOM_POLICY_NAME = "myorg.MyCustomLeastRequestPolicy"; + private static final String CUSTOM_POLICY_FIELD_KEY = "choiceCount"; + private static final double CUSTOM_POLICY_FIELD_VALUE = 2; + private static final Policy CUSTOM_POLICY = Policy.newBuilder().setTypedExtensionConfig( + TypedExtensionConfig.newBuilder().setTypedConfig(Any.pack(TypedStruct.newBuilder() + .setTypeUrl("type.googleapis.com/" + CUSTOM_POLICY_NAME).setValue( + Struct.newBuilder() + .putFields(CUSTOM_POLICY_FIELD_KEY, + Value.newBuilder().setNumberValue(CUSTOM_POLICY_FIELD_VALUE).build())) + .build()))).build(); + private static final FakeCustomLoadBalancerProvider CUSTOM_POLICY_PROVIDER + = new FakeCustomLoadBalancerProvider(); + + private static Policy buildWrrPolicy(Policy childPolicy) { + return Policy.newBuilder().setTypedExtensionConfig(TypedExtensionConfig.newBuilder() + .setTypedConfig(Any.pack(WrrLocality.newBuilder() + .setEndpointPickingPolicy(LoadBalancingPolicy.newBuilder().addPolicies(childPolicy)) + .build()))).build(); + } + + @After + public void deregisterCustomProvider() { + LoadBalancerRegistry.getDefaultRegistry().deregister(CUSTOM_POLICY_PROVIDER); + } + + @Test + public void roundRobin() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy( + LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY))) + .build(); + + assertValidRoundRobin(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true))); + } + + @Test + public void roundRobin_legacy() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.ROUND_ROBIN).build(); + + assertValidRoundRobin(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true))); + } + + private void assertValidRoundRobin(LbConfig lbConfig) { + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs).hasSize(1); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); + assertThat(childConfigs.get(0).getRawConfigValue()).isEmpty(); + } + + @Test + public void ringHash() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder().addPolicies(RING_HASH_POLICY)) + .build(); + + assertValidRingHash(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true))); + } + + @Test + public void ringHash_legacy() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder() + .setLbPolicy(LbPolicy.RING_HASH) + .setRingHashLbConfig( + RingHashLbConfig.newBuilder() + .setMinimumRingSize(UInt64Value.of(RING_HASH_MIN_RING_SIZE)) + .setMaximumRingSize(UInt64Value.of(RING_HASH_MAX_RING_SIZE)) + .setHashFunction(HashFunction.XX_HASH)) + .build(); + + assertValidRingHash(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true))); + } + + private void assertValidRingHash(LbConfig lbConfig) { + assertThat(lbConfig.getPolicyName()).isEqualTo("ring_hash_experimental"); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "minRingSize")).isEqualTo( + RING_HASH_MIN_RING_SIZE); + assertThat(JsonUtil.getNumberAsLong(lbConfig.getRawConfigValue(), "maxRingSize")).isEqualTo( + RING_HASH_MAX_RING_SIZE); + } + + @Test + public void ringHash_invalidHash() { + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy(LoadBalancingPolicy.newBuilder() + .addPolicies(Policy.newBuilder().setTypedExtensionConfig( + TypedExtensionConfig.newBuilder().setTypedConfig(Any.pack( + RingHash.newBuilder() + .setMinimumRingSize(UInt64Value.of(RING_HASH_MIN_RING_SIZE)) + .setMaximumRingSize(UInt64Value.of(RING_HASH_MAX_RING_SIZE)) + .setHashFunction(RingHash.HashFunction.MURMUR_HASH_2).build()))).build())) + .build(); + + try { + ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true)); + } catch (ResourceInvalidException e) { + // With the new config mechanism we get a more generic error than with the old one because the + // logic loops over potentially multiple configurations and only throws an exception at the + // end if there was no valid policies found. + assertThat(e).hasMessageThat().contains("Invalid LoadBalancingPolicy"); + return; + } + fail("ResourceInvalidException not thrown"); + } + + @Test + public void ringHash_invalidHash_legacy() { + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.RING_HASH).setRingHashLbConfig( + RingHashLbConfig.newBuilder().setHashFunction(HashFunction.MURMUR_HASH_2)).build(); + + try { + ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true)); + } catch (ResourceInvalidException e) { + assertThat(e).hasMessageThat().contains("invalid ring hash function"); + return; + } + fail("ResourceInvalidException not thrown"); + } + + @Test + public void leastRequest() throws ResourceInvalidException { + System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "true"); + + Cluster cluster = Cluster.newBuilder() + .setLbPolicy(LbPolicy.LEAST_REQUEST) + .setLeastRequestLbConfig( + LeastRequestLbConfig.newBuilder().setChoiceCount(UInt32Value.of(10))) + .build(); + + LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, true)); + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); + assertThat( + JsonUtil.getNumberAsLong(childConfigs.get(0).getRawConfigValue(), "choiceCount")).isEqualTo( + 10); + } + + + @Test + public void leastRequest_notEnabled() { + System.setProperty("io.grpc.xds.experimentalEnableLeastRequest", "false"); + + Cluster cluster = Cluster.newBuilder().setLbPolicy(LbPolicy.LEAST_REQUEST).build(); + + try { + ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, false)); + } catch (ResourceInvalidException e) { + assertThat(e).hasMessageThat().contains("unsupported lb policy"); + return; + } + fail("ResourceInvalidException not thrown"); + } + + @Test + public void customConfiguration() throws ResourceInvalidException { + LoadBalancerRegistry.getDefaultRegistry().register(CUSTOM_POLICY_PROVIDER); + + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy( + LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(CUSTOM_POLICY))) + .build(); + + assertValidCustomConfig(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, false))); + } + + // When a provider for the custom policy is available, the configuration should use it. + @Test + public void complexCustomConfig_customProviderRegistered() throws ResourceInvalidException { + LoadBalancerRegistry.getDefaultRegistry().register(CUSTOM_POLICY_PROVIDER); + + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy( + LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(CUSTOM_POLICY)) + .addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY))) + .build(); + + assertValidCustomConfig(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, false))); + } + + // When a provider for the custom policy is NOT available, we fall back to the next available one. + @Test + public void complexCustomConfig_customProviderNotRegistered() throws ResourceInvalidException { + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy( + LoadBalancingPolicy.newBuilder().addPolicies(buildWrrPolicy(CUSTOM_POLICY)) + .addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY))) + .build(); + + assertValidRoundRobin(ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, false))); + } + + private void assertValidCustomConfig(LbConfig lbConfig) { + assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); + @SuppressWarnings("unchecked") + List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( + (List>) lbConfig.getRawConfigValue().get("childPolicy")); + assertThat(childConfigs).hasSize(1); + assertThat(childConfigs.get(0).getPolicyName()).isEqualTo(CUSTOM_POLICY_NAME); + assertThat(childConfigs.get(0).getRawConfigValue().get(CUSTOM_POLICY_FIELD_KEY)).isEqualTo( + CUSTOM_POLICY_FIELD_VALUE); + } + + @Test + public void maxRecursion() { + Cluster cluster = Cluster.newBuilder() + .setLoadBalancingPolicy( + LoadBalancingPolicy.newBuilder().addPolicies( + buildWrrPolicy( // Wheee... + buildWrrPolicy( // ...eee... + buildWrrPolicy( // ...eee! + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + buildWrrPolicy( + ROUND_ROBIN_POLICY))))))))))))))))))).build(); + + try { + LoadBalancerConfigFactory.newConfig(cluster, false); + } catch (ResourceInvalidException e) { + assertThat(e).hasMessageThat().contains("Maximum LB config recursion depth reached"); + return; + } + fail("Expected a ResourceInvalidException because of max recursion exceeded"); + } + + private static class FakeCustomLoadBalancerProvider extends LoadBalancerProvider { + + @Override + public LoadBalancer newLoadBalancer(Helper helper) { + return null; + } + + @Override + public boolean isAvailable() { + return true; + } + + @Override + public int getPriority() { + return 5; + } + + @Override + public String getPolicyName() { + return CUSTOM_POLICY_NAME; + } + } +} From 84bc3da51d49ebb7ba0c27eaffb7e7ccd7253311 Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Fri, 6 May 2022 09:22:47 -0700 Subject: [PATCH 3/6] Better localityWeights handling. --- .../grpc/xds/ClusterResolverLoadBalancer.java | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 6d2f232cf38..8640941cd98 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -152,7 +152,6 @@ private final class ClusterResolverLbState extends LoadBalancer { private final Helper helper; private final List clusters = new ArrayList<>(); private final Map clusterStates = new HashMap<>(); - private final Map localityWeights = new HashMap<>(); private PolicySelection endpointLbPolicy; private ResolvedAddresses resolvedAddresses; private LoadBalancer childLb; @@ -206,6 +205,8 @@ private void handleEndpointResourceUpdate() { List addresses = new ArrayList<>(); Map priorityChildConfigs = new HashMap<>(); List priorities = new ArrayList<>(); // totally ordered priority list + Map localityWeights = new HashMap<>(); + Status endpointNotFound = Status.OK; for (String cluster : clusters) { ClusterState state = clusterStates.get(cluster); @@ -217,6 +218,7 @@ private void handleEndpointResourceUpdate() { addresses.addAll(state.result.addresses); priorityChildConfigs.putAll(state.result.priorityChildConfigs); priorities.addAll(state.result.priorities); + localityWeights.putAll(state.result.localityWeights); } else { endpointNotFound = state.status; } @@ -248,7 +250,8 @@ private void handleEndpointResourceUpdate() { .setLoadBalancingPolicyConfig(childConfig) .setAddresses(Collections.unmodifiableList(addresses)) .setAttributes(resolvedAddresses.getAttributes().toBuilder() - .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, localityWeights).build()) + .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHTS, + Collections.unmodifiableMap(localityWeights)).build()) .build()); } @@ -318,6 +321,7 @@ private abstract class ClusterState { // Most recently resolved addresses and config, or null if resource not exists. @Nullable protected ClusterResolutionResult result; + protected boolean shutdown; private ClusterState(String name, @Nullable ServerInfo lrsServerInfo, @@ -377,6 +381,7 @@ public void run() { } Map localityLbEndpoints = update.localityLbEndpointsMap; + Map localityWeights = new HashMap<>(); List dropOverloads = update.dropPolicies; List addresses = new ArrayList<>(); Map> prioritizedLocalityWeights = new HashMap<>(); @@ -429,7 +434,8 @@ public void run() { endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads); status = Status.OK; resolved = true; - result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities); + result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities, + localityWeights); handleEndpointResourceUpdate(); } } @@ -635,18 +641,23 @@ private static class ClusterResolutionResult { private final Map priorityChildConfigs; // List of priority names ordered in descending priorities. private final List priorities; + // Most recent view on how localities in the cluster should be wighted. Only set for EDS + // clusters that support the concept. + private final Map localityWeights; ClusterResolutionResult(List addresses, String priority, PriorityChildConfig config) { this(addresses, Collections.singletonMap(priority, config), - Collections.singletonList(priority)); + Collections.singletonList(priority), Collections.emptyMap()); } ClusterResolutionResult(List addresses, - Map configs, List priorities) { + Map configs, List priorities, + Map localityWeights) { this.addresses = addresses; this.priorityChildConfigs = configs; this.priorities = priorities; + this.localityWeights = localityWeights; } } From 735956d3349ff5ef8c8183ae9fe3bde4055f13cc Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Fri, 6 May 2022 13:23:00 -0700 Subject: [PATCH 4/6] Review feedback. --- .../grpc/xds/LoadBalancerConfigFactory.java | 44 +++++++++---------- .../io/grpc/xds/ClientXdsClientDataTest.java | 4 +- 2 files changed, 24 insertions(+), 24 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java index 79f34dda1da..1118d90e958 100644 --- a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java +++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java @@ -72,8 +72,6 @@ class LoadBalancerConfigFactory { static final String WRR_LOCALITY_FIELD_NAME = "wrr_locality_experimental"; static final String CHILD_POLICY_FIELD = "childPolicy"; - static final String TYPE_URL_PREFIX_REGEX = "type\\.googleapis\\.com/"; - /** * Factory method for creating a new {link LoadBalancerConfigConverter} for a given xDS {@link * Cluster}. @@ -108,7 +106,7 @@ class LoadBalancerConfigFactory { if (maxRingSize != null) { configBuilder.put(MAX_RING_SIZE_FIELD_NAME, maxRingSize.doubleValue()); } - return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.build()); + return ImmutableMap.of(RING_HASH_FIELD_NAME, configBuilder.buildOrThrow()); } /** @@ -120,7 +118,7 @@ class LoadBalancerConfigFactory { if (choiceCount != null) { configBuilder.put(CHOICE_COUNT_FIELD_NAME, choiceCount.doubleValue()); } - return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.build()); + return ImmutableMap.of(LEAST_REQUEST_FIELD_NAME, configBuilder.buildOrThrow()); } /** @@ -129,7 +127,7 @@ class LoadBalancerConfigFactory { private static ImmutableMap buildWrrLocalityConfig( ImmutableMap childConfig) { return ImmutableMap.builder().put(WRR_LOCALITY_FIELD_NAME, - ImmutableMap.of(CHILD_POLICY_FIELD, ImmutableList.of(childConfig))).build(); + ImmutableMap.of(CHILD_POLICY_FIELD, ImmutableList.of(childConfig))).buildOrThrow(); } /** @@ -152,7 +150,7 @@ static class LoadBalancingPolicyConverter { */ private static ImmutableMap convertToServiceConfig( LoadBalancingPolicy loadBalancingPolicy, int recursionDepth) - throws ResourceInvalidException { + throws ResourceInvalidException, MaxRecursionReachedException { if (recursionDepth > MAX_RECURSION) { throw new MaxRecursionReachedException(); } @@ -162,13 +160,14 @@ static class LoadBalancingPolicyConverter { Any typedConfig = policy.getTypedExtensionConfig().getTypedConfig(); try { if (typedConfig.is(RingHash.class)) { - serviceConfig = convertRingHashConfig(typedConfig); + serviceConfig = convertRingHashConfig(typedConfig.unpack(RingHash.class)); } else if (typedConfig.is(WrrLocality.class)) { - serviceConfig = convertWrrLocalityConfig(typedConfig, recursionDepth); + serviceConfig = convertWrrLocalityConfig(typedConfig.unpack(WrrLocality.class), + recursionDepth); } else if (typedConfig.is(RoundRobin.class)) { serviceConfig = convertRoundRobinConfig(); } else if (typedConfig.is(TypedStruct.class)) { - serviceConfig = convertCustomConfig(typedConfig); + serviceConfig = convertCustomConfig(typedConfig.unpack(TypedStruct.class)); } // TODO: support least_request once it is added to the envoy protos. } catch (InvalidProtocolBufferException e) { @@ -182,7 +181,7 @@ static class LoadBalancingPolicyConverter { } // The service config is expected to have a single root entry, where the name of that entry // is the name of the policy. A Load balancer with this name must exist in the registry. - if (LoadBalancerRegistry.getDefaultRegistry() + if (serviceConfig != null && LoadBalancerRegistry.getDefaultRegistry() .getProvider(Iterables.getOnlyElement(serviceConfig.keySet())) == null) { logger.log(XdsLogLevel.WARNING, "Policy {0} not found in the LB registry, skipping", typedConfig.getTypeUrl()); @@ -200,10 +199,8 @@ static class LoadBalancingPolicyConverter { /** * Converts a ring_hash {@link Any} configuration to service config format. */ - private static ImmutableMap convertRingHashConfig(Any config) + private static ImmutableMap convertRingHashConfig(RingHash ringHash) throws InvalidProtocolBufferException, ResourceInvalidException { - RingHash ringHash = config.unpack(RingHash.class); - // The hash function needs to be validated here as it is not exposed in the returned // configuration for later validation. if (RingHash.HashFunction.XX_HASH != ringHash.getHashFunction()) { @@ -219,9 +216,9 @@ static class LoadBalancingPolicyConverter { /** * Converts a wrr_locality {@link Any} configuration to service config format. */ - private static ImmutableMap convertWrrLocalityConfig(Any config, int recursionDepth) - throws InvalidProtocolBufferException, ResourceInvalidException { - WrrLocality wrrLocality = config.unpack(WrrLocality.class); + private static ImmutableMap convertWrrLocalityConfig(WrrLocality wrrLocality, + int recursionDepth) throws InvalidProtocolBufferException, ResourceInvalidException, + MaxRecursionReachedException { return buildWrrLocalityConfig( convertToServiceConfig(wrrLocality.getEndpointPickingPolicy(), recursionDepth + 1)); } @@ -237,9 +234,8 @@ static class LoadBalancingPolicyConverter { * Converts a custom LB config {@link Any} configuration to service config format. */ @SuppressWarnings("unchecked") - private static ImmutableMap convertCustomConfig(Any config) + private static ImmutableMap convertCustomConfig(TypedStruct configTypedStruct) throws InvalidProtocolBufferException, ResourceInvalidException { - TypedStruct configTypedStruct = config.unpack(TypedStruct.class); Object rawJsonConfig = null; try { rawJsonConfig = JsonParser.parse(JsonFormat.printer().print(configTypedStruct.getValue())); @@ -251,13 +247,17 @@ static class LoadBalancingPolicyConverter { throw new ResourceInvalidException("Custom LB config does not contain a JSON object"); } - return ImmutableMap.of( - configTypedStruct.getTypeUrl().replaceFirst(TYPE_URL_PREFIX_REGEX, ""), - (Map) rawJsonConfig); + String customConfigTypeName = configTypedStruct.getTypeUrl(); + if (customConfigTypeName.contains("/")) { + customConfigTypeName = customConfigTypeName.substring( + customConfigTypeName.lastIndexOf("/") + 1); + } + + return ImmutableMap.of(customConfigTypeName, (Map) rawJsonConfig); } // Used to signal that the LB config goes too deep. - static class MaxRecursionReachedException extends RuntimeException { + static class MaxRecursionReachedException extends Exception { static final long serialVersionUID = 1L; } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java index 3ee84721dc4..07da755f057 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientDataTest.java @@ -105,6 +105,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancerRegistry; import io.grpc.Status.Code; +import io.grpc.internal.JsonUtil; import io.grpc.internal.ServiceConfigUtil; import io.grpc.internal.ServiceConfigUtil.LbConfig; import io.grpc.lookup.v1.GrpcKeyBuilder; @@ -1761,9 +1762,8 @@ public void parseCluster_leastRequestLbPolicy_defaultLbConfig() throws ResourceI LoadBalancerRegistry.getDefaultRegistry()); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(update.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); } From 54b007e665a5b40f766fe70c7557d0f5bfd0da8a Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Fri, 6 May 2022 14:12:12 -0700 Subject: [PATCH 5/6] Fail the whole conversion process on the first conversion failure. --- .../io/grpc/xds/LoadBalancerConfigFactory.java | 11 +++-------- .../grpc/xds/LoadBalancerConfigFactoryTest.java | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java index 1118d90e958..2f2ad34218b 100644 --- a/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java +++ b/xds/src/main/java/io/grpc/xds/LoadBalancerConfigFactory.java @@ -171,17 +171,12 @@ static class LoadBalancingPolicyConverter { } // TODO: support least_request once it is added to the envoy protos. } catch (InvalidProtocolBufferException e) { - logger.log(XdsLogLevel.WARNING, "Invalid Any protobuf for policy {0}: {1}", - typedConfig.getTypeUrl(), e.getMessage()); - continue; - } catch (ResourceInvalidException e) { - logger.log(XdsLogLevel.WARNING, "Invalid configuration for policy {0}: {1}", - typedConfig.getTypeUrl(), e.getMessage()); - continue; + throw new ResourceInvalidException( + "Unable to unpack typedConfig for: " + typedConfig.getTypeUrl(), e); } // The service config is expected to have a single root entry, where the name of that entry // is the name of the policy. A Load balancer with this name must exist in the registry. - if (serviceConfig != null && LoadBalancerRegistry.getDefaultRegistry() + if (serviceConfig == null || LoadBalancerRegistry.getDefaultRegistry() .getProvider(Iterables.getOnlyElement(serviceConfig.keySet())) == null) { logger.log(XdsLogLevel.WARNING, "Policy {0} not found in the LB registry, skipping", typedConfig.getTypeUrl()); diff --git a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java index a455eb4b41d..94e4d259d11 100644 --- a/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadBalancerConfigFactoryTest.java @@ -177,7 +177,7 @@ public void ringHash_invalidHash() { // With the new config mechanism we get a more generic error than with the old one because the // logic loops over potentially multiple configurations and only throws an exception at the // end if there was no valid policies found. - assertThat(e).hasMessageThat().contains("Invalid LoadBalancingPolicy"); + assertThat(e).hasMessageThat().contains("Invalid ring hash function"); return; } fail("ResourceInvalidException not thrown"); @@ -266,7 +266,9 @@ public void complexCustomConfig_customProviderRegistered() throws ResourceInvali LoadBalancerConfigFactory.newConfig(cluster, false))); } - // When a provider for the custom policy is NOT available, we fall back to the next available one. + // When a provider for the custom policy is NOT available, we still fail even if there is another + // round_robin configuration in the list as the wrr_locality the custom config is wrapped in is + // a recognized type and expected to have a valid config. @Test public void complexCustomConfig_customProviderNotRegistered() throws ResourceInvalidException { Cluster cluster = Cluster.newBuilder() @@ -275,8 +277,14 @@ public void complexCustomConfig_customProviderNotRegistered() throws ResourceInv .addPolicies(buildWrrPolicy(ROUND_ROBIN_POLICY))) .build(); - assertValidRoundRobin(ServiceConfigUtil.unwrapLoadBalancingConfig( - LoadBalancerConfigFactory.newConfig(cluster, false))); + try { + ServiceConfigUtil.unwrapLoadBalancingConfig( + LoadBalancerConfigFactory.newConfig(cluster, false)); + } catch (ResourceInvalidException e) { + assertThat(e).hasMessageThat().contains("Invalid LoadBalancingPolicy"); + return; + } + fail("ResourceInvalidException not thrown"); } private void assertValidCustomConfig(LbConfig lbConfig) { From fbfb33e46a7422233c1a41b3d92ffaf629d95df8 Mon Sep 17 00:00:00 2001 From: Terry Wilson Date: Fri, 6 May 2022 14:22:49 -0700 Subject: [PATCH 6/6] Remove unnecessary casts. --- .../io/grpc/xds/ClientXdsClientTestBase.java | 33 +++++++------------ 1 file changed, 12 insertions(+), 21 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 12db1d233e6..86382de9a21 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -1630,9 +1630,8 @@ public void cdsResourceFound() { assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -1657,9 +1656,8 @@ public void wrappedCdsResource() { assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -1689,9 +1687,8 @@ public void cdsResourceFound_leastRequestLbPolicy() { assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("least_request_experimental"); assertThat(childConfigs.get(0).getRawConfigValue().get("choiceCount")).isEqualTo(3); assertThat(cdsUpdate.lrsServerInfo()).isNull(); @@ -1750,9 +1747,8 @@ public void cdsResponseWithAggregateCluster() { assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT); @@ -1776,9 +1772,8 @@ public void cdsResponseWithCircuitBreakers() { assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); @@ -1925,9 +1920,8 @@ public void cachedCdsResource_data() { assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -1953,7 +1947,6 @@ public void cachedCdsResource_absent() { } @Test - @SuppressWarnings("unchecked") public void cdsResourceUpdated() { DiscoveryRpcCall call = startResourceWatcher(CDS, CDS_RESOURCE, cdsResourceWatcher); verifyResourceMetadataRequested(CDS, CDS_RESOURCE); @@ -1974,7 +1967,7 @@ public void cdsResourceUpdated() { LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -1997,7 +1990,7 @@ public void cdsResourceUpdated() { lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -2068,9 +2061,8 @@ public void cdsResourceDeleted() { assertThat(cdsUpdate.edsServiceName()).isNull(); LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); - @SuppressWarnings("unchecked") List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -2088,7 +2080,6 @@ public void cdsResourceDeleted() { } @Test - @SuppressWarnings("unchecked") public void multipleCdsWatchers() { String cdsResourceTwo = "cluster-bar.googleapis.com"; CdsResourceWatcher watcher1 = mock(CdsResourceWatcher.class); @@ -2127,7 +2118,7 @@ public void multipleCdsWatchers() { LbConfig lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); List childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -2140,7 +2131,7 @@ public void multipleCdsWatchers() { lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); @@ -2153,7 +2144,7 @@ public void multipleCdsWatchers() { lbConfig = ServiceConfigUtil.unwrapLoadBalancingConfig(cdsUpdate.lbPolicyConfig()); assertThat(lbConfig.getPolicyName()).isEqualTo("wrr_locality_experimental"); childConfigs = ServiceConfigUtil.unwrapLoadBalancingConfigList( - (List>) lbConfig.getRawConfigValue().get("childPolicy")); + JsonUtil.getListOfObjects(lbConfig.getRawConfigValue(), "childPolicy")); assertThat(childConfigs.get(0).getPolicyName()).isEqualTo("round_robin"); assertThat(cdsUpdate.lrsServerInfo()).isEqualTo(lrsServerInfo); assertThat(cdsUpdate.maxConcurrentRequests()).isNull();