From b4fe07d22df6e14f8883dbcae082b1433f1c5643 Mon Sep 17 00:00:00 2001 From: Chengyuan Zhang Date: Fri, 16 Apr 2021 12:46:55 -0700 Subject: [PATCH] xds: support ring_hash as the endpoint-level LB policy (#7991) Update LB policy config generation to support ring hash policy as the endpoint-level LB policy. - Changed the CDS LB policy to accept RING_HASH as the endpoint LB policy from CDS updates. This configuration is directly passed to its child policy (aka, ClusterResolverLoadBalancer) in its config. - Changed ClusterResolverLoadBalancer to generate different LB configs for its downstream LB policies, depending on the endpoint-level LB policies. - If the endpoint-level LB policy is ROUND_ROBIN, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> WeightedTargetLB -> RoundRobinLB - If the endpoin-level LB policy is RNIG_HASH, the downstream LB policy hierarchy is: PriorityLB -> ClusterImplLB -> RingHashLB. --- .../java/io/grpc/xds/CdsLoadBalancer2.java | 19 +- .../java/io/grpc/xds/ClientXdsClient.java | 9 +- .../grpc/xds/ClusterResolverLoadBalancer.java | 102 +++--- .../ClusterResolverLoadBalancerProvider.java | 18 +- xds/src/main/java/io/grpc/xds/XdsClient.java | 29 +- .../io/grpc/xds/CdsLoadBalancer2Test.java | 182 ++++++----- .../io/grpc/xds/ClientXdsClientTestBase.java | 23 +- .../xds/ClusterResolverLoadBalancerTest.java | 294 ++++++++++++------ 8 files changed, 420 insertions(+), 256 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index 7ee600998d2..d0286b268de 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -32,9 +32,11 @@ import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; +import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; +import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy; import io.grpc.xds.XdsLogger.XdsLogLevel; import io.grpc.xds.XdsSubchannelPickers.ErrorPicker; import java.util.ArrayDeque; @@ -181,15 +183,16 @@ private void handleClusterDiscovered() { helper.updateBalancingState(TRANSIENT_FAILURE, new ErrorPicker(unavailable)); return; } - String endpointPickingPolicy = root.result.lbPolicy(); - LoadBalancerProvider localityPickingLbProvider = - lbRegistry.getProvider(XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME); // hardcoded - LoadBalancerProvider endpointPickingLbProvider = - lbRegistry.getProvider(endpointPickingPolicy); + LoadBalancerProvider lbProvider = null; + Object lbConfig = null; + if (root.result.lbPolicy() == LbPolicy.RING_HASH) { + lbProvider = lbRegistry.getProvider("ring_hash"); + lbConfig = new RingHashConfig(root.result.minRingSize(), root.result.maxRingSize()); + } else { + lbProvider = lbRegistry.getProvider("round_robin"); + } ClusterResolverConfig config = new ClusterResolverConfig( - Collections.unmodifiableList(instances), - new PolicySelection(localityPickingLbProvider, null /* by cluster_resolver LB policy */), - new PolicySelection(endpointPickingLbProvider, null)); + Collections.unmodifiableList(instances), new PolicySelection(lbProvider, lbConfig)); if (childLb == null) { childLb = lbRegistry.getProvider(CLUSTER_RESOLVER_POLICY_NAME).newLoadBalancer(helper); } diff --git a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java index 3b8e89cf5ba..77678f49367 100644 --- a/xds/src/main/java/io/grpc/xds/ClientXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/ClientXdsClient.java @@ -22,7 +22,6 @@ import com.github.udpa.udpa.type.v1.TypedStruct; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.CaseFormat; import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; import com.google.common.base.Strings; @@ -832,8 +831,6 @@ private static CdsUpdate processCluster(Cluster cluster, Set retainedEds } CdsUpdate.Builder updateBuilder = structOrError.getStruct(); - String lbPolicy = CaseFormat.UPPER_UNDERSCORE.to( - CaseFormat.LOWER_UNDERSCORE, cluster.getLbPolicy().name()); if (cluster.getLbPolicy() == LbPolicy.RING_HASH) { RingHashLbConfig lbConfig = cluster.getRingHashLbConfig(); @@ -841,10 +838,10 @@ private static CdsUpdate processCluster(Cluster cluster, Set retainedEds throw new ResourceInvalidException( "Unsupported ring hash function: " + lbConfig.getHashFunction()); } - updateBuilder.lbPolicy(lbPolicy, lbConfig.getMinimumRingSize().getValue(), - lbConfig.getMaximumRingSize().getValue()); + updateBuilder.lbPolicy(CdsUpdate.LbPolicy.RING_HASH, + lbConfig.getMinimumRingSize().getValue(), lbConfig.getMaximumRingSize().getValue()); } else if (cluster.getLbPolicy() == LbPolicy.ROUND_ROBIN) { - updateBuilder.lbPolicy(lbPolicy); + updateBuilder.lbPolicy(CdsUpdate.LbPolicy.ROUND_ROBIN); } else { throw new ResourceInvalidException("Unsupported lb policy: " + cluster.getLbPolicy()); } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 37dc4e741a8..f29d9321efc 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -20,6 +20,7 @@ import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; +import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import static io.grpc.xds.XdsSubchannelPickers.BUFFER_PICKER; import com.google.common.annotations.VisibleForTesting; @@ -76,7 +77,8 @@ * used in the downstream LB policies for fine-grained load balancing purposes. */ final class ClusterResolverLoadBalancer extends LoadBalancer { - + // DNS-resolved endpoints do not have the definition of the locality it belongs to, just hardcode + // to an empty locality. private static final Locality LOGICAL_DNS_CLUSTER_LOCALITY = Locality.create("", "", ""); private final XdsLogger logger; private final String authority; @@ -156,12 +158,7 @@ private final class ClusterResolverLbState extends LoadBalancer { private final Helper helper; private final List clusters = new ArrayList<>(); private final Map clusterStates = new HashMap<>(); - // An aggregate cluster is thought of as a cluster that groups the endpoints of the underlying - // clusters together for load balancing purposes only. Load balancing policies (both locality - // level and endpoint level) are configured by the aggregate cluster and apply to all of its - // underlying clusters. - private PolicySelection localityPickingPolicy; - private PolicySelection endpointPickingPolicy; + private PolicySelection endpointLbPolicy; private ResolvedAddresses resolvedAddresses; private LoadBalancer childLb; @@ -175,20 +172,18 @@ public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) { this.resolvedAddresses = resolvedAddresses; ClusterResolverConfig config = (ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); - localityPickingPolicy = config.localityPickingPolicy; - endpointPickingPolicy = config.endpointPickingPolicy; + endpointLbPolicy = config.lbPolicy; for (DiscoveryMechanism instance : config.discoveryMechanisms) { clusters.add(instance.cluster); ClusterState state; if (instance.type == DiscoveryMechanism.Type.EDS) { state = new EdsClusterState(instance.cluster, instance.edsServiceName, instance.lrsServerName, instance.maxConcurrentRequests, instance.tlsContext); - clusterStates.put(instance.cluster, state); } else { // logical DNS state = new LogicalDnsClusterState(instance.cluster, instance.lrsServerName, instance.maxConcurrentRequests, instance.tlsContext); - clusterStates.put(instance.cluster, state); } + clusterStates.put(instance.cluster, state); state.start(); } } @@ -392,8 +387,11 @@ public void run() { for (LbEndpoint endpoint : localityLbInfo.endpoints()) { if (endpoint.isHealthy()) { discard = false; + long weight = + (long) localityLbInfo.localityWeight() * endpoint.loadBalancingWeight(); Attributes attr = endpoint.eag().getAttributes().toBuilder() - .set(InternalXdsAttributes.ATTR_LOCALITY, locality).build(); + .set(InternalXdsAttributes.ATTR_LOCALITY, locality) + .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight).build(); EquivalentAddressGroup eag = new EquivalentAddressGroup(endpoint.eag().getAddresses(), attr); eag = AddressFilter.setPathFilter( @@ -419,10 +417,10 @@ public void run() { } List priorities = new ArrayList<>(prioritizedLocalityWeights.keySet()); Collections.sort(priorities); - Map priorityChildConfigs = generatePriorityChildConfigs( - name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext, - localityPickingPolicy, endpointPickingPolicy, true, lbRegistry, - prioritizedLocalityWeights, dropOverloads); + Map priorityChildConfigs = + generateEdsBasedPriorityChildConfigs( + name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext, + endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities); @@ -532,9 +530,12 @@ public void run() { return; } backoffPolicy = null; // reset backoff sequence if succeeded + // Arbitrary priority notation for all DNS-resolved endpoints. String priorityName = priorityName(name, 0); // value doesn't matter List addresses = new ArrayList<>(); for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { + // No weight attribute is attached, all endpoint-level LB policy should be able + // to handle such it. Attributes attr = eag.getAttributes().toBuilder().set( InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build(); eag = new EquivalentAddressGroup(eag.getAddresses(), attr); @@ -542,12 +543,9 @@ public void run() { eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString())); addresses.add(eag); } - PolicySelection endpointPickingPolicy = - new PolicySelection(lbRegistry.getProvider("pick_first"), null); - PriorityChildConfig priorityChildConfig = generatePriorityChildConfig( + PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( name, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext, - endpointPickingPolicy, false, lbRegistry, - Collections.emptyList()); + lbRegistry, Collections.emptyList()); status = Status.OK; resolved = true; result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig); @@ -614,58 +612,74 @@ private static class ClusterResolutionResult { } /** - * Generates the config to be used in the priority LB policy for a single priority. + * Generates the config to be used in the priority LB policy for the single priority of + * logical DNS cluster. * - *

priority LB -> cluster_impl LB -> pick_first + *

priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first */ - private static PriorityChildConfig generatePriorityChildConfig( + private static PriorityChildConfig generateDnsBasedPriorityChildConfig( String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - PolicySelection endpointPickingPolicy, boolean ignoreReresolution, LoadBalancerRegistry lbRegistry, List dropOverloads) { + // Override endpoint-level LB policy with pick_first for logical DNS cluster. + PolicySelection endpointLbPolicy = + new PolicySelection(lbRegistry.getProvider("pick_first"), null); ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests, - dropOverloads, endpointPickingPolicy, tlsContext); + dropOverloads, endpointLbPolicy, tlsContext); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); PolicySelection clusterImplPolicy = new PolicySelection(clusterImplLbProvider, clusterImplConfig); - return new PriorityChildConfig(clusterImplPolicy, ignoreReresolution); + return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/); } /** - * Generates configs to be used in the priority LB policy for priorities in the cluster. + * Generates configs to be used in the priority LB policy for priorities in an EDS cluster. * - *

priority LB -> cluster_impl LB (one per priority) -> weighted_target LB - * -> round_robin (one per locality)) + *

priority LB -> cluster_impl LB (one per priority) -> (weighted_target LB + * -> round_robin (one per locality)) / ring_hash */ - private static Map generatePriorityChildConfigs( + private static Map generateEdsBasedPriorityChildConfigs( String cluster, @Nullable String edsServiceName, @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext, - PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy, - boolean ignoreReresolution, LoadBalancerRegistry lbRegistry, + PolicySelection endpointLbPolicy, LoadBalancerRegistry lbRegistry, Map> prioritizedLocalityWeights, List dropOverloads) { Map configs = new HashMap<>(); for (String priority : prioritizedLocalityWeights.keySet()) { - Map localityWeights = prioritizedLocalityWeights.get(priority); - Map targets = new HashMap<>(); - for (Locality locality : localityWeights.keySet()) { - int weight = localityWeights.get(locality); - targets.put(localityName(locality), - new WeightedPolicySelection(weight, endpointPickingPolicy)); + PolicySelection leafPolicy = endpointLbPolicy; + // Depending on the endpoint-level load balancing policy, different LB hierarchy may be + // created. If the endpoint-level LB policy is round_robin, it creates a two-level LB + // hierarchy: a locality-level LB policy that balances load according to locality weights + // followed by an endpoint-level LB policy that simply rounds robin the endpoints within + // the locality. If the endpoint-level LB policy is ring_hash, it creates a unified LB + // policy that balances load by weighing the product of each endpoint's weight and the + // weight of the locality it belongs to. + if (endpointLbPolicy.getProvider().getPolicyName().equals("round_robin")) { + Map localityWeights = prioritizedLocalityWeights.get(priority); + Map targets = new HashMap<>(); + for (Locality locality : localityWeights.keySet()) { + int weight = localityWeights.get(locality); + WeightedPolicySelection target = new WeightedPolicySelection(weight, endpointLbPolicy); + targets.put(localityName(locality), target); + } + LoadBalancerProvider weightedTargetLbProvider = + lbRegistry.getProvider(WEIGHTED_TARGET_POLICY_NAME); + WeightedTargetConfig weightedTargetConfig = + new WeightedTargetConfig(Collections.unmodifiableMap(targets)); + leafPolicy = new PolicySelection(weightedTargetLbProvider, weightedTargetConfig); } - PolicySelection localityPicking = new PolicySelection( - localityPickingPolicy.getProvider(), - new WeightedTargetConfig(Collections.unmodifiableMap(targets))); ClusterImplConfig clusterImplConfig = new ClusterImplConfig(cluster, edsServiceName, lrsServerName, maxConcurrentRequests, - dropOverloads, localityPicking, tlsContext); + dropOverloads, leafPolicy, tlsContext); LoadBalancerProvider clusterImplLbProvider = lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME); PolicySelection clusterImplPolicy = new PolicySelection(clusterImplLbProvider, clusterImplConfig); - configs.put(priority, new PriorityChildConfig(clusterImplPolicy, ignoreReresolution)); + PriorityChildConfig priorityChildConfig = + new PriorityChildConfig(clusterImplPolicy, true /* ignoreReresolution */); + configs.put(priority, priorityChildConfig); } return configs; } diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java index 9442e88e12a..e62c70cb5ec 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancerProvider.java @@ -67,19 +67,17 @@ public LoadBalancer newLoadBalancer(Helper helper) { static final class ClusterResolverConfig { // Ordered list of clusters to be resolved. final List discoveryMechanisms; - final PolicySelection localityPickingPolicy; - final PolicySelection endpointPickingPolicy; + // Endpoint-level load balancing policy with config (round_robin or ring_hash). + final PolicySelection lbPolicy; - ClusterResolverConfig(List discoveryMechanisms, - PolicySelection localityPickingPolicy, PolicySelection endpointPickingPolicy) { + ClusterResolverConfig(List discoveryMechanisms, PolicySelection lbPolicy) { this.discoveryMechanisms = checkNotNull(discoveryMechanisms, "discoveryMechanisms"); - this.localityPickingPolicy = checkNotNull(localityPickingPolicy, "localityPickingPolicy"); - this.endpointPickingPolicy = checkNotNull(endpointPickingPolicy, "endpointPickingPolicy"); + this.lbPolicy = checkNotNull(lbPolicy, "lbPolicy"); } @Override public int hashCode() { - return Objects.hash(discoveryMechanisms, localityPickingPolicy, endpointPickingPolicy); + return Objects.hash(discoveryMechanisms, lbPolicy); } @Override @@ -92,16 +90,14 @@ public boolean equals(Object o) { } ClusterResolverConfig that = (ClusterResolverConfig) o; return discoveryMechanisms.equals(that.discoveryMechanisms) - && localityPickingPolicy.equals(that.localityPickingPolicy) - && endpointPickingPolicy.equals(that.endpointPickingPolicy); + && lbPolicy.equals(that.lbPolicy); } @Override public String toString() { return MoreObjects.toStringHelper(this) .add("discoveryMechanisms", discoveryMechanisms) - .add("localityPickingPolicy", localityPickingPolicy) - .add("endpointPickingPolicy", endpointPickingPolicy) + .add("lbPolicy", lbPolicy) .toString(); } diff --git a/xds/src/main/java/io/grpc/xds/XdsClient.java b/xds/src/main/java/io/grpc/xds/XdsClient.java index 73c57e7a023..37c07eb1bcf 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClient.java +++ b/xds/src/main/java/io/grpc/xds/XdsClient.java @@ -178,7 +178,7 @@ abstract static class CdsUpdate implements ResourceUpdate { abstract ClusterType clusterType(); // Endpoint-level load balancing policy. - abstract String lbPolicy(); + abstract LbPolicy lbPolicy(); // Only valid if lbPolicy is "ring_hash". abstract long minRingSize(); @@ -251,6 +251,10 @@ enum ClusterType { EDS, LOGICAL_DNS, AGGREGATE } + enum LbPolicy { + ROUND_ROBIN, RING_HASH + } + // FIXME(chengyuanzhang): delete this after UpstreamTlsContext's toString() is fixed. @Override public final String toString() { @@ -270,38 +274,37 @@ public final String toString() { @AutoValue.Builder abstract static class Builder { - // Private do not use. + // Private, use one of the static factory methods instead. protected abstract Builder clusterName(String clusterName); - // Private do not use. + // Private, use one of the static factory methods instead. protected abstract Builder clusterType(ClusterType clusterType); - // Private do not use. - protected abstract Builder lbPolicy(String lbPolicy); + abstract Builder lbPolicy(LbPolicy lbPolicy); - Builder lbPolicy(String lbPolicy, long minRingSize, long maxRingSize) { + Builder lbPolicy(LbPolicy lbPolicy, long minRingSize, long maxRingSize) { return this.lbPolicy(lbPolicy).minRingSize(minRingSize).maxRingSize(maxRingSize); } - // Private do not use. + // Private, use lbPolicy(LbPolicy, long, long). protected abstract Builder minRingSize(long minRingSize); - // Private do not use. + // Private, use lbPolicy(.LbPolicy, long, long) protected abstract Builder maxRingSize(long maxRingSize); - // Private do not use. + // Private, use CdsUpdate.forEds() instead. protected abstract Builder edsServiceName(String edsServiceName); - // Private do not use. + // Private, use one of the static factory methods instead. protected abstract Builder lrsServerName(String lrsServerName); - // Private do not use. + // Private, use one of the static factory methods instead. protected abstract Builder maxConcurrentRequests(Long maxConcurrentRequests); - // Private do not use. + // Private, use one of the static factory methods instead. protected abstract Builder upstreamTlsContext(UpstreamTlsContext upstreamTlsContext); - // Private do not use. + // Private, use CdsUpdate.forAggregate() instead. protected abstract Builder prioritizedClusterNames(List prioritizedClusterNames); abstract CdsUpdate build(); diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index 9a3aa2934d6..2287b8fc1b2 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -18,7 +18,6 @@ import static com.google.common.truth.Truth.assertThat; import static io.grpc.xds.XdsLbPolicies.CLUSTER_RESOLVER_POLICY_NAME; -import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -47,6 +46,9 @@ import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig; import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism; import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; +import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; +import io.grpc.xds.XdsClient.CdsUpdate; +import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; import java.util.ArrayList; import java.util.Arrays; @@ -118,8 +120,8 @@ public void setUp() { when(helper.getSynchronizationContext()).thenReturn(syncContext); lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_RESOLVER_POLICY_NAME)); - lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider("round_robin")); + lbRegistry.register(new FakeLoadBalancerProvider("ring_hash")); loadBalancer = new CdsLoadBalancer2(helper, lbRegistry); loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() @@ -144,8 +146,10 @@ public void tearDown() { @Test public void discoverTopLevelEdsCluster() { - xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, - upstreamTlsContext); + CdsUpdate update = + CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); @@ -154,15 +158,15 @@ public void discoverTopLevelEdsCluster() { DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext); - assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName()) - .isEqualTo(WEIGHTED_TARGET_POLICY_NAME); - assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName()) - .isEqualTo("round_robin"); + assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin"); } @Test public void discoverTopLevelLogicalDnsCluster() { - xdsClient.deliverLogicalDnsCluster(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext); + CdsUpdate update = + CdsUpdate.forLogicalDns(CLUSTER, LRS_SERVER_NAME, 100L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); @@ -171,10 +175,7 @@ public void discoverTopLevelLogicalDnsCluster() { DiscoveryMechanism instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.LOGICAL_DNS, null, LRS_SERVER_NAME, 100L, upstreamTlsContext); - assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName()) - .isEqualTo(WEIGHTED_TARGET_POLICY_NAME); - assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName()) - .isEqualTo("round_robin"); + assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()).isEqualTo("round_robin"); } @Test @@ -189,7 +190,10 @@ public void nonAggregateCluster_resourceNotExist_returnErrorPicker() { @Test public void nonAggregateCluster_resourceUpdate() { - xdsClient.deliverEdsCluster(CLUSTER, null, null, 100L, upstreamTlsContext); + CdsUpdate update = + CdsUpdate.forEds(CLUSTER, null, null, 100L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; @@ -197,7 +201,9 @@ public void nonAggregateCluster_resourceUpdate() { assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, null, null, 100L, upstreamTlsContext); - xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null); + update = CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, null) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); childLbConfig = (ClusterResolverConfig) childBalancer.config; instance = Iterables.getOnlyElement(childLbConfig.discoveryMechanisms); assertDiscoveryMechanism(instance, CLUSTER, DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, @@ -206,7 +212,10 @@ public void nonAggregateCluster_resourceUpdate() { @Test public void nonAggregateCluster_resourceRevoked() { - xdsClient.deliverLogicalDnsCluster(CLUSTER, null, 100L, upstreamTlsContext); + CdsUpdate update = + CdsUpdate.forLogicalDns(CLUSTER, null, 100L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(childBalancers).hasSize(1); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; @@ -225,27 +234,40 @@ public void nonAggregateCluster_resourceRevoked() { } @Test - public void discoveryAggregateCluster() { + public void discoverAggregateCluster() { String cluster1 = "cluster-01.googleapis.com"; String cluster2 = "cluster-02.googleapis.com"; // CLUSTER (aggr.) -> [cluster1 (aggr.), cluster2 (logical DNS)] - xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2)); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) + .lbPolicy(LbPolicy.RING_HASH, 100L, 1000L).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); assertThat(childBalancers).isEmpty(); String cluster3 = "cluster-03.googleapis.com"; String cluster4 = "cluster-04.googleapis.com"; // cluster1 (aggr.) -> [cluster3 (EDS), cluster4 (EDS)] - xdsClient.deliverAggregateCluster(cluster1, Arrays.asList(cluster3, cluster4)); + CdsUpdate update1 = + CdsUpdate.forAggregate(cluster1, Arrays.asList(cluster3, cluster4)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster1, update1); assertThat(xdsClient.watchers.keySet()).containsExactly( CLUSTER, cluster1, cluster2, cluster3, cluster4); assertThat(childBalancers).isEmpty(); - xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, - upstreamTlsContext); + CdsUpdate update3 = + CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster3, update3); assertThat(childBalancers).isEmpty(); - xdsClient.deliverLogicalDnsCluster(cluster2, null, 100L, null); + CdsUpdate update2 = + CdsUpdate.forLogicalDns(cluster2, null, 100L, null) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster2, update2); assertThat(childBalancers).isEmpty(); - xdsClient.deliverEdsCluster(cluster4, null, LRS_SERVER_NAME, 300L, - null); + CdsUpdate update4 = + CdsUpdate.forEds(cluster4, null, LRS_SERVER_NAME, 300L, null) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster4, update4); assertThat(childBalancers).hasSize(1); // all non-aggregate clusters discovered FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.name).isEqualTo(CLUSTER_RESOLVER_POLICY_NAME); @@ -258,17 +280,20 @@ public void discoveryAggregateCluster() { DiscoveryMechanism.Type.EDS, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext); assertDiscoveryMechanism(childLbConfig.discoveryMechanisms.get(2), cluster4, DiscoveryMechanism.Type.EDS, null, LRS_SERVER_NAME, 300L, null); - assertThat(childLbConfig.localityPickingPolicy.getProvider().getPolicyName()) - .isEqualTo(WEIGHTED_TARGET_POLICY_NAME); - assertThat(childLbConfig.endpointPickingPolicy.getProvider().getPolicyName()) - .isEqualTo("round_robin"); + assertThat(childLbConfig.lbPolicy.getProvider().getPolicyName()) + .isEqualTo("ring_hash"); // dominated by top-level cluster's config + assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).minRingSize).isEqualTo(100L); + assertThat(((RingHashConfig) childLbConfig.lbPolicy.getConfig()).maxRingSize).isEqualTo(1000L); } @Test public void aggregateCluster_noNonAggregateClusterExits_returnErrorPicker() { String cluster1 = "cluster-01.googleapis.com"; // CLUSTER (aggr.) -> [cluster1 (EDS)] - xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); xdsClient.deliverResourceNotExist(cluster1); verify(helper).updateBalancingState( @@ -283,11 +308,19 @@ public void aggregateCluster_descendantClustersRevoked() { String cluster1 = "cluster-01.googleapis.com"; String cluster2 = "cluster-02.googleapis.com"; // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] - xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2)); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null); - xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, - upstreamTlsContext); + CdsUpdate update1 = + CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster1, update1); + CdsUpdate update2 = + CdsUpdate.forLogicalDns(cluster2, LRS_SERVER_NAME, 100L, null) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster2, update2); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanisms).hasSize(2); @@ -321,11 +354,19 @@ public void aggregateCluster_rootClusterRevoked() { String cluster1 = "cluster-01.googleapis.com"; String cluster2 = "cluster-02.googleapis.com"; // CLUSTER (aggr.) -> [cluster1 (EDS), cluster2 (logical DNS)] - xdsClient.deliverAggregateCluster(CLUSTER, Arrays.asList(cluster1, cluster2)); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Arrays.asList(cluster1, cluster2)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1, cluster2); - xdsClient.deliverLogicalDnsCluster(cluster2, LRS_SERVER_NAME, 100L, null); - xdsClient.deliverEdsCluster(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, - upstreamTlsContext); + CdsUpdate update1 = + CdsUpdate.forEds(cluster1, EDS_SERVICE_NAME, LRS_SERVER_NAME, 200L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster1, update1); + CdsUpdate update2 = + CdsUpdate.forLogicalDns(cluster2, LRS_SERVER_NAME, 100L, null) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster2, update2); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanisms).hasSize(2); @@ -349,20 +390,31 @@ public void aggregateCluster_rootClusterRevoked() { public void aggregateCluster_intermediateClusterChanges() { String cluster1 = "cluster-01.googleapis.com"; // CLUSTER (aggr.) -> [cluster1] - xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); // CLUSTER (aggr.) -> [cluster2 (aggr.)] String cluster2 = "cluster-02.googleapis.com"; - xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster2)); + update = + CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster2)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2); // cluster2 (aggr.) -> [cluster3 (EDS)] String cluster3 = "cluster-03.googleapis.com"; - xdsClient.deliverAggregateCluster(cluster2, Collections.singletonList(cluster3)); + CdsUpdate update2 = + CdsUpdate.forAggregate(cluster2, Collections.singletonList(cluster3)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster2, update2); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster2, cluster3); - xdsClient.deliverEdsCluster(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, - upstreamTlsContext); + CdsUpdate update3 = + CdsUpdate.forEds(cluster3, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster3, update3); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childBalancer.config; assertThat(childLbConfig.discoveryMechanisms).hasSize(1); @@ -386,7 +438,10 @@ public void aggregateCluster_intermediateClusterChanges() { public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicker() { String cluster1 = "cluster-01.googleapis.com"; // CLUSTER (aggr.) -> [cluster1] - xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); assertThat(xdsClient.watchers.keySet()).containsExactly(CLUSTER, cluster1); Status error = Status.RESOURCE_EXHAUSTED.withDescription("OOM"); xdsClient.deliverError(error); @@ -400,8 +455,14 @@ public void aggregateCluster_discoveryErrorBeforeChildLbCreated_returnErrorPicke public void aggregateCluster_discoveryErrorAfterChildLbCreated_propagateToChildLb() { String cluster1 = "cluster-01.googleapis.com"; // CLUSTER (aggr.) -> [cluster1 (logical DNS)] - xdsClient.deliverAggregateCluster(CLUSTER, Collections.singletonList(cluster1)); - xdsClient.deliverLogicalDnsCluster(cluster1, LRS_SERVER_NAME, 200L, null); + CdsUpdate update = + CdsUpdate.forAggregate(CLUSTER, Collections.singletonList(cluster1)) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); + CdsUpdate update1 = + CdsUpdate.forLogicalDns(cluster1, LRS_SERVER_NAME, 200L, null) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(cluster1, update1); FakeLoadBalancer childLb = Iterables.getOnlyElement(childBalancers); ClusterResolverConfig childLbConfig = (ClusterResolverConfig) childLb.config; assertThat(childLbConfig.discoveryMechanisms).hasSize(1); @@ -423,8 +484,10 @@ public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErr @Test public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { - xdsClient.deliverEdsCluster(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, - upstreamTlsContext); + CdsUpdate update = + CdsUpdate.forEds(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_NAME, 100L, upstreamTlsContext) + .lbPolicy(LbPolicy.ROUND_ROBIN).build(); + xdsClient.deliverCdsUpdate(CLUSTER, update); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); assertThat(childBalancer.shutdown).isFalse(); loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable")); @@ -541,31 +604,8 @@ void cancelCdsResourceWatch(String resourceName, CdsResourceWatcher watcher) { watchers.remove(resourceName); } - private void deliverEdsCluster(String clusterName, @Nullable String edsServiceName, - @Nullable String lrsServerName, @Nullable Long maxConcurrentRequests, - @Nullable UpstreamTlsContext tlsContext) { - if (watchers.containsKey(clusterName)) { - CdsUpdate update = CdsUpdate.forEds( - clusterName, edsServiceName, lrsServerName, maxConcurrentRequests, tlsContext) - .lbPolicy("round_robin").build(); - watchers.get(clusterName).onChanged(update); - } - } - - private void deliverLogicalDnsCluster(String clusterName, @Nullable String lrsServerName, - @Nullable Long maxConcurrentRequests, @Nullable UpstreamTlsContext tlsContext) { - if (watchers.containsKey(clusterName)) { - CdsUpdate update = CdsUpdate.forLogicalDns( - clusterName, lrsServerName, maxConcurrentRequests, tlsContext) - .lbPolicy("round_robin").build(); - watchers.get(clusterName).onChanged(update); - } - } - - private void deliverAggregateCluster(String clusterName, List clusters) { + private void deliverCdsUpdate(String clusterName, CdsUpdate update) { if (watchers.containsKey(clusterName)) { - CdsUpdate update = CdsUpdate.forAggregate(clusterName, clusters) - .lbPolicy("round_robin").build(); watchers.get(clusterName).onChanged(update); } } diff --git a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java index 692bf9ec9e3..20a9da63079 100644 --- a/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java +++ b/xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java @@ -62,6 +62,7 @@ import io.grpc.xds.XdsClient.CdsResourceWatcher; import io.grpc.xds.XdsClient.CdsUpdate; import io.grpc.xds.XdsClient.CdsUpdate.ClusterType; +import io.grpc.xds.XdsClient.CdsUpdate.LbPolicy; import io.grpc.xds.XdsClient.EdsResourceWatcher; import io.grpc.xds.XdsClient.EdsUpdate; import io.grpc.xds.XdsClient.LdsResourceWatcher; @@ -1132,7 +1133,7 @@ public void cdsResourceFound() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1157,7 +1158,7 @@ public void cdsResourceFound_ringHashLbPolicy() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("ring_hash"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.RING_HASH); assertThat(cdsUpdate.minRingSize()).isEqualTo(10L); assertThat(cdsUpdate.maxRingSize()).isEqualTo(100L); assertThat(cdsUpdate.lrsServerName()).isNull(); @@ -1183,7 +1184,7 @@ public void cdsResponseWithAggregateCluster() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.AGGREGATE); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.prioritizedClusterNames()).containsExactlyElementsIn(candidates).inOrder(); verifyResourceMetadataAcked(CDS, CDS_RESOURCE, clusterAggregate, VERSION_1, TIME_INCREMENT); verifySubscribedResourcesMetadataSizes(0, 1, 0, 0); @@ -1204,7 +1205,7 @@ public void cdsResponseWithCircuitBreakers() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isEqualTo(200L); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1268,7 +1269,7 @@ public void cachedCdsResource_data() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1306,7 +1307,7 @@ public void cdsResourceUpdated() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1323,7 +1324,7 @@ public void cdsResourceUpdated() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1344,7 +1345,7 @@ public void cdsResourceDeleted() { assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isNull(); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1392,7 +1393,7 @@ public void multipleCdsWatchers() { CdsUpdate cdsUpdate = cdsUpdateCaptor.getValue(); assertThat(cdsUpdate.clusterName()).isEqualTo(CDS_RESOURCE); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.LOGICAL_DNS); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isNull(); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1401,7 +1402,7 @@ public void multipleCdsWatchers() { assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); @@ -1410,7 +1411,7 @@ public void multipleCdsWatchers() { assertThat(cdsUpdate.clusterName()).isEqualTo(cdsResourceTwo); assertThat(cdsUpdate.clusterType()).isEqualTo(ClusterType.EDS); assertThat(cdsUpdate.edsServiceName()).isEqualTo(edsService); - assertThat(cdsUpdate.lbPolicy()).isEqualTo("round_robin"); + assertThat(cdsUpdate.lbPolicy()).isEqualTo(LbPolicy.ROUND_ROBIN); assertThat(cdsUpdate.lrsServerName()).isEqualTo(""); assertThat(cdsUpdate.maxConcurrentRequests()).isNull(); assertThat(cdsUpdate.upstreamTlsContext()).isNull(); diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index 264f6232c0c..eb1576f988e 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -65,6 +65,7 @@ import io.grpc.xds.EnvoyServerProtoData.UpstreamTlsContext; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig; import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig.PriorityChildConfig; +import io.grpc.xds.RingHashLoadBalancer.RingHashConfig; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.internal.sds.CommonTlsContextTestsUtil; @@ -113,6 +114,12 @@ public class ClusterResolverLoadBalancerTest { CommonTlsContextTestsUtil.CLIENT_KEY_FILE, CommonTlsContextTestsUtil.CLIENT_PEM_FILE, CommonTlsContextTestsUtil.CA_PEM_FILE); + private final DiscoveryMechanism edsDiscoveryMechanism1 = + DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext); + private final DiscoveryMechanism edsDiscoveryMechanism2 = + DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext); + private final DiscoveryMechanism logicalDnsDiscoveryMechanism = + DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 300L, null); private final SynchronizationContext syncContext = new SynchronizationContext( new Thread.UncaughtExceptionHandler() { @@ -126,8 +133,8 @@ public void uncaughtException(Thread t, Throwable e) { private final NameResolverRegistry nsRegistry = new NameResolverRegistry(); private final PolicySelection roundRobin = new PolicySelection(new FakeLoadBalancerProvider("round_robin"), null); - private final PolicySelection weightedTarget = - new PolicySelection(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME), null); + private final PolicySelection ringHash = new PolicySelection( + new FakeLoadBalancerProvider("ring_hash"), new RingHashConfig(10L, 100L)); private final List childBalancers = new ArrayList<>(); private final List resolvers = new ArrayList<>(); private final FakeXdsClient xdsClient = new FakeXdsClient(); @@ -165,6 +172,7 @@ public void setUp() throws URISyntaxException { lbRegistry.register(new FakeLoadBalancerProvider(PRIORITY_POLICY_NAME)); lbRegistry.register(new FakeLoadBalancerProvider(CLUSTER_IMPL_POLICY_NAME)); + lbRegistry.register(new FakeLoadBalancerProvider(WEIGHTED_TARGET_POLICY_NAME)); lbRegistry.register( new FakeLoadBalancerProvider("pick_first")); // needed by logical_dns URI targetUri = new URI(AUTHORITY); @@ -198,9 +206,66 @@ public void tearDown() { assertThat(fakeClock.getPendingTasks()).isEmpty(); } + @Test + public void edsClustersWithRingHashEndpointLbPolicy() { + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(edsDiscoveryMechanism1), ringHash); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(childBalancers).isEmpty(); + + // One priority with two localities of different weights. + EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); + EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); + LocalityLbEndpoints localityLbEndpoints1 = + LocalityLbEndpoints.create( + Collections.singletonList( + LbEndpoint.create(endpoint1, 100 /* loadBalancingWeight */, true)), + 10 /* localityWeight */, 1 /* priority */); + LocalityLbEndpoints localityLbEndpoints2 = + LocalityLbEndpoints.create( + Collections.singletonList( + LbEndpoint.create(endpoint2, 60 /* loadBalancingWeight */, true)), + 50 /* localityWeight */, 1 /* priority */); + xdsClient.deliverClusterLoadAssignment( + EDS_SERVICE_NAME1, + ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); + assertThat(childBalancers).hasSize(1); + FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); + assertThat(childBalancer.addresses).hasSize(2); + EquivalentAddressGroup addr1 = childBalancer.addresses.get(0); + EquivalentAddressGroup addr2 = childBalancer.addresses.get(1); + assertThat(addr1.getAddresses()).isEqualTo(endpoint1.getAddresses()); + assertThat(addr1.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT)) + .isEqualTo(10 * 100); + assertThat(addr2.getAddresses()).isEqualTo(endpoint2.getAddresses()); + assertThat(addr2.getAttributes().get(InternalXdsAttributes.ATTR_SERVER_WEIGHT)) + .isEqualTo(50 * 60); + assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME); + PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config; + assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]"); + PriorityChildConfig priorityChildConfig = + Iterables.getOnlyElement(priorityLbConfig.childConfigs.values()); + assertThat(priorityChildConfig.ignoreReresolution).isTrue(); + assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName()) + .isEqualTo(CLUSTER_IMPL_POLICY_NAME); + ClusterImplConfig clusterImplConfig = + (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); + assertClusterImplConfig(clusterImplConfig, CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, + tlsContext, Collections.emptyList(), "ring_hash"); + RingHashConfig ringHashConfig = + (RingHashConfig) clusterImplConfig.childPolicy.getConfig(); + assertThat(ringHashConfig.minRingSize).isEqualTo(10L); + assertThat(ringHashConfig.maxRingSize).isEqualTo(100L); + } + @Test public void onlyEdsClusters_receivedEndpoints() { - deliverConfigWithEdsClusters(); // CLUSTER1 and CLUSTER2 + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); + assertThat(childBalancers).isEmpty(); // CLUSTER1 has priority 1 (priority3), which has locality 2, which has endpoint3. // CLUSTER2 has priority 1 (priority1) and 2 (priority2); priority1 has locality1, // which has endpoint1 and endpoint2; priority2 has locality3, which has endpoint4. @@ -209,11 +274,19 @@ public void onlyEdsClusters_receivedEndpoints() { EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3"); EquivalentAddressGroup endpoint4 = makeAddress("endpoint-addr-4"); LocalityLbEndpoints localityLbEndpoints1 = - buildLocalityLbEndpoints(1, 70, ImmutableMap.of(endpoint1, true, endpoint2, true)); + LocalityLbEndpoints.create( + Arrays.asList( + LbEndpoint.create(endpoint1, 100, true), + LbEndpoint.create(endpoint2, 100, true)), + 70 /* localityWeight */, 1 /* priority */); LocalityLbEndpoints localityLbEndpoints2 = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint3, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint3, 100, true)), + 10 /* localityWeight */, 1 /* priority */); LocalityLbEndpoints localityLbEndpoints3 = - buildLocalityLbEndpoints(2, 20, Collections.singletonMap(endpoint4, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint4, 100, true)), + 20 /* localityWeight */, 2 /* priority */); String priority1 = CLUSTER2 + "[priority1]"; String priority2 = CLUSTER2 + "[priority2]"; String priority3 = CLUSTER1 + "[priority1]"; @@ -291,7 +364,11 @@ public void onlyEdsClusters_receivedEndpoints() { @Test public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() { - deliverConfigWithEdsClusters(); // CLUSTER1 and CLUSTER2 + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); + assertThat(childBalancers).isEmpty(); reset(helper); xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); @@ -308,14 +385,22 @@ public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() { @Test public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() { - deliverConfigWithEdsClusters(); // CLUSTER1 and CLUSTER2 + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, edsDiscoveryMechanism2), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); + assertThat(childBalancers).isEmpty(); reset(helper); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints1 = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint1, 100, true)), + 10 /* localityWeight */, 1 /* priority */); LocalityLbEndpoints localityLbEndpoints2 = - buildLocalityLbEndpoints(2, 20, Collections.singletonMap(endpoint2, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint2, 100, true)), + 20 /* localityWeight */, 2 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints1)); xdsClient.deliverClusterLoadAssignment( @@ -333,25 +418,19 @@ public void onlyEdsClusters_allResourcesRevoked_shutDownChildLbPolicy() { assertPicker(pickerCaptor.getValue(), expectedError, null); } - private void deliverConfigWithEdsClusters() { - DiscoveryMechanism instance1 = - DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, tlsContext); - DiscoveryMechanism instance2 = - DiscoveryMechanism.forEds(CLUSTER2, EDS_SERVICE_NAME2, LRS_SERVER_NAME, 200L, tlsContext); - ClusterResolverConfig config = - new ClusterResolverConfig(Arrays.asList(instance1, instance2), weightedTarget, roundRobin); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1, EDS_SERVICE_NAME2); - assertThat(childBalancers).isEmpty(); - } - @Test public void handleEdsResource_ignoreUnhealthyEndpoints() { - deliverConfigWithSingleEdsCluster(); // CLUSTER1 + ClusterResolverConfig config = + new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + deliverLbConfig(config); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints = - buildLocalityLbEndpoints(1, 10, ImmutableMap.of(endpoint1, false, endpoint2, true)); + LocalityLbEndpoints.create( + Arrays.asList( + LbEndpoint.create(endpoint1, 100, false /* isHealthy */), + LbEndpoint.create(endpoint2, 100, true /* isHealthy */)), + 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); @@ -361,13 +440,19 @@ public void handleEdsResource_ignoreUnhealthyEndpoints() { @Test public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { - deliverConfigWithSingleEdsCluster(); // CLUSTER1 + ClusterResolverConfig config = + new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + deliverLbConfig(config); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints1 = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, false)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */)), + 10 /* localityWeight */, 1 /* priority */); LocalityLbEndpoints localityLbEndpoints2 = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint2, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint2, 100, true /* isHealthy */)), + 10 /* localityWeight */, 1 /* priority */); String priority = CLUSTER1 + "[priority1]"; xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, @@ -385,29 +470,38 @@ public void handleEdsResource_ignoreLocalitiesWithNoHealthyEndpoints() { @Test public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() { - deliverConfigWithSingleEdsCluster(); // CLUSTER1 + ClusterResolverConfig config = + new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + deliverLbConfig(config); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints1 = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, false)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint1, 100, false /* isHealthy */)), + 10 /* localityWeight */, 1 /* priority */); LocalityLbEndpoints localityLbEndpoints2 = - buildLocalityLbEndpoints(2, 10, Collections.singletonMap(endpoint2, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */)), + 10 /* localityWeight */, 2 /* priority */); String priority2 = CLUSTER1 + "[priority2]"; xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); - PriorityLbConfig config = (PriorityLbConfig) childBalancer.config; - assertThat(config.priorities).containsExactly(priority2); + assertThat(((PriorityLbConfig) childBalancer.config).priorities).containsExactly(priority2); } @Test public void handleEdsResource_noHealthyEndpoint() { - deliverConfigWithSingleEdsCluster(); // CLUSTER1 + ClusterResolverConfig config = + new ClusterResolverConfig(Collections.singletonList(edsDiscoveryMechanism1), roundRobin); + deliverLbConfig(config); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, false)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint, 100, false /* isHealthy */)), + 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment(EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); // single endpoint, unhealthy @@ -418,19 +512,13 @@ public void handleEdsResource_noHealthyEndpoint() { Status.UNAVAILABLE.withDescription("No usable endpoint"), null); } - private void deliverConfigWithSingleEdsCluster() { - DiscoveryMechanism instance = - DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, null); - ClusterResolverConfig config = - new ClusterResolverConfig(Collections.singletonList(instance), weightedTarget, roundRobin); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(childBalancers).isEmpty(); - } - @Test public void onlyLogicalDnsCluster_endpointsResolved() { - deliverConfigWithSingleLogicalDnsCluster(); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); @@ -447,14 +535,18 @@ public void onlyLogicalDnsCluster_endpointsResolved() { .isEqualTo(CLUSTER_IMPL_POLICY_NAME); ClusterImplConfig clusterImplConfig = (ClusterImplConfig) priorityChildConfig.policySelection.getConfig(); - assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_NAME, 100L, null, + assertClusterImplConfig(clusterImplConfig, CLUSTER_DNS, null, LRS_SERVER_NAME, 300L, null, Collections.emptyList(), "pick_first"); assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses); } @Test public void onlyLogicalDnsCluster_handleRefreshNameResolution() { - deliverConfigWithSingleLogicalDnsCluster(); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); @@ -470,7 +562,11 @@ public void onlyLogicalDnsCluster_handleRefreshNameResolution() { public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { InOrder inOrder = Mockito.inOrder(helper, backoffPolicyProvider, backoffPolicy1, backoffPolicy2); - deliverConfigWithSingleLogicalDnsCluster(); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); Status error = Status.UNAVAILABLE.withDescription("cannot reach DNS server"); resolver.deliverError(error); @@ -513,7 +609,11 @@ public void onlyLogicalDnsCluster_resolutionError_backoffAndRefresh() { @Test public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() { InOrder inOrder = Mockito.inOrder(backoffPolicyProvider, backoffPolicy1, backoffPolicy2); - deliverConfigWithSingleLogicalDnsCluster(); + ClusterResolverConfig config = new ClusterResolverConfig( + Collections.singletonList(logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr"); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); resolver.deliverEndpointAddresses(Collections.singletonList(endpoint)); @@ -547,26 +647,23 @@ public void onlyLogicalDnsCluster_refreshNameResolutionRaceWithResolutionError() inOrder.verifyNoMoreInteractions(); } - private void deliverConfigWithSingleLogicalDnsCluster() { - DiscoveryMechanism instance = - DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 100L, null); - ClusterResolverConfig config = - new ClusterResolverConfig(Collections.singletonList(instance), weightedTarget, roundRobin); + @Test + public void edsClustersAndLogicalDnsCluster_receivedEndpoints() { + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); assertThat(resolvers).hasSize(1); assertThat(childBalancers).isEmpty(); - } - - @Test - public void edsClustersAndLogicalDnsCluster_receivedEndpoints() { - deliverConfigWithEdsAndLogicalDnsClusters(); // CLUSTER1 and CLUSTER_DNS EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); // DNS endpoint EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); // DNS endpoint EquivalentAddressGroup endpoint3 = makeAddress("endpoint-addr-3"); // EDS endpoint FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); resolver.deliverEndpointAddresses(Arrays.asList(endpoint1, endpoint2)); LocalityLbEndpoints localityLbEndpoints = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint3, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint3, 100, true)), + 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); @@ -587,7 +684,12 @@ public void edsClustersAndLogicalDnsCluster_receivedEndpoints() { @Test public void noEdsResourceExists_useDnsResolutionResults() { - deliverConfigWithEdsAndLogicalDnsClusters(); + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); reset(helper); xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1); verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); @@ -609,11 +711,18 @@ public void noEdsResourceExists_useDnsResolutionResults() { @Test public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturnErrorPicker() { - deliverConfigWithEdsAndLogicalDnsClusters(); + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); reset(helper); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint, 100, true)), + 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); @@ -634,11 +743,18 @@ public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturn @Test public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncounterError() { - deliverConfigWithEdsAndLogicalDnsClusters(); + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); reset(helper); EquivalentAddressGroup endpoint = makeAddress("endpoint-addr-1"); LocalityLbEndpoints localityLbEndpoints = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint, 100, true)), + 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers); // child LB created @@ -656,7 +772,12 @@ public void resolutionErrorAfterChildLbCreated_propagateErrorIfAllClustersEncoun @Test public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEncounterError() { - deliverConfigWithEdsAndLogicalDnsClusters(); + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); reset(helper); xdsClient.deliverError(Status.UNIMPLEMENTED.withDescription("not found")); assertThat(childBalancers).isEmpty(); @@ -672,7 +793,12 @@ public void resolutionErrorBeforeChildLbCreated_returnErrorPickerIfAllClustersEn @Test public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErrorPicker() { - deliverConfigWithEdsAndLogicalDnsClusters(); + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); reset(helper); Status upstreamError = Status.UNAVAILABLE.withDescription("unreachable"); loadBalancer.handleNameResolutionError(upstreamError); @@ -683,12 +809,19 @@ public void handleNameResolutionErrorFromUpstream_beforeChildLbCreated_returnErr @Test public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThrough() { - deliverConfigWithEdsAndLogicalDnsClusters(); + ClusterResolverConfig config = new ClusterResolverConfig( + Arrays.asList(edsDiscoveryMechanism1, logicalDnsDiscoveryMechanism), roundRobin); + deliverLbConfig(config); + assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); + assertThat(resolvers).hasSize(1); + assertThat(childBalancers).isEmpty(); reset(helper); EquivalentAddressGroup endpoint1 = makeAddress("endpoint-addr-1"); EquivalentAddressGroup endpoint2 = makeAddress("endpoint-addr-2"); LocalityLbEndpoints localityLbEndpoints = - buildLocalityLbEndpoints(1, 10, Collections.singletonMap(endpoint1, true)); + LocalityLbEndpoints.create( + Collections.singletonList(LbEndpoint.create(endpoint1, 100, true)), + 10 /* localityWeight */, 1 /* priority */); xdsClient.deliverClusterLoadAssignment( EDS_SERVICE_NAME1, Collections.singletonMap(locality1, localityLbEndpoints)); FakeNameResolver resolver = Iterables.getOnlyElement(resolvers); @@ -706,19 +839,6 @@ public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThroug any(ConnectivityState.class), any(SubchannelPicker.class)); } - private void deliverConfigWithEdsAndLogicalDnsClusters() { - DiscoveryMechanism instance1 = - DiscoveryMechanism.forEds(CLUSTER1, EDS_SERVICE_NAME1, LRS_SERVER_NAME, 100L, null); - DiscoveryMechanism instance2 = - DiscoveryMechanism.forLogicalDns(CLUSTER_DNS, LRS_SERVER_NAME, 200L, null); - ClusterResolverConfig config = - new ClusterResolverConfig(Arrays.asList(instance1, instance2), weightedTarget, roundRobin); - deliverLbConfig(config); - assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME1); - assertThat(resolvers).hasSize(1); - assertThat(childBalancers).isEmpty(); - } - private void deliverLbConfig(ClusterResolverConfig config) { loadBalancer.handleResolvedAddresses( ResolvedAddresses.newBuilder() @@ -765,16 +885,6 @@ private static void assertAddressesEqual( } } - private static LocalityLbEndpoints buildLocalityLbEndpoints( - int priority, int localityWeight, Map managedEndpoints) { - List endpoints = new ArrayList<>(); - for (EquivalentAddressGroup addr : managedEndpoints.keySet()) { - boolean status = managedEndpoints.get(addr); - endpoints.add(LbEndpoint.create(addr, 100 /* unused */, status)); - } - return LocalityLbEndpoints.create(endpoints, localityWeight, priority); - } - private static EquivalentAddressGroup makeAddress(final String name) { class FakeSocketAddress extends SocketAddress { private final String name;