New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
xds: apply valid resources while NACKing update #8506
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient { | |
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) { | ||
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size()); | ||
Set<String> unpackedResources = new HashSet<>(resources.size()); | ||
Set<String> invalidResources = new HashSet<>(); | ||
List<String> errors = new ArrayList<>(); | ||
Set<String> retainedRdsResources = new HashSet<>(); | ||
|
||
|
@@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String | |
} catch (ResourceInvalidException e) { | ||
errors.add( | ||
"LDS response Listener '" + listenerName + "' validation error: " + e.getMessage()); | ||
invalidResources.add(listenerName); | ||
continue; | ||
} | ||
|
||
|
@@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String | |
getLogger().log(XdsLogLevel.INFO, | ||
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}", | ||
versionInfo, nonce, unpackedResources); | ||
|
||
if (!errors.isEmpty()) { | ||
handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors); | ||
return; | ||
} | ||
|
||
handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce); | ||
for (String resource : rdsResourceSubscribers.keySet()) { | ||
if (!retainedRdsResources.contains(resource)) { | ||
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource); | ||
subscriber.onAbsent(); | ||
} | ||
} | ||
handleResourceUpdate( | ||
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo, | ||
nonce, errors); | ||
} | ||
|
||
private LdsUpdate processClientSideListener( | ||
|
@@ -1287,6 +1279,7 @@ static StructOrError<ClusterWeight> parseClusterWeight( | |
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) { | ||
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size()); | ||
Set<String> unpackedResources = new HashSet<>(resources.size()); | ||
Set<String> invalidResources = new HashSet<>(); | ||
List<String> errors = new ArrayList<>(); | ||
|
||
for (int i = 0; i < resources.size(); i++) { | ||
|
@@ -1314,6 +1307,7 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String | |
errors.add( | ||
"RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e | ||
.getMessage()); | ||
invalidResources.add(routeConfigName); | ||
continue; | ||
} | ||
|
||
|
@@ -1322,12 +1316,9 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String | |
getLogger().log(XdsLogLevel.INFO, | ||
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}", | ||
versionInfo, nonce, unpackedResources); | ||
|
||
if (!errors.isEmpty()) { | ||
handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors); | ||
} else { | ||
handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce); | ||
} | ||
handleResourceUpdate( | ||
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(), | ||
versionInfo, nonce, errors); | ||
} | ||
|
||
private static RdsUpdate processRouteConfiguration( | ||
|
@@ -1351,6 +1342,7 @@ private static RdsUpdate processRouteConfiguration( | |
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) { | ||
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size()); | ||
Set<String> unpackedResources = new HashSet<>(resources.size()); | ||
Set<String> invalidResources = new HashSet<>(); | ||
List<String> errors = new ArrayList<>(); | ||
Set<String> retainedEdsResources = new HashSet<>(); | ||
|
||
|
@@ -1387,28 +1379,17 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String | |
} catch (ResourceInvalidException e) { | ||
errors.add( | ||
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage()); | ||
invalidResources.add(clusterName); | ||
continue; | ||
} | ||
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource)); | ||
} | ||
getLogger().log(XdsLogLevel.INFO, | ||
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}", | ||
versionInfo, nonce, unpackedResources); | ||
|
||
if (!errors.isEmpty()) { | ||
handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors); | ||
return; | ||
} | ||
|
||
handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce); | ||
// CDS responses represents the state of the world, EDS resources not referenced in CDS | ||
// resources should be deleted. | ||
for (String resource : edsResourceSubscribers.keySet()) { | ||
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource); | ||
if (!retainedEdsResources.contains(resource)) { | ||
subscriber.onAbsent(); | ||
} | ||
} | ||
handleResourceUpdate( | ||
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo, | ||
nonce, errors); | ||
} | ||
|
||
@VisibleForTesting | ||
|
@@ -1589,6 +1570,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster( | |
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) { | ||
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size()); | ||
Set<String> unpackedResources = new HashSet<>(resources.size()); | ||
Set<String> invalidResources = new HashSet<>(); | ||
List<String> errors = new ArrayList<>(); | ||
|
||
for (int i = 0; i < resources.size(); i++) { | ||
|
@@ -1623,16 +1605,17 @@ protected void handleEdsResponse(String versionInfo, List<Any> resources, String | |
} catch (ResourceInvalidException e) { | ||
errors.add("EDS response ClusterLoadAssignment '" + clusterName | ||
+ "' validation error: " + e.getMessage()); | ||
invalidResources.add(clusterName); | ||
continue; | ||
} | ||
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource)); | ||
} | ||
|
||
if (!errors.isEmpty()) { | ||
handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors); | ||
} else { | ||
handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce); | ||
} | ||
getLogger().log( | ||
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}", | ||
versionInfo, nonce, unpackedResources); | ||
handleResourceUpdate( | ||
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(), | ||
versionInfo, nonce, errors); | ||
} | ||
|
||
private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment) | ||
|
@@ -2022,43 +2005,67 @@ private void cleanUpResourceTimers() { | |
} | ||
} | ||
|
||
private void handleResourcesAccepted( | ||
ResourceType type, Map<String, ParsedResource> parsedResources, String version, | ||
String nonce) { | ||
ackResponse(type, version, nonce); | ||
|
||
private void handleResourceUpdate( | ||
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources, | ||
Set<String> retainedResources, String version, String nonce, List<String> errors) { | ||
String errorDetail = null; | ||
if (errors.isEmpty()) { | ||
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors"); | ||
ackResponse(type, version, nonce); | ||
} else { | ||
errorDetail = Joiner.on('\n').join(errors); | ||
getLogger().log(XdsLogLevel.WARNING, | ||
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", | ||
type, version, nonce, errorDetail); | ||
nackResponse(type, nonce, errorDetail); | ||
} | ||
long updateTime = timeProvider.currentTimeNanos(); | ||
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) { | ||
String resourceName = entry.getKey(); | ||
ResourceSubscriber subscriber = entry.getValue(); | ||
// Attach error details to the subscribed resources that included in the ADS update. | ||
if (invalidResources.contains(resourceName)) { | ||
subscriber.onRejected(version, updateTime, errorDetail); | ||
} | ||
// Notify the watchers. | ||
if (parsedResources.containsKey(resourceName)) { | ||
subscriber.onData(parsedResources.get(resourceName), version, updateTime); | ||
} else if (type == ResourceType.LDS || type == ResourceType.CDS) { | ||
if (subscriber.data != null && invalidResources.contains(resourceName)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this condition should move into the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how to simplify this. The There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking: if (invalidResources.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
if (subscriber.data != null) {
if (type == ResourceType.LDS) {
...
retainedResources.add(rdsName);
} else if (type == ResourceType.CDS) {
...
retainedResources.add(edsName);
}
}
} else if (parsedResources.containsKey(resourceName)) {
subscriber.onData(parsedResources.get(resourceName), version, updateTime);
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
subscriber.onAbsent();
} But I can live with what's here. |
||
// Update is rejected but keep using the cached data. | ||
if (type == ResourceType.LDS) { | ||
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data; | ||
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager(); | ||
if (hcm != null) { | ||
String rdsName = hcm.rdsName(); | ||
if (rdsName != null) { | ||
retainedResources.add(rdsName); | ||
} | ||
} | ||
} else { | ||
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data; | ||
String edsName = cdsUpdate.edsServiceName(); | ||
if (edsName == null) { | ||
edsName = cdsUpdate.clusterName(); | ||
} | ||
retainedResources.add(edsName); | ||
} | ||
continue; | ||
} | ||
// For State of the World services, notify watchers when their watched resource is missing | ||
// from the ADS update. | ||
subscriber.onAbsent(); | ||
} | ||
} | ||
} | ||
|
||
private void handleResourcesRejected( | ||
ResourceType type, Set<String> unpackedResourceNames, String version, | ||
String nonce, List<String> errors) { | ||
String errorDetail = Joiner.on('\n').join(errors); | ||
getLogger().log(XdsLogLevel.WARNING, | ||
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}", | ||
type, version, nonce, errorDetail); | ||
nackResponse(type, nonce, errorDetail); | ||
|
||
long updateTime = timeProvider.currentTimeNanos(); | ||
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) { | ||
String resourceName = entry.getKey(); | ||
ResourceSubscriber subscriber = entry.getValue(); | ||
|
||
// Attach error details to the subscribed resources that included in the ADS update. | ||
if (unpackedResourceNames.contains(resourceName)) { | ||
subscriber.onRejected(version, updateTime, errorDetail); | ||
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in | ||
// LDS/CDS resources should be deleted. | ||
if (type == ResourceType.LDS || type == ResourceType.CDS) { | ||
Map<String, ResourceSubscriber> dependentSubscribers = | ||
type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers; | ||
for (String resource : dependentSubscribers.keySet()) { | ||
if (!retainedResources.contains(resource)) { | ||
dependentSubscribers.get(resource).onAbsent(); | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we even need logic like this? If EDS is no longer needed then the subscriber will go away and we'll naturally stop watching it. I remember discussing this in the past, but maybe I don't remember the previous resolution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we had a very long debate with @voidzcy one/two years ago. There was very subtle reason that convinced me to accept this approach, but I can't recall now.
I'm keeping this behavior in this PR to implement the change, but therefore I have to add code like https://github.com/grpc/grpc-java/pull/8506/files/5fe2a6bc3dd52f2687a91903732c443567c0e6b1#diff-6b019eb1d3518381d336168847890513a235ea4974fcb3c76a2e9ace43f1d460R2036-R2052 which makes
ClientXdsClient
so strongly coupled withLdsUpdate
andCdsUpdate
's deeply nested fields, or with LDS/CDS config's semantic.