Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: implement ignore_resource_deletion server feature #9339

Merged
merged 8 commits into from Jul 8, 2022
100 changes: 70 additions & 30 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -2156,8 +2156,7 @@ public void run() {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
ldsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS);
Expand Down Expand Up @@ -2194,8 +2193,7 @@ public void run() {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
rdsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS);
Expand Down Expand Up @@ -2232,8 +2230,7 @@ public void run() {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
cdsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS);
Expand Down Expand Up @@ -2270,8 +2267,7 @@ public void run() {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
edsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS);
Expand Down Expand Up @@ -2320,7 +2316,7 @@ public void run() {
Bootstrapper.BootstrapInfo getBootstrapInfo() {
return bootstrapInfo;
}

@Override
public String toString() {
return logId.toString();
Expand Down Expand Up @@ -2370,29 +2366,17 @@ private void handleResourceUpdate(
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
retainDependentResource(subscriber, retainedResources);
} else if (invalidResources.contains(resourceName)) {
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
} else {
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
// Retain any dependent resources if the resource deletion is ignored per server setting.
if (!subscriber.absent) {
retainDependentResource(subscriber, retainedResources);
}
}
}
}
Expand All @@ -2409,6 +2393,28 @@ private void handleResourceUpdate(
}
}

private void retainDependentResource(
ResourceSubscriber subscriber, Set<String> retainedResources) {
if (subscriber.data == null) {
return;
}
String resourceName = null;
if (subscriber.type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
resourceName = hcm.rdsName();
}
} else if (subscriber.type == ResourceType.CDS) {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
resourceName = cdsUpdate.edsServiceName();
}

if (resourceName != null) {
retainedResources.add(resourceName);
}
}

private static final class ParsedResource {
private final ResourceUpdate resourceUpdate;
private final Any rawResource;
Expand All @@ -2431,15 +2437,18 @@ private Any getRawResource() {
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber {
private final ServerInfo serverInfo;
@Nullable private final ServerInfo serverInfo;
@Nullable private final AbstractXdsClient xdsChannel;
private final ResourceType type;
private final String resource;
private final Set<ResourceWatcher> watchers = new HashSet<>();
private ResourceUpdate data;
@Nullable private ResourceUpdate data;
private boolean absent;
private ScheduledHandle respTimer;
private ResourceMetadata metadata;
// Tracks whether the deletion has been ignored per server request.
sergiitk marked this conversation as resolved.
Show resolved Hide resolved
// See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
private boolean resourceDeletionIgnored;
@Nullable private ScheduledHandle respTimer;
@Nullable private ResourceMetadata metadata;
@Nullable private String errorDescription;

ResourceSubscriber(ResourceType type, String resource) {
Expand Down Expand Up @@ -2533,6 +2542,19 @@ void stopTimer() {
}
}

void cancelResourceWatch() {
if (isWatched()) {
throw new IllegalStateException("Can't cancel resource watch with active watchers present");
}
stopTimer();
String message = "Unsubscribing {0} resource {1} from server {2}";
if (resourceDeletionIgnored) {
message += " for which we previously ignored a deletion";
}
logger.log(XdsLogLevel.INFO, message, type, resource,
serverInfo != null ? serverInfo.target() : "unknown");
}

boolean isWatched() {
return !watchers.isEmpty();
}
Expand All @@ -2547,6 +2569,12 @@ void onData(ParsedResource parsedResource, String version, long updateTime) {
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.INFO, "xds server {0}: server returned new version of resource "
+ "for which we previously ignored a deletion: type {1} name {2}",
serverInfo != null ? serverInfo.target() : "unknown", type, resource);
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher watcher : watchers) {
notifyWatcher(watcher, data);
Expand All @@ -2558,6 +2586,18 @@ void onAbsent() {
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}

// Ignore deletion when the server instructs to, and the resource is reusable.
if (data != null && serverInfo != null && serverInfo.ignoreResourceDeletion()) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.WARNING,
sergiitk marked this conversation as resolved.
Show resolved Hide resolved
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
resourceDeletionIgnored = true;
}
return;
}

logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;
Expand Down