Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: apply valid resources while NACKing update #8506

Merged
merged 2 commits into from Sep 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
135 changes: 71 additions & 64 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedRdsResources = new HashSet<>();

Expand Down Expand Up @@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add(
"LDS response Listener '" + listenerName + "' validation error: " + e.getMessage());
invalidResources.add(listenerName);
continue;
}

Expand All @@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
getLogger().log(XdsLogLevel.INFO,
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors);
return;
}

handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce);
for (String resource : rdsResourceSubscribers.keySet()) {
if (!retainedRdsResources.contains(resource)) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
nonce, errors);
}

private LdsUpdate processClientSideListener(
Expand Down Expand Up @@ -1287,6 +1279,7 @@ static StructOrError<ClusterWeight> parseClusterWeight(
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>(resources.size());
ejona86 marked this conversation as resolved.
Show resolved Hide resolved
List<String> errors = new ArrayList<>();

for (int i = 0; i < resources.size(); i++) {
Expand Down Expand Up @@ -1314,6 +1307,7 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
errors.add(
"RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e
.getMessage());
invalidResources.add(routeConfigName);
continue;
}

Expand All @@ -1322,12 +1316,9 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
getLogger().log(XdsLogLevel.INFO,
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce);
}
handleResourceUpdate(
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}

private static RdsUpdate processRouteConfiguration(
Expand All @@ -1351,6 +1342,7 @@ private static RdsUpdate processRouteConfiguration(
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedEdsResources = new HashSet<>();

Expand Down Expand Up @@ -1387,28 +1379,17 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource));
}
getLogger().log(XdsLogLevel.INFO,
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors);
return;
}

handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce);
// CDS responses represents the state of the world, EDS resources not referenced in CDS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we even need logic like this? If EDS is no longer needed then the subscriber will go away and we'll naturally stop watching it. I remember discussing this in the past, but maybe I don't remember the previous resolution.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we had a very long debate with @voidzcy one/two years ago. There was very subtle reason that convinced me to accept this approach, but I can't recall now.

I'm keeping this behavior in this PR to implement the change, but therefore I have to add code like https://github.com/grpc/grpc-java/pull/8506/files/5fe2a6bc3dd52f2687a91903732c443567c0e6b1#diff-6b019eb1d3518381d336168847890513a235ea4974fcb3c76a2e9ace43f1d460R2036-R2052 which makes ClientXdsClient so strongly coupled with LdsUpdate and CdsUpdate's deeply nested fields, or with LDS/CDS config's semantic.

// resources should be deleted.
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (!retainedEdsResources.contains(resource)) {
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo,
nonce, errors);
}

@VisibleForTesting
Expand Down Expand Up @@ -1589,6 +1570,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();

for (int i = 0; i < resources.size(); i++) {
Expand Down Expand Up @@ -1623,16 +1605,17 @@ protected void handleEdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add("EDS response ClusterLoadAssignment '" + clusterName
+ "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource));
}

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce);
}
getLogger().log(
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}

private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
Expand Down Expand Up @@ -2022,43 +2005,67 @@ private void cleanUpResourceTimers() {
}
}

private void handleResourcesAccepted(
ResourceType type, Map<String, ParsedResource> parsedResources, String version,
String nonce) {
ackResponse(type, version, nonce);

private void handleResourceUpdate(
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources,
Set<String> retainedResources, String version, String nonce, List<String> errors) {
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
ackResponse(type, version, nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);
}
long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();
// Attach error details to the subscribed resources that included in the ADS update.
if (invalidResources.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
}
// Notify the watchers.
if (parsedResources.containsKey(resourceName)) {
subscriber.onData(parsedResources.get(resourceName), version, updateTime);
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this condition should move into the invalidResources.contains(resourceName) case above and then we combine the invalidResources.contains(resourceName into the if-else chain.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to simplify this. The subscriber.onAbsent() below still need (the negation of) this condition. I grouped the logic regarding retainedResources and onAbsent() together because they are related.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking:

if (invalidResources.contains(resourceName)) {
  subscriber.onRejected(version, updateTime, errorDetail);
  if (subscriber.data != null) {
    if (type == ResourceType.LDS) {
      ...
      retainedResources.add(rdsName);
    } else if (type == ResourceType.CDS) {
      ...
      retainedResources.add(edsName);
    }
  }
} else if (parsedResources.containsKey(resourceName)) {
  subscriber.onData(parsedResources.get(resourceName), version, updateTime);
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
  subscriber.onAbsent();
}

But I can live with what's here.

// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
continue;
}
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
}
}

private void handleResourcesRejected(
ResourceType type, Set<String> unpackedResourceNames, String version,
String nonce, List<String> errors) {
String errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);

long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();

// Attach error details to the subscribed resources that included in the ADS update.
if (unpackedResourceNames.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
// LDS/CDS resources should be deleted.
if (type == ResourceType.LDS || type == ResourceType.CDS) {
Map<String, ResourceSubscriber> dependentSubscribers =
type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers;
for (String resource : dependentSubscribers.keySet()) {
if (!retainedResources.contains(resource)) {
dependentSubscribers.get(resource).onAbsent();
}
}
}
}
Expand Down