Skip to content

Commit

Permalink
xds: eds reuse priority names for the same existing locality (grpc#9287)
Browse files Browse the repository at this point in the history
  • Loading branch information
YifeiZhuang authored and larry-safran committed Jun 28, 2022
1 parent 0b85d5c commit b0d060d
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 19 deletions.
52 changes: 45 additions & 7 deletions xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java
Expand Up @@ -58,9 +58,12 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -342,6 +345,8 @@ void shutdown() {
private final class EdsClusterState extends ClusterState implements EdsResourceWatcher {
@Nullable
private final String edsServiceName;
private Map<Locality, String> localityPriorityNames = Collections.emptyMap();
int priorityNameGenId = 1;

private EdsClusterState(String name, @Nullable String edsServiceName,
@Nullable ServerInfo lrsServerInfo, @Nullable Long maxConcurrentRequests,
Expand Down Expand Up @@ -385,10 +390,10 @@ public void run() {
List<DropOverload> dropOverloads = update.dropPolicies;
List<EquivalentAddressGroup> addresses = new ArrayList<>();
Map<String, Map<Locality, Integer>> prioritizedLocalityWeights = new HashMap<>();
List<String> sortedPriorityNames = generatePriorityNames(name, localityLbEndpoints);
for (Locality locality : localityLbEndpoints.keySet()) {
LocalityLbEndpoints localityLbInfo = localityLbEndpoints.get(locality);
int priority = localityLbInfo.priority();
String priorityName = priorityName(name, priority);
String priorityName = localityPriorityNames.get(locality);
boolean discard = true;
for (LbEndpoint endpoint : localityLbInfo.endpoints()) {
if (endpoint.isHealthy()) {
Expand Down Expand Up @@ -426,23 +431,56 @@ public void run() {
logger.log(XdsLogLevel.INFO,
"Cluster {0} has no usable priority/locality/endpoint", update.clusterName);
}
List<String> priorities = new ArrayList<>(prioritizedLocalityWeights.keySet());
Collections.sort(priorities);
sortedPriorityNames.retainAll(prioritizedLocalityWeights.keySet());
Map<String, PriorityChildConfig> priorityChildConfigs =
generateEdsBasedPriorityChildConfigs(
name, edsServiceName, lrsServerInfo, maxConcurrentRequests, tlsContext,
endpointLbPolicy, lbRegistry, prioritizedLocalityWeights, dropOverloads);
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityChildConfigs, priorities,
localityWeights);
result = new ClusterResolutionResult(addresses, priorityChildConfigs,
sortedPriorityNames, localityWeights);
handleEndpointResourceUpdate();
}
}

syncContext.execute(new EndpointsUpdated());
}

private List<String> generatePriorityNames(String name,
Map<Locality, LocalityLbEndpoints> localityLbEndpoints) {
TreeMap<Integer, List<Locality>> todo = new TreeMap<>();
for (Locality locality : localityLbEndpoints.keySet()) {
int priority = localityLbEndpoints.get(locality).priority();
if (!todo.containsKey(priority)) {
todo.put(priority, new ArrayList<>());
}
todo.get(priority).add(locality);
}
Map<Locality, String> newNames = new HashMap<>();
Set<String> usedNames = new HashSet<>();
List<String> ret = new ArrayList<>();
for (Integer priority: todo.keySet()) {
String foundName = "";
for (Locality locality : todo.get(priority)) {
if (localityPriorityNames.containsKey(locality)
&& usedNames.add(localityPriorityNames.get(locality))) {
foundName = localityPriorityNames.get(locality);
break;
}
}
if ("".equals(foundName)) {
foundName = String.format("%s[child%d]", name, priorityNameGenId++);
}
for (Locality locality : todo.get(priority)) {
newNames.put(locality, foundName);
}
ret.add(foundName);
}
localityPriorityNames = newNames;
return ret;
}

@Override
public void onResourceDoesNotExist(final String resourceName) {
syncContext.execute(new Runnable() {
Expand Down Expand Up @@ -718,7 +756,7 @@ private static Map<String, PriorityChildConfig> generateEdsBasedPriorityChildCon
* The ordering is undefined for priorities in different clusters.
*/
private static String priorityName(String cluster, int priority) {
return cluster + "[priority" + priority + "]";
return cluster + "[child" + priority + "]";
}

/**
Expand Down
99 changes: 87 additions & 12 deletions xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java
Expand Up @@ -257,7 +257,7 @@ public void edsClustersWithRingHashEndpointLbPolicy() {
.isEqualTo(50 * 60);
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]");
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[child1]");
PriorityChildConfig priorityChildConfig =
Iterables.getOnlyElement(priorityLbConfig.childConfigs.values());
assertThat(priorityChildConfig.ignoreReresolution).isTrue();
Expand Down Expand Up @@ -298,7 +298,7 @@ public void edsClustersWithLeastRequestEndpointLbPolicy() {
assertThat(addr.getAddresses()).isEqualTo(endpoint.getAddresses());
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[priority1]");
assertThat(priorityLbConfig.priorities).containsExactly(CLUSTER1 + "[child1]");
PriorityChildConfig priorityChildConfig =
Iterables.getOnlyElement(priorityLbConfig.childConfigs.values());
assertThat(priorityChildConfig.policySelection.getProvider().getPolicyName())
Expand Down Expand Up @@ -345,9 +345,9 @@ public void onlyEdsClusters_receivedEndpoints() {
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint4, 100, true)),
20 /* localityWeight */, 2 /* priority */);
String priority1 = CLUSTER2 + "[priority1]";
String priority2 = CLUSTER2 + "[priority2]";
String priority3 = CLUSTER1 + "[priority1]";
String priority1 = CLUSTER2 + "[child1]";
String priority2 = CLUSTER2 + "[child2]";
String priority3 = CLUSTER1 + "[child1]";

// CLUSTER2: locality1 with priority 1 and locality3 with priority 2.
xdsClient.deliverClusterLoadAssignment(
Expand Down Expand Up @@ -416,6 +416,81 @@ public void onlyEdsClusters_receivedEndpoints() {
assertThat(localityWeights).containsEntry(locality3, 20);
}

@SuppressWarnings("unchecked")
private void verifyEdsPriorityNames(List<String> want,
Map<Locality, LocalityLbEndpoints>... updates) {
ClusterResolverConfig config = new ClusterResolverConfig(
Arrays.asList(edsDiscoveryMechanism2), roundRobin);
deliverLbConfig(config);
assertThat(xdsClient.watchers.keySet()).containsExactly(EDS_SERVICE_NAME2);
assertThat(childBalancers).isEmpty();

for (Map<Locality, LocalityLbEndpoints> update: updates) {
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME2,
update);
}
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(childBalancer.name).isEqualTo(PRIORITY_POLICY_NAME);
PriorityLbConfig priorityLbConfig = (PriorityLbConfig) childBalancer.config;
assertThat(priorityLbConfig.priorities).isEqualTo(want);
}

@Test
@SuppressWarnings("unchecked")
public void edsUpdatePriorityName_twoPriorities() {
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child1]", CLUSTER2 + "[child2]"),
ImmutableMap.of(locality1, createEndpoints(1),
locality2, createEndpoints(2)
));
}

@Test
@SuppressWarnings("unchecked")
public void edsUpdatePriorityName_addOnePriority() {
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child2]"),
ImmutableMap.of(locality1, createEndpoints(1)),
ImmutableMap.of(locality2, createEndpoints(1)
));
}

@Test
@SuppressWarnings("unchecked")
public void edsUpdatePriorityName_swapTwoPriorities() {
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child2]", CLUSTER2 + "[child1]",
CLUSTER2 + "[child3]"),
ImmutableMap.of(locality1, createEndpoints(1),
locality2, createEndpoints(2),
locality3, createEndpoints(3)
),
ImmutableMap.of(locality1, createEndpoints(2),
locality2, createEndpoints(1),
locality3, createEndpoints(3))
);
}

@Test
@SuppressWarnings("unchecked")
public void edsUpdatePriorityName_mergeTwoPriorities() {
verifyEdsPriorityNames(Arrays.asList(CLUSTER2 + "[child3]", CLUSTER2 + "[child1]"),
ImmutableMap.of(locality1, createEndpoints(1),
locality3, createEndpoints(3),
locality2, createEndpoints(2)),
ImmutableMap.of(locality1, createEndpoints(2),
locality3, createEndpoints(1),
locality2, createEndpoints(1)
));
}

private LocalityLbEndpoints createEndpoints(int priority) {
return LocalityLbEndpoints.create(
Arrays.asList(
LbEndpoint.create(makeAddress("endpoint-addr-1"), 100, true),
LbEndpoint.create(makeAddress("endpoint-addr-2"), 100, true)),
70 /* localityWeight */, priority /* priority */);
}

@Test
public void onlyEdsClusters_resourceNeverExist_returnErrorPicker() {
ClusterResolverConfig config = new ClusterResolverConfig(
Expand Down Expand Up @@ -534,7 +609,7 @@ public void handleEdsResource_ignorePrioritiesWithNoHealthyEndpoints() {
LocalityLbEndpoints.create(
Collections.singletonList(LbEndpoint.create(endpoint2, 200, true /* isHealthy */)),
10 /* localityWeight */, 2 /* priority */);
String priority2 = CLUSTER1 + "[priority2]";
String priority2 = CLUSTER1 + "[child2]";
xdsClient.deliverClusterLoadAssignment(
EDS_SERVICE_NAME1,
ImmutableMap.of(locality1, localityLbEndpoints1, locality2, localityLbEndpoints2));
Expand Down Expand Up @@ -719,14 +794,14 @@ public void edsClustersAndLogicalDnsCluster_receivedEndpoints() {
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(((PriorityLbConfig) childBalancer.config).priorities)
.containsExactly(CLUSTER1 + "[priority1]", CLUSTER_DNS + "[priority0]").inOrder();
.containsExactly(CLUSTER1 + "[child1]", CLUSTER_DNS + "[child0]").inOrder();
assertAddressesEqual(Arrays.asList(endpoint3, endpoint1, endpoint2),
childBalancer.addresses); // ordered by cluster then addresses
assertAddressesEqual(AddressFilter.filter(AddressFilter.filter(
childBalancer.addresses, CLUSTER1 + "[priority1]"), locality1.toString()),
childBalancer.addresses, CLUSTER1 + "[child1]"), locality1.toString()),
Collections.singletonList(endpoint3));
assertAddressesEqual(AddressFilter.filter(AddressFilter.filter(
childBalancer.addresses, CLUSTER_DNS + "[priority0]"),
childBalancer.addresses, CLUSTER_DNS + "[child0]"),
Locality.create("", "", "").toString()),
Arrays.asList(endpoint1, endpoint2));
}
Expand All @@ -751,7 +826,7 @@ public void noEdsResourceExists_useDnsResolutionResults() {
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
String priority = Iterables.getOnlyElement(
((PriorityLbConfig) childBalancer.config).priorities);
assertThat(priority).isEqualTo(CLUSTER_DNS + "[priority0]");
assertThat(priority).isEqualTo(CLUSTER_DNS + "[child0]");
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);
}

Expand All @@ -775,7 +850,7 @@ public void edsResourceRevoked_dnsResolutionError_shutDownChildLbPolicyAndReturn
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(((PriorityLbConfig) childBalancer.config).priorities)
.containsExactly(CLUSTER1 + "[priority1]");
.containsExactly(CLUSTER1 + "[child1]");
assertAddressesEqual(Collections.singletonList(endpoint), childBalancer.addresses);
assertThat(childBalancer.shutdown).isFalse();
xdsClient.deliverResourceNotFound(EDS_SERVICE_NAME1);
Expand Down Expand Up @@ -899,7 +974,7 @@ public void handleNameResolutionErrorFromUpstream_afterChildLbCreated_fallThroug
assertThat(childBalancers).hasSize(1);
FakeLoadBalancer childBalancer = Iterables.getOnlyElement(childBalancers);
assertThat(((PriorityLbConfig) childBalancer.config).priorities)
.containsExactly(CLUSTER1 + "[priority1]", CLUSTER_DNS + "[priority0]");
.containsExactly(CLUSTER1 + "[child1]", CLUSTER_DNS + "[child0]");
assertAddressesEqual(Arrays.asList(endpoint1, endpoint2), childBalancer.addresses);

loadBalancer.handleNameResolutionError(Status.UNAVAILABLE.withDescription("unreachable"));
Expand Down

0 comments on commit b0d060d

Please sign in to comment.