Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: support xdstp scheme in resource URIs for federation #8716

Merged
merged 4 commits into from Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions xds/src/main/java/io/grpc/xds/Bootstrapper.java
Expand Up @@ -33,6 +33,8 @@
@Internal
public abstract class Bootstrapper {

static final String XDSTP_SCHEME = "xdstp:";

/**
* Returns system-loaded bootstrap configuration.
*/
Expand Down
2 changes: 1 addition & 1 deletion xds/src/main/java/io/grpc/xds/BootstrapperImpl.java
Expand Up @@ -229,7 +229,7 @@ BootstrapInfo bootstrap(Map<String, ?> rawData) throws XdsInitializationExceptio
JsonUtil.getString(rawAuthority, "client_listener_resource_name_template");
logger.log(
XdsLogLevel.INFO, "client_listener_resource_name_template: {0}", clientListnerTemplate);
String prefix = "xdstp://" + authorityName + "/";
String prefix = XDSTP_SCHEME + "//" + authorityName + "/";
if (clientListnerTemplate == null) {
clientListnerTemplate = prefix + "envoy.config.listener.v3.Listener/%s";
} else if (!clientListnerTemplate.startsWith(prefix)) {
Expand Down
21 changes: 18 additions & 3 deletions xds/src/main/java/io/grpc/xds/ClientXdsClient.java
Expand Up @@ -18,6 +18,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;

import com.github.udpa.udpa.type.v1.TypedStruct;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -70,6 +71,7 @@
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.TimeProvider;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.Endpoints.DropOverload;
import io.grpc.xds.Endpoints.LbEndpoint;
Expand Down Expand Up @@ -98,6 +100,7 @@
import io.grpc.xds.internal.Matchers.FractionMatcher;
import io.grpc.xds.internal.Matchers.HeaderMatcher;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -2256,8 +2259,12 @@ private final class ResourceSubscriber {
ResourceSubscriber(ResourceType type, String resource) {
syncContext.throwIfNotInThisSynchronizationContext();
this.type = type;
// TODO(zdapeng): Validate authority in resource URI for new-style resource name
// when parsing XDS response.
// TODO(zdapeng): Canonicalize the resource name by sorting the context params in normal
// lexicographic order.
this.resource = resource;
this.serverInfo = getServerInfo();
this.serverInfo = getServerInfo(resource);
// Initialize metadata in UNKNOWN state to cover the case when resource subscriber,
// is created but not yet requested because the client is in backoff.
this.metadata = ResourceMetadata.newResourceMetadataUnknown();
Expand All @@ -2269,8 +2276,16 @@ private final class ResourceSubscriber {
restartTimer();
}

// TODO(zdapeng): add resourceName arg and support xdstp:// resources
private ServerInfo getServerInfo() {
private ServerInfo getServerInfo(String resource) {
if (resource.startsWith(XDSTP_SCHEME)) {
URI uri = URI.create(resource);
String authority = uri.getAuthority();
if (authority == null) {
authority = "";
}
AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(authority);
return authorityInfo.xdsServers().get(0);
YifeiZhuang marked this conversation as resolved.
Show resolved Hide resolved
}
return bootstrapInfo.servers().get(0); // use first server
}

Expand Down
62 changes: 49 additions & 13 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Expand Up @@ -18,12 +18,14 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.google.common.net.UrlEscapers;
import com.google.gson.Gson;
import com.google.protobuf.util.Durations;
import io.grpc.Attributes;
Expand All @@ -45,6 +47,8 @@
import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.Filter.ClientInterceptorBuilder;
import io.grpc.xds.Filter.FilterConfig;
import io.grpc.xds.Filter.NamedFilterConfig;
Expand Down Expand Up @@ -101,7 +105,9 @@ final class XdsNameResolver extends NameResolver {

private final InternalLogId logId;
private final XdsLogger logger;
private final String authority;
@Nullable
private final String targetAuthority;
private final String serviceAuthority;
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final ScheduledExecutorService scheduler;
Expand All @@ -120,20 +126,23 @@ final class XdsNameResolver extends NameResolver {
private CallCounterProvider callCounterProvider;
private ResolveState resolveState;

XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
XdsNameResolver(
@Nullable String targetAuthority, String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
@Nullable Map<String, ?> bootstrapOverride) {
this(name, serviceConfigParser, syncContext, scheduler,
this(targetAuthority, name, serviceConfigParser, syncContext, scheduler,
SharedXdsClientPoolProvider.getDefaultProvider(), ThreadSafeRandomImpl.instance,
FilterRegistry.getDefaultRegistry(), bootstrapOverride);
}

@VisibleForTesting
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
XdsNameResolver(
@Nullable String targetAuthority, String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, ScheduledExecutorService scheduler,
XdsClientPoolFactory xdsClientPoolFactory, ThreadSafeRandom random,
FilterRegistry filterRegistry, @Nullable Map<String, ?> bootstrapOverride) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.targetAuthority = targetAuthority;
serviceAuthority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.scheduler = checkNotNull(scheduler, "scheduler");
Expand All @@ -149,7 +158,7 @@ final class XdsNameResolver extends NameResolver {

@Override
public String getServiceAuthority() {
return authority;
return serviceAuthority;
}

@Override
Expand All @@ -163,11 +172,33 @@ public void start(Listener2 listener) {
return;
}
xdsClient = xdsClientPool.getObject();
BootstrapInfo bootstrapInfo = xdsClient.getBootstrapInfo();
String listenerNameTemplate;
if (targetAuthority == null) {
listenerNameTemplate = bootstrapInfo.clientDefaultListenerResourceNameTemplate();
} else {
AuthorityInfo authorityInfo = bootstrapInfo.authorities().get(targetAuthority);
if (authorityInfo == null) {
listener.onError(Status.INVALID_ARGUMENT.withDescription(
"invalid target URI: target authority not found in the bootstrap"));
return;
}
listenerNameTemplate = authorityInfo.clientListenerResourceNameTemplate();
}
String replacement = serviceAuthority;
if (listenerNameTemplate.startsWith(XDSTP_SCHEME)) {
replacement = UrlEscapers.urlFragmentEscaper().escape(replacement);
}
String ldsResourceName = expandPercentS(listenerNameTemplate, replacement);
callCounterProvider = SharedCallCounterMap.getInstance();
resolveState = new ResolveState();
resolveState = new ResolveState(ldsResourceName);
resolveState.start();
}

private static String expandPercentS(String template, String replacement) {
return template.replace("%s", replacement);
}

@Override
public void shutdown() {
logger.log(XdsLogLevel.INFO, "Shutdown");
Expand Down Expand Up @@ -624,12 +655,17 @@ private class ResolveState implements LdsResourceWatcher {
.setServiceConfig(emptyServiceConfig)
// let channel take action for no config selector
.build();
private final String ldsResourceName;
private boolean stopped;
@Nullable
private Set<String> existingClusters; // clusters to which new requests can be routed
@Nullable
private RouteDiscoveryState routeDiscoveryState;

ResolveState(String ldsResourceName) {
this.ldsResourceName = ldsResourceName;
}

@Override
public void onChanged(final LdsUpdate update) {
syncContext.execute(new Runnable() {
Expand Down Expand Up @@ -686,23 +722,23 @@ public void run() {
}

private void start() {
logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", authority);
xdsClient.watchLdsResource(authority, this);
logger.log(XdsLogLevel.INFO, "Start watching LDS resource {0}", ldsResourceName);
xdsClient.watchLdsResource(ldsResourceName, this);
}

private void stop() {
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", authority);
logger.log(XdsLogLevel.INFO, "Stop watching LDS resource {0}", ldsResourceName);
stopped = true;
cleanUpRouteDiscoveryState();
xdsClient.cancelLdsResourceWatch(authority, this);
xdsClient.cancelLdsResourceWatch(ldsResourceName, this);
}

private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDurationNano,
@Nullable List<NamedFilterConfig> filterConfigs) {
VirtualHost virtualHost = findVirtualHostForHostName(virtualHosts, authority);
VirtualHost virtualHost = findVirtualHostForHostName(virtualHosts, ldsResourceName);
if (virtualHost == null) {
logger.log(XdsLogLevel.WARNING,
"Failed to find virtual host matching hostname {0}", authority);
"Failed to find virtual host matching hostname {0}", ldsResourceName);
cleanUpRoutes();
return;
}
Expand Down
7 changes: 4 additions & 3 deletions xds/src/main/java/io/grpc/xds/XdsNameResolverProvider.java
Expand Up @@ -74,9 +74,10 @@ public XdsNameResolver newNameResolver(URI targetUri, Args args) {
targetPath,
targetUri);
String name = targetPath.substring(1);
return new XdsNameResolver(name, args.getServiceConfigParser(),
args.getSynchronizationContext(), args.getScheduledExecutorService(),
bootstrapOverride);
return new XdsNameResolver(
targetUri.getAuthority(), name, args.getServiceConfigParser(),
args.getSynchronizationContext(), args.getScheduledExecutorService(),
bootstrapOverride);
}
return null;
}
Expand Down
8 changes: 7 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsServerWrapper.java
Expand Up @@ -18,11 +18,13 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.Bootstrapper.XDSTP_SCHEME;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.net.UrlEscapers;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.InternalServerInterceptors;
Expand Down Expand Up @@ -192,7 +194,11 @@ private void internalStart() {
xdsClient = xdsClientPool.returnObject(xdsClient);
return;
}
discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", listenerAddress));
String replacement = listenerAddress;
if (listenerTemplate.startsWith(XDSTP_SCHEME)) {
replacement = UrlEscapers.urlFragmentEscaper().escape(replacement);
}
discoveryState = new DiscoveryState(listenerTemplate.replaceAll("%s", replacement));
}

@Override
Expand Down
75 changes: 74 additions & 1 deletion xds/src/test/java/io/grpc/xds/ClientXdsClientTestBase.java
Expand Up @@ -59,6 +59,7 @@
import io.grpc.internal.TimeProvider;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.xds.AbstractXdsClient.ResourceType;
import io.grpc.xds.Bootstrapper.AuthorityInfo;
import io.grpc.xds.Bootstrapper.CertificateProviderInfo;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
Expand Down Expand Up @@ -114,6 +115,8 @@
@RunWith(JUnit4.class)
public abstract class ClientXdsClientTestBase {
private static final String SERVER_URI = "trafficdirector.googleapis.com";
private static final String SERVER_URI_CUSTOME_AUTHORITY = "trafficdirector2.googleapis.com";
private static final String SERVER_URI_EMPTY_AUTHORITY = "trafficdirector3.googleapis.com";
private static final String LDS_RESOURCE = "listener.googleapis.com";
private static final String RDS_RESOURCE = "route-configuration.googleapis.com";
private static final String CDS_RESOURCE = "cluster.googleapis.com";
Expand Down Expand Up @@ -250,6 +253,8 @@ public long currentTimeNanos() {
private TlsContextManager tlsContextManager;

private ManagedChannel channel;
private ManagedChannel channelForCustomAuthority;
private ManagedChannel channelForEmptyAuthority;
private ClientXdsClient xdsClient;
private boolean originalEnableFaultInjection;
private boolean originalEnableRbac;
Expand Down Expand Up @@ -281,7 +286,24 @@ public void setUp() throws IOException {
XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() {
@Override
ManagedChannel create(ServerInfo serverInfo) {
return channel;
if (serverInfo.target().equals(SERVER_URI)) {
return channel;
}
if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) {
if (channelForCustomAuthority == null) {
channelForCustomAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return channelForCustomAuthority;
}
if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) {
if (channelForEmptyAuthority == null) {
channelForEmptyAuthority = cleanupRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
}
return channelForEmptyAuthority;
}
throw new IllegalArgumentException("Can not create channel for " + serverInfo);
}
};

Expand All @@ -290,6 +312,17 @@ ManagedChannel create(ServerInfo serverInfo) {
.servers(Arrays.asList(
Bootstrapper.ServerInfo.create(SERVER_URI, CHANNEL_CREDENTIALS, useProtocolV3())))
.node(EnvoyProtoData.Node.newBuilder().build())
.authorities(ImmutableMap.of(
"authority.xds.com",
AuthorityInfo.create(
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS, useProtocolV3()))),
"",
AuthorityInfo.create(
"xdstp:///envoy.config.listener.v3.Listener/%s",
ImmutableList.of(Bootstrapper.ServerInfo.create(
SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS, useProtocolV3())))))
.certProviders(ImmutableMap.of("cert-instance-name",
CertificateProviderInfo.create("file-watcher", ImmutableMap.<String, Object>of())))
.build();
Expand Down Expand Up @@ -706,6 +739,46 @@ public void ldsResourceUpdated() {
.isEqualTo(RDS_RESOURCE);
verifyResourceMetadataAcked(LDS, LDS_RESOURCE, testListenerRds, VERSION_2, TIME_INCREMENT * 2);
verifySubscribedResourcesMetadataSizes(1, 0, 0, 0);
assertThat(channelForCustomAuthority).isNull();
assertThat(channelForEmptyAuthority).isNull();
}

@Test
public void ldsResourceUpdated_withXdstpResourceName() {
String ldsResourceName =
"xdstp://authority.xds.com/envoy.config.listener.v3.Listener/listener1";
DiscoveryRpcCall call = startResourceWatcher(LDS, ldsResourceName, ldsResourceWatcher);
assertThat(channelForCustomAuthority).isNotNull();
verifyResourceMetadataRequested(LDS, ldsResourceName);

Any testListenerVhosts = Any.pack(mf.buildListenerWithApiListener(ldsResourceName,
mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(VHOST_SIZE))));
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, ldsResourceName, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyResourceMetadataAcked(
LDS, ldsResourceName, testListenerVhosts, VERSION_1, TIME_INCREMENT);
}

@Test
public void ldsResourceUpdated_withXdstpResourceName_withEmptyAuthority() {
String ldsResourceName =
"xdstp:///envoy.config.listener.v3.Listener/listener1";
DiscoveryRpcCall call = startResourceWatcher(LDS, ldsResourceName, ldsResourceWatcher);
assertThat(channelForEmptyAuthority).isNotNull();
verifyResourceMetadataRequested(LDS, ldsResourceName);

Any testListenerVhosts = Any.pack(mf.buildListenerWithApiListener(ldsResourceName,
mf.buildRouteConfiguration("do not care", mf.buildOpaqueVirtualHosts(VHOST_SIZE))));
call.sendResponse(LDS, testListenerVhosts, VERSION_1, "0000");
call.verifyRequest(LDS, ldsResourceName, VERSION_1, "0000", NODE);
verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture());
assertThat(ldsUpdateCaptor.getValue().httpConnectionManager().virtualHosts())
.hasSize(VHOST_SIZE);
verifyResourceMetadataAcked(
LDS, ldsResourceName, testListenerVhosts, VERSION_1, TIME_INCREMENT);
}

@Test
Expand Down