Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: implement ignore_resource_deletion server feature #9339

Merged
merged 8 commits into from Jul 8, 2022
13 changes: 12 additions & 1 deletion xds/src/main/java/io/grpc/xds/Bootstrapper.java
Expand Up @@ -62,10 +62,21 @@ abstract static class ServerInfo {

abstract boolean useProtocolV3();

abstract boolean ignoreResourceDeletion();

@VisibleForTesting
static ServerInfo create(
String target, ChannelCredentials channelCredentials, boolean useProtocolV3) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3);
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
false);
}

@VisibleForTesting
static ServerInfo create(
String target, ChannelCredentials channelCredentials, boolean useProtocolV3,
boolean ignoreResourceDeletion) {
return new AutoValue_Bootstrapper_ServerInfo(target, channelCredentials, useProtocolV3,
ignoreResourceDeletion);
}
}

Expand Down
22 changes: 16 additions & 6 deletions xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
Expand Up @@ -54,15 +54,22 @@ class BootstrapperImpl extends Bootstrapper {
private static final String BOOTSTRAP_CONFIG_SYS_PROPERTY = "io.grpc.xds.bootstrapConfig";
@VisibleForTesting
static String bootstrapConfigFromSysProp = System.getProperty(BOOTSTRAP_CONFIG_SYS_PROPERTY);
private static final String XDS_V3_SERVER_FEATURE = "xds_v3";

// Feature-gating environment variables.
static boolean enableFederation =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));

// Client features.
@VisibleForTesting
static final String CLIENT_FEATURE_DISABLE_OVERPROVISIONING =
"envoy.lb.does_not_support_overprovisioning";
@VisibleForTesting
static final String CLIENT_FEATURE_RESOURCE_IN_SOTW = "xds.config.resource-in-sotw";
static boolean enableFederation =
!Strings.isNullOrEmpty(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"))
&& Boolean.parseBoolean(System.getenv("GRPC_EXPERIMENTAL_XDS_FEDERATION"));

// Server features.
private static final String SERVER_FEATURE_XDS_V3 = "xds_v3";
private static final String SERVER_FEATURE_IGNORE_RESOURCE_DELETION = "ignore_resource_deletion";

private final XdsLogger logger;
private FileReader reader = LocalFileReader.INSTANCE;
Expand Down Expand Up @@ -275,12 +282,15 @@ private static List<ServerInfo> parseServerInfos(List<?> rawServerConfigs, XdsLo
}

boolean useProtocolV3 = false;
boolean ignoreResourceDeletion = false;
List<String> serverFeatures = JsonUtil.getListOfStrings(serverConfig, "server_features");
if (serverFeatures != null) {
logger.log(XdsLogLevel.INFO, "Server features: {0}", serverFeatures);
useProtocolV3 = serverFeatures.contains(XDS_V3_SERVER_FEATURE);
useProtocolV3 = serverFeatures.contains(SERVER_FEATURE_XDS_V3);
ignoreResourceDeletion = serverFeatures.contains(SERVER_FEATURE_IGNORE_RESOURCE_DELETION);
}
servers.add(ServerInfo.create(serverUri, channelCredentials, useProtocolV3));
servers.add(
ServerInfo.create(serverUri, channelCredentials, useProtocolV3, ignoreResourceDeletion));
}
return servers.build();
}
Expand Down
107 changes: 77 additions & 30 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -2156,8 +2156,7 @@ public void run() {
ResourceSubscriber subscriber = ldsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe LDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
ldsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.LDS);
Expand Down Expand Up @@ -2194,8 +2193,7 @@ public void run() {
ResourceSubscriber subscriber = rdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe RDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
rdsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.RDS);
Expand Down Expand Up @@ -2232,8 +2230,7 @@ public void run() {
ResourceSubscriber subscriber = cdsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe CDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
cdsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.CDS);
Expand Down Expand Up @@ -2270,8 +2267,7 @@ public void run() {
ResourceSubscriber subscriber = edsResourceSubscribers.get(resourceName);
subscriber.removeWatcher(watcher);
if (!subscriber.isWatched()) {
subscriber.stopTimer();
logger.log(XdsLogLevel.INFO, "Unsubscribe EDS resource {0}", resourceName);
subscriber.cancelResourceWatch();
edsResourceSubscribers.remove(resourceName);
if (subscriber.xdsChannel != null) {
subscriber.xdsChannel.adjustResourceSubscription(ResourceType.EDS);
Expand Down Expand Up @@ -2320,7 +2316,7 @@ public void run() {
Bootstrapper.BootstrapInfo getBootstrapInfo() {
return bootstrapInfo;
}

@Override
public String toString() {
return logId.toString();
Expand Down Expand Up @@ -2370,29 +2366,18 @@ private void handleResourceUpdate(
} else if (type == ResourceType.LDS || type == ResourceType.CDS) {
if (subscriber.data != null && invalidResources.contains(resourceName)) {
// Update is rejected but keep using the cached data.
if (type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
String rdsName = hcm.rdsName();
if (rdsName != null) {
retainedResources.add(rdsName);
}
}
} else {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
String edsName = cdsUpdate.edsServiceName();
if (edsName == null) {
edsName = cdsUpdate.clusterName();
}
retainedResources.add(edsName);
}
retainDependentResource(subscriber, retainedResources);
} else if (invalidResources.contains(resourceName)) {
subscriber.onError(Status.UNAVAILABLE.withDescription(errorDetail));
} else {
// For State of the World services, notify watchers when their watched resource is missing
// from the ADS update.
subscriber.onAbsent();
// Retain any dependent resources if the resource deletion is ignored
// per bootstrap ignore_resource_deletion server feature.
if (!subscriber.absent) {
retainDependentResource(subscriber, retainedResources);
}
}
}
}
Expand All @@ -2409,6 +2394,28 @@ private void handleResourceUpdate(
}
}

private void retainDependentResource(
ResourceSubscriber subscriber, Set<String> retainedResources) {
if (subscriber.data == null) {
return;
}
String resourceName = null;
if (subscriber.type == ResourceType.LDS) {
LdsUpdate ldsUpdate = (LdsUpdate) subscriber.data;
io.grpc.xds.HttpConnectionManager hcm = ldsUpdate.httpConnectionManager();
if (hcm != null) {
resourceName = hcm.rdsName();
}
} else if (subscriber.type == ResourceType.CDS) {
CdsUpdate cdsUpdate = (CdsUpdate) subscriber.data;
resourceName = cdsUpdate.edsServiceName();
}

if (resourceName != null) {
retainedResources.add(resourceName);
}
}

private static final class ParsedResource {
private final ResourceUpdate resourceUpdate;
private final Any rawResource;
Expand All @@ -2431,15 +2438,18 @@ private Any getRawResource() {
* Tracks a single subscribed resource.
*/
private final class ResourceSubscriber {
private final ServerInfo serverInfo;
@Nullable private final ServerInfo serverInfo;
@Nullable private final AbstractXdsClient xdsChannel;
private final ResourceType type;
private final String resource;
private final Set<ResourceWatcher> watchers = new HashSet<>();
private ResourceUpdate data;
@Nullable private ResourceUpdate data;
private boolean absent;
private ScheduledHandle respTimer;
private ResourceMetadata metadata;
// Tracks whether the deletion has been ignored per bootstrap server feature.
// See https://github.com/grpc/proposal/blob/master/A53-xds-ignore-resource-deletion.md
private boolean resourceDeletionIgnored;
@Nullable private ScheduledHandle respTimer;
@Nullable private ResourceMetadata metadata;
@Nullable private String errorDescription;

ResourceSubscriber(ResourceType type, String resource) {
Expand Down Expand Up @@ -2533,6 +2543,21 @@ void stopTimer() {
}
}

void cancelResourceWatch() {
if (isWatched()) {
throw new IllegalStateException("Can't cancel resource watch with active watchers present");
}
stopTimer();
String message = "Unsubscribing {0} resource {1} from server {2}";
XdsLogLevel logLevel = XdsLogLevel.INFO;
if (resourceDeletionIgnored) {
message += " for which we previously ignored a deletion";
logLevel = XdsLogLevel.FORCE_INFO;
}
logger.log(logLevel, message, type, resource,
serverInfo != null ? serverInfo.target() : "unknown");
}

boolean isWatched() {
return !watchers.isEmpty();
}
Expand All @@ -2547,6 +2572,12 @@ void onData(ParsedResource parsedResource, String version, long updateTime) {
ResourceUpdate oldData = this.data;
this.data = parsedResource.getResourceUpdate();
absent = false;
if (resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_INFO, "xds server {0}: server returned new version "
+ "of resource for which we previously ignored a deletion: type {1} name {2}",
serverInfo != null ? serverInfo.target() : "unknown", type, resource);
resourceDeletionIgnored = false;
}
if (!Objects.equals(oldData, data)) {
for (ResourceWatcher watcher : watchers) {
notifyWatcher(watcher, data);
Expand All @@ -2558,6 +2589,22 @@ void onAbsent() {
if (respTimer != null && respTimer.isPending()) { // too early to conclude absence
return;
}

// Ignore deletion of State of the World resources when this feature is on,
// and the resource is reusable.
boolean ignoreResourceDeletionEnabled =
serverInfo != null && serverInfo.ignoreResourceDeletion();
boolean isStateOfTheWorld = (type == ResourceType.LDS || type == ResourceType.CDS);
if (ignoreResourceDeletionEnabled && isStateOfTheWorld && data != null) {
if (!resourceDeletionIgnored) {
logger.log(XdsLogLevel.FORCE_WARNING,
"xds server {0}: ignoring deletion for resource type {1} name {2}}",
serverInfo.target(), type, resource);
resourceDeletionIgnored = true;
}
return;
}

logger.log(XdsLogLevel.INFO, "Conclude {0} resource {1} not exist", type, resource);
if (!absent) {
data = null;
Expand Down
15 changes: 14 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsLogger.java
Expand Up @@ -81,6 +81,10 @@ private static Level toJavaLogLevel(XdsLogLevel level) {
return Level.FINE;
case INFO:
return Level.FINER;
case FORCE_INFO:
return Level.INFO;
case FORCE_WARNING:
return Level.WARNING;
default:
return Level.FINEST;
}
Expand All @@ -89,6 +93,11 @@ private static Level toJavaLogLevel(XdsLogLevel level) {
/**
* Log levels. See the table below for the mapping from the XdsLogger levels to
* Java logger levels.
*
* <p><b>NOTE:</b>
* Please use {@code FORCE_} levels with care, only when the message is expected to be
* surfaced to the library user. Normally libraries should minimize the usage
* of highly visible logs.
* <pre>
* +---------------------+-------------------+
* | XdsLogger Level | Java Logger Level |
Expand All @@ -97,13 +106,17 @@ private static Level toJavaLogLevel(XdsLogLevel level) {
* | INFO | FINER |
* | WARNING | FINE |
* | ERROR | FINE |
* | FORCE_INFO | INFO |
* | FORCE_WARNING | WARNING |
* +---------------------+-------------------+
* </pre>
*/
enum XdsLogLevel {
DEBUG,
INFO,
WARNING,
ERROR
ERROR,
FORCE_INFO,
FORCE_WARNING,
}
}
52 changes: 52 additions & 0 deletions xds/src/test/java/io/grpc/xds/BootstrapperImplTest.java
Expand Up @@ -578,6 +578,8 @@ public void useV2ProtocolByDefault() throws XdsInitializationException {
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
// xds v2: xds v3 disabled
assertThat(serverInfo.useProtocolV3()).isFalse();
}

Expand All @@ -600,9 +602,59 @@ public void useV3ProtocolIfV3FeaturePresent() throws XdsInitializationException
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
assertThat(serverInfo.ignoreResourceDeletion()).isFalse();
// xds_v3 enabled
assertThat(serverInfo.useProtocolV3()).isTrue();
}

@Test
public void serverFeatureIgnoreResourceDeletion() throws XdsInitializationException {
String rawData = "{\n"
+ " \"xds_servers\": [\n"
+ " {\n"
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
+ " \"channel_creds\": [\n"
+ " {\"type\": \"insecure\"}\n"
+ " ],\n"
+ " \"server_features\": [\"ignore_resource_deletion\"]\n"
+ " }\n"
+ " ]\n"
+ "}";

bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
BootstrapInfo info = bootstrapper.bootstrap();
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
// Only ignore_resource_deletion feature enabled: confirm it's on, and xds_v3 is off.
assertThat(serverInfo.useProtocolV3()).isFalse();
assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
}

@Test
public void serverFeatureIgnoreResourceDeletion_xdsV3() throws XdsInitializationException {
String rawData = "{\n"
+ " \"xds_servers\": [\n"
+ " {\n"
+ " \"server_uri\": \"" + SERVER_URI + "\",\n"
+ " \"channel_creds\": [\n"
+ " {\"type\": \"insecure\"}\n"
+ " ],\n"
+ " \"server_features\": [\"xds_v3\", \"ignore_resource_deletion\"]\n"
+ " }\n"
+ " ]\n"
+ "}";

bootstrapper.setFileReader(createFileReader(BOOTSTRAP_FILE_PATH, rawData));
BootstrapInfo info = bootstrapper.bootstrap();
ServerInfo serverInfo = Iterables.getOnlyElement(info.servers());
assertThat(serverInfo.target()).isEqualTo(SERVER_URI);
assertThat(serverInfo.channelCredentials()).isInstanceOf(InsecureChannelCredentials.class);
// xds_v3 and ignore_resource_deletion features enabled: confirm both are on.
assertThat(serverInfo.useProtocolV3()).isTrue();
assertThat(serverInfo.ignoreResourceDeletion()).isTrue();
}

@Test
public void notFound() {
BootstrapperImpl.bootstrapPathFromEnvVar = null;
Expand Down