Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: apply valid resources while NACKing update [backport v1.41.x] #8519

Merged
merged 1 commit into from Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
135 changes: 71 additions & 64 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -190,6 +190,7 @@ final class ClientXdsClient extends AbstractXdsClient {
protected void handleLdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedRdsResources = new HashSet<>();

Expand Down Expand Up @@ -222,6 +223,7 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add(
"LDS response Listener '" + listenerName + "' validation error: " + e.getMessage());
invalidResources.add(listenerName);
continue;
}

Expand All @@ -231,19 +233,9 @@ protected void handleLdsResponse(String versionInfo, List<Any> resources, String
getLogger().log(XdsLogLevel.INFO,
"Received LDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.LDS, unpackedResources, versionInfo, nonce, errors);
return;
}

handleResourcesAccepted(ResourceType.LDS, parsedResources, versionInfo, nonce);
for (String resource : rdsResourceSubscribers.keySet()) {
if (!retainedRdsResources.contains(resource)) {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resource);
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.LDS, parsedResources, invalidResources, retainedRdsResources, versionInfo,
nonce, errors);
}

private LdsUpdate processClientSideListener(
Expand Down Expand Up @@ -1310,6 +1302,7 @@ static StructOrError<ClusterWeight> parseClusterWeight(
protected void handleRdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();

for (int i = 0; i < resources.size(); i++) {
Expand Down Expand Up @@ -1337,6 +1330,7 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
errors.add(
"RDS response RouteConfiguration '" + routeConfigName + "' validation error: " + e
.getMessage());
invalidResources.add(routeConfigName);
continue;
}

Expand All @@ -1345,12 +1339,9 @@ protected void handleRdsResponse(String versionInfo, List<Any> resources, String
getLogger().log(XdsLogLevel.INFO,
"Received RDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.RDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.RDS, parsedResources, versionInfo, nonce);
}
handleResourceUpdate(
ResourceType.RDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}

private static RdsUpdate processRouteConfiguration(
Expand All @@ -1374,6 +1365,7 @@ private static RdsUpdate processRouteConfiguration(
protected void handleCdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();
Set<String> retainedEdsResources = new HashSet<>();

Expand Down Expand Up @@ -1410,28 +1402,17 @@ protected void handleCdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add(
"CDS response Cluster '" + clusterName + "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(cdsUpdate, resource));
}
getLogger().log(XdsLogLevel.INFO,
"Received CDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.CDS, unpackedResources, versionInfo, nonce, errors);
return;
}

handleResourcesAccepted(ResourceType.CDS, parsedResources, versionInfo, nonce);
// CDS responses represents the state of the world, EDS resources not referenced in CDS
// resources should be deleted.
for (String resource : edsResourceSubscribers.keySet()) {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resource);
if (!retainedEdsResources.contains(resource)) {
subscriber.onAbsent();
}
}
handleResourceUpdate(
ResourceType.CDS, parsedResources, invalidResources, retainedEdsResources, versionInfo,
nonce, errors);
}

@VisibleForTesting
Expand Down Expand Up @@ -1612,6 +1593,7 @@ private static StructOrError<CdsUpdate.Builder> parseNonAggregateCluster(
protected void handleEdsResponse(String versionInfo, List<Any> resources, String nonce) {
Map<String, ParsedResource> parsedResources = new HashMap<>(resources.size());
Set<String> unpackedResources = new HashSet<>(resources.size());
Set<String> invalidResources = new HashSet<>();
List<String> errors = new ArrayList<>();

for (int i = 0; i < resources.size(); i++) {
Expand Down Expand Up @@ -1646,16 +1628,17 @@ protected void handleEdsResponse(String versionInfo, List<Any> resources, String
} catch (ResourceInvalidException e) {
errors.add("EDS response ClusterLoadAssignment '" + clusterName
+ "' validation error: " + e.getMessage());
invalidResources.add(clusterName);
continue;
}
parsedResources.put(clusterName, new ParsedResource(edsUpdate, resource));
}

if (!errors.isEmpty()) {
handleResourcesRejected(ResourceType.EDS, unpackedResources, versionInfo, nonce, errors);
} else {
handleResourcesAccepted(ResourceType.EDS, parsedResources, versionInfo, nonce);
}
getLogger().log(
XdsLogLevel.INFO, "Received EDS Response version {0} nonce {1}. Parsed resources: {2}",
versionInfo, nonce, unpackedResources);
handleResourceUpdate(
ResourceType.EDS, parsedResources, invalidResources, Collections.<String>emptySet(),
versionInfo, nonce, errors);
}

private static EdsUpdate processClusterLoadAssignment(ClusterLoadAssignment assignment)
Expand Down Expand Up @@ -2045,43 +2028,67 @@ private void cleanUpResourceTimers() {
}
}

private void handleResourcesAccepted(
ResourceType type, Map<String, ParsedResource> parsedResources, String version,
String nonce) {
ackResponse(type, version, nonce);

private void handleResourceUpdate(
ResourceType type, Map<String, ParsedResource> parsedResources, Set<String> invalidResources,
Set<String> retainedResources, String version, String nonce, List<String> errors) {
String errorDetail = null;
if (errors.isEmpty()) {
checkArgument(invalidResources.isEmpty(), "found invalid resources but missing errors");
ackResponse(type, version, nonce);
} else {
errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);
}
long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();
// Attach error details to the subscribed resources that included in the ADS update.
if (invalidResources.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
}
// Notify the watchers.
if (parsedResources.containsKey(resourceName)) {
subscriber.onData(parsedResources.get(resourceName), version, updateTime);
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
continue;
}
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
}
}
}

private void handleResourcesRejected(
ResourceType type, Set<String> unpackedResourceNames, String version,
String nonce, List<String> errors) {
String errorDetail = Joiner.on('\n').join(errors);
getLogger().log(XdsLogLevel.WARNING,
"Failed processing {0} Response version {1} nonce {2}. Errors:\n{3}",
type, version, nonce, errorDetail);
nackResponse(type, nonce, errorDetail);

long updateTime = timeProvider.currentTimeNanos();
for (Map.Entry<String, ResourceSubscriber> entry : getSubscribedResourcesMap(type).entrySet()) {
String resourceName = entry.getKey();
ResourceSubscriber subscriber = entry.getValue();

// Attach error details to the subscribed resources that included in the ADS update.
if (unpackedResourceNames.contains(resourceName)) {
subscriber.onRejected(version, updateTime, errorDetail);
// LDS/CDS responses represents the state of the world, RDS/EDS resources not referenced in
// LDS/CDS resources should be deleted.
if (type == ResourceType.LDS || type == ResourceType.CDS) {
Map<String, ResourceSubscriber> dependentSubscribers =
type == ResourceType.LDS ? rdsResourceSubscribers : edsResourceSubscribers;
for (String resource : dependentSubscribers.keySet()) {
if (!retainedResources.contains(resource)) {
dependentSubscribers.get(resource).onAbsent();
}
}
}
}
Expand Down