Skip to content

Commit

Permalink
xds: refactor xds client to make it resource agnostic (grpc#9444)
Browse files Browse the repository at this point in the history
Mainly refactor work to make type specific xds resources generic, e.g.
1. Define abstract class XdsResourceType to be extended by pluggable new resources. It mainly contains abstract method doParse() to parse unpacked proto messges and produce a ResourceUpdate. The common unpacking proto logic is in XdsResourceType default method parse()
2. Move the parsing/processing logics to specific XdsResourceType. Implementing:
XdsListenerResource for LDS
XdsRouteConfigureResource for RDS
XdsClusterResource for CDS
XdsEndpointResource for EDS
3. The XdsResourceTypes are singleton. To process for each XdsClient, context is passed in parameters, defined by XdsResourceType.Args.
4. Watchers will use generic APIs to subscribe to resource watchXdsResource(XdsResourceType, resourceName, watcher). Watcher and ResourceSubscribers becomes java generic class.
  • Loading branch information
YifeiZhuang committed Sep 16, 2022
1 parent a3c1d77 commit e1ad984
Show file tree
Hide file tree
Showing 23 changed files with 3,455 additions and 3,236 deletions.
163 changes: 38 additions & 125 deletions xds/src/main/java/io/grpc/xds/AbstractXdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,14 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.XdsClusterResource.ADS_TYPE_URL_CDS;
import static io.grpc.xds.XdsClusterResource.ADS_TYPE_URL_CDS_V2;
import static io.grpc.xds.XdsEndpointResource.ADS_TYPE_URL_EDS;
import static io.grpc.xds.XdsEndpointResource.ADS_TYPE_URL_EDS_V2;
import static io.grpc.xds.XdsListenerResource.ADS_TYPE_URL_LDS;
import static io.grpc.xds.XdsListenerResource.ADS_TYPE_URL_LDS_V2;
import static io.grpc.xds.XdsRouteConfigureResource.ADS_TYPE_URL_RDS;
import static io.grpc.xds.XdsRouteConfigureResource.ADS_TYPE_URL_RDS_V2;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
Expand All @@ -41,11 +49,14 @@
import io.grpc.xds.ClientXdsClient.XdsChannelFactory;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.ResourceUpdate;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
Expand All @@ -55,23 +66,6 @@
* the xDS RPC stream.
*/
final class AbstractXdsClient {

private static final String ADS_TYPE_URL_LDS_V2 = "type.googleapis.com/envoy.api.v2.Listener";
private static final String ADS_TYPE_URL_LDS =
"type.googleapis.com/envoy.config.listener.v3.Listener";
private static final String ADS_TYPE_URL_RDS_V2 =
"type.googleapis.com/envoy.api.v2.RouteConfiguration";
private static final String ADS_TYPE_URL_RDS =
"type.googleapis.com/envoy.config.route.v3.RouteConfiguration";
@VisibleForTesting
static final String ADS_TYPE_URL_CDS_V2 = "type.googleapis.com/envoy.api.v2.Cluster";
private static final String ADS_TYPE_URL_CDS =
"type.googleapis.com/envoy.config.cluster.v3.Cluster";
private static final String ADS_TYPE_URL_EDS_V2 =
"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment";
private static final String ADS_TYPE_URL_EDS =
"type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment";

private final SynchronizationContext syncContext;
private final InternalLogId logId;
private final XdsLogger logger;
Expand All @@ -88,10 +82,7 @@ final class AbstractXdsClient {
// Last successfully applied version_info for each resource type. Starts with empty string.
// A version_info is used to update management server with client's most recent knowledge of
// resources.
private String ldsVersion = "";
private String rdsVersion = "";
private String cdsVersion = "";
private String edsVersion = "";
private final Map<ResourceType, String> versions = new HashMap<>();

private boolean shutdown;
@Nullable
Expand Down Expand Up @@ -162,16 +153,17 @@ public String toString() {
* Updates the resource subscription for the given resource type.
*/
// Must be synchronized.
void adjustResourceSubscription(ResourceType type) {
void adjustResourceSubscription(XdsResourceType<?> resourceType) {
if (isInBackoff()) {
return;
}
if (adsStream == null) {
startRpcStream();
}
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo,
resourceType.typeName());
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
adsStream.sendDiscoveryRequest(resourceType, resources);
}
}

Expand All @@ -180,24 +172,9 @@ void adjustResourceSubscription(ResourceType type) {
* and sends an ACK request to the management server.
*/
// Must be synchronized.
void ackResponse(ResourceType type, String versionInfo, String nonce) {
switch (type) {
case LDS:
ldsVersion = versionInfo;
break;
case RDS:
rdsVersion = versionInfo;
break;
case CDS:
cdsVersion = versionInfo;
break;
case EDS:
edsVersion = versionInfo;
break;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type: " + type);
}
void ackResponse(XdsResourceType<?> xdsResourceType, String versionInfo, String nonce) {
ResourceType type = xdsResourceType.typeName();
versions.put(type, versionInfo);
logger.log(XdsLogLevel.INFO, "Sending ACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
Expand All @@ -212,8 +189,9 @@ void ackResponse(ResourceType type, String versionInfo, String nonce) {
* accepted version) to the management server.
*/
// Must be synchronized.
void nackResponse(ResourceType type, String nonce, String errorDetail) {
String versionInfo = getCurrentVersion(type);
void nackResponse(XdsResourceType<?> xdsResourceType, String nonce, String errorDetail) {
ResourceType type = xdsResourceType.typeName();
String versionInfo = versions.getOrDefault(type, "");
logger.log(XdsLogLevel.INFO, "Sending NACK for {0} update, nonce: {1}, current version: {2}",
type, nonce, versionInfo);
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
Expand Down Expand Up @@ -253,30 +231,6 @@ private void startRpcStream() {
stopwatch.reset().start();
}

/** Returns the latest accepted version of the given resource type. */
// Must be synchronized.
String getCurrentVersion(ResourceType type) {
String version;
switch (type) {
case LDS:
version = ldsVersion;
break;
case RDS:
version = rdsVersion;
break;
case CDS:
version = cdsVersion;
break;
case EDS:
version = edsVersion;
break;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type: " + type);
}
return version;
}

@VisibleForTesting
final class RpcRetryTask implements Runnable {
@Override
Expand All @@ -291,13 +245,14 @@ public void run() {
}
Collection<String> resources = resourceStore.getSubscribedResources(serverInfo, type);
if (resources != null) {
adsStream.sendDiscoveryRequest(type, resources);
adsStream.sendDiscoveryRequest(resourceStore.getXdsResourceType(type), resources);
}
}
xdsResponseHandler.handleStreamRestarted(serverInfo);
}
}

// TODO(zivy) : remove and replace with XdsResourceType
enum ResourceType {
UNKNOWN, LDS, RDS, CDS, EDS;

Expand Down Expand Up @@ -361,17 +316,13 @@ static ResourceType fromTypeUrl(String typeUrl) {
private abstract class AbstractAdsStream {
private boolean responseReceived;
private boolean closed;

// Response nonce for the most recently received discovery responses of each resource type.
// Client initiated requests start response nonce with empty string.
// A nonce is used to indicate the specific DiscoveryResponse each DiscoveryRequest
// corresponds to.
// A nonce becomes stale following a newer nonce being presented to the client in a
// DiscoveryResponse.
private String ldsRespNonce = "";
private String rdsRespNonce = "";
private String cdsRespNonce = "";
private String edsRespNonce = "";
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
private final Map<ResourceType, String> respNonces = new HashMap<>();

abstract void start();

Expand All @@ -381,35 +332,20 @@ private abstract class AbstractAdsStream {
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
* client-initiated discovery requests, use {@link
* #sendDiscoveryRequest(ResourceType, Collection)}.
* #sendDiscoveryRequest(XdsResourceType, Collection)}.
*/
abstract void sendDiscoveryRequest(ResourceType type, String versionInfo,
abstract void sendDiscoveryRequest(ResourceType type, String version,
Collection<String> resources, String nonce, @Nullable String errorDetail);

/**
* Sends a client-initiated discovery request.
*/
final void sendDiscoveryRequest(ResourceType type, Collection<String> resources) {
String nonce;
switch (type) {
case LDS:
nonce = ldsRespNonce;
break;
case RDS:
nonce = rdsRespNonce;
break;
case CDS:
nonce = cdsRespNonce;
break;
case EDS:
nonce = edsRespNonce;
break;
case UNKNOWN:
default:
throw new AssertionError("Unknown resource type: " + type);
}
final void sendDiscoveryRequest(XdsResourceType<? extends ResourceUpdate> xdsResourceType,
Collection<String> resources) {
ResourceType type = xdsResourceType.typeName();
logger.log(XdsLogLevel.INFO, "Sending {0} request for resources: {1}", type, resources);
sendDiscoveryRequest(type, getCurrentVersion(type), resources, nonce, null);
sendDiscoveryRequest(type, versions.getOrDefault(type, ""), resources,
respNonces.getOrDefault(type, ""), null);
}

final void handleRpcResponse(
Expand All @@ -418,31 +354,8 @@ final void handleRpcResponse(
return;
}
responseReceived = true;
// Nonce in each response is echoed back in the following ACK/NACK request. It is
// used for management server to identify which response the client is ACKing/NACking.
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
switch (type) {
case LDS:
ldsRespNonce = nonce;
xdsResponseHandler.handleLdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case RDS:
rdsRespNonce = nonce;
xdsResponseHandler.handleRdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case CDS:
cdsRespNonce = nonce;
xdsResponseHandler.handleCdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case EDS:
edsRespNonce = nonce;
xdsResponseHandler.handleEdsResponse(serverInfo, versionInfo, resources, nonce);
break;
case UNKNOWN:
default:
logger.log(XdsLogLevel.WARNING, "Ignore an unknown type of DiscoveryResponse");
}
respNonces.put(type, nonce);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce);
}

final void handleRpcError(Throwable t) {
Expand Down
13 changes: 7 additions & 6 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import io.grpc.xds.CdsLoadBalancerProvider.CdsConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig;
import io.grpc.xds.ClusterResolverLoadBalancerProvider.ClusterResolverConfig.DiscoveryMechanism;
import io.grpc.xds.XdsClient.CdsResourceWatcher;
import io.grpc.xds.XdsClient.CdsUpdate;
import io.grpc.xds.XdsClient.CdsUpdate.ClusterType;
import io.grpc.xds.XdsClient.ResourceWatcher;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsSubchannelPickers.ErrorPicker;
import java.util.ArrayDeque;
Expand Down Expand Up @@ -221,7 +221,7 @@ private void handleClusterDiscoveryError(Status error) {
}
}

private final class ClusterState implements CdsResourceWatcher {
private final class ClusterState implements ResourceWatcher<CdsUpdate> {
private final String name;
@Nullable
private Map<String, ClusterState> childClusterStates;
Expand All @@ -237,12 +237,12 @@ private ClusterState(String name) {
}

private void start() {
xdsClient.watchCdsResource(name, this);
xdsClient.watchXdsResource(XdsClusterResource.getInstance(), name, this);
}

void shutdown() {
shutdown = true;
xdsClient.cancelCdsResourceWatch(name, this);
xdsClient.cancelXdsResourceWatch(XdsClusterResource.getInstance(), name, this);
if (childClusterStates != null) { // recursively shut down all descendants
for (ClusterState state : childClusterStates.values()) {
state.shutdown();
Expand Down Expand Up @@ -300,6 +300,7 @@ public void run() {
if (shutdown) {
return;
}

logger.log(XdsLogLevel.DEBUG, "Received cluster update {0}", update);
discovered = true;
result = update;
Expand Down

0 comments on commit e1ad984

Please sign in to comment.