Skip to content

Commit

Permalink
xds: create singleton XdsClient object (promote ClientXdsClient) (grp…
Browse files Browse the repository at this point in the history
…c#7500)

Use a global factory to create a shared XdsClient object pool that can be used by multiple client channels. The object pool is thread-safe and holds a single XdsClient returning to each client channel. So at most one XdsClient instance will be created per process, and it is shared between client channels.
  • Loading branch information
voidzcy authored and dfawley committed Jan 15, 2021
1 parent 9cd4e68 commit e491174
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 313 deletions.
146 changes: 146 additions & 0 deletions xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.concurrent.ScheduledExecutorService;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

/**
* The global factory for creating a singleton {@link XdsClient} instance to be used by all gRPC
* clients in the process.
*/
@ThreadSafe
final class SharedXdsClientPoolProvider implements XdsClientPoolFactory {
private final Bootstrapper bootstrapper;
private final XdsChannelFactory channelFactory;
private final Object lock = new Object();
private volatile ObjectPool<XdsClient> xdsClientPool;

private SharedXdsClientPoolProvider() {
this(Bootstrapper.getInstance(), XdsChannelFactory.getInstance());
}

@VisibleForTesting
SharedXdsClientPoolProvider(
Bootstrapper bootstrapper, XdsChannelFactory channelFactory) {
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
}

static SharedXdsClientPoolProvider getDefaultProvider() {
return SharedXdsClientPoolProviderHolder.instance;
}

@Override
public ObjectPool<XdsClient> getXdsClientPool() throws XdsInitializationException {
ObjectPool<XdsClient> ref = xdsClientPool;
if (ref == null) {
synchronized (lock) {
ref = xdsClientPool;
if (ref == null) {
BootstrapInfo bootstrapInfo = bootstrapper.readBootstrap();
XdsChannel channel = channelFactory.createChannel(bootstrapInfo.getServers());
ref = xdsClientPool = new RefCountedXdsClientObjectPool(channel, bootstrapInfo.getNode());
}
}
}
return ref;
}

private static class SharedXdsClientPoolProviderHolder {
private static final SharedXdsClientPoolProvider instance = new SharedXdsClientPoolProvider();
}

@ThreadSafe
@VisibleForTesting
static class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {
private final XdsChannel channel;
private final Node node;
private final XdsClientFactory factory;
private final Object lock = new Object();
@GuardedBy("lock")
private ScheduledExecutorService scheduler;
@GuardedBy("lock")
private XdsClient xdsClient;
@GuardedBy("lock")
private int refCount;

RefCountedXdsClientObjectPool(XdsChannel channel, Node node) {
this(channel, node, XdsClientFactory.INSTANCE);
}

@VisibleForTesting
RefCountedXdsClientObjectPool(XdsChannel channel, Node node, XdsClientFactory factory) {
this.channel = checkNotNull(channel, "channel");
this.node = checkNotNull(node, "node");
this.factory = checkNotNull(factory, "factory");
}

@Override
public XdsClient getObject() {
synchronized (lock) {
if (xdsClient == null) {
scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
xdsClient = factory.newXdsClient(channel, node, scheduler);
}
refCount++;
return xdsClient;
}
}

@Override
public XdsClient returnObject(Object object) {
synchronized (lock) {
refCount--;
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
scheduler = SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
}
return null;
}
}

// Introduced for testing.
@VisibleForTesting
abstract static class XdsClientFactory {
private static final XdsClientFactory INSTANCE = new XdsClientFactory() {
@Override
XdsClient newXdsClient(XdsChannel channel, Node node,
ScheduledExecutorService timeService) {
return new ClientXdsClient(channel, node, timeService,
new ExponentialBackoffPolicy.Provider(), GrpcUtil.STOPWATCH_SUPPLIER);
}
};

abstract XdsClient newXdsClient(XdsChannel channel, Node node,
ScheduledExecutorService timeService);
}
}
}
70 changes: 2 additions & 68 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.EnvoyProtoData.DropOverload;
import io.grpc.xds.EnvoyProtoData.Locality;
import io.grpc.xds.EnvoyProtoData.LocalityLbEndpoints;
Expand Down Expand Up @@ -537,7 +534,8 @@ interface ListenerWatcher extends ResourceWatcher {
/**
* Shutdown this {@link XdsClient} and release resources.
*/
abstract void shutdown();
void shutdown() {
}

/**
* Registers a data watcher for the given LDS resource.
Expand Down Expand Up @@ -611,74 +609,10 @@ void removeClientStats(String clusterName, @Nullable String clusterServiceName)
throw new UnsupportedOperationException();
}

// TODO(chengyuanzhang): eliminate this factory
abstract static class XdsClientFactory {
abstract XdsClient createXdsClient();
}

/**
* An {@link ObjectPool} holding reference and ref-count of an {@link XdsClient} instance.
* Initially the instance is null and the ref-count is zero. {@link #getObject()} will create a
* new XdsClient instance if the ref-count is zero when calling the method. {@code #getObject()}
* increments the ref-count and {@link #returnObject(Object)} decrements it. Anytime when the
* ref-count gets back to zero, the XdsClient instance will be shutdown and de-referenced.
*/
static final class RefCountedXdsClientObjectPool implements ObjectPool<XdsClient> {

private final XdsClientFactory xdsClientFactory;

@VisibleForTesting
@Nullable
XdsClient xdsClient;

private int refCount;

RefCountedXdsClientObjectPool(XdsClientFactory xdsClientFactory) {
this.xdsClientFactory = Preconditions.checkNotNull(xdsClientFactory, "xdsClientFactory");
}

/**
* See {@link RefCountedXdsClientObjectPool}.
*/
@Override
public synchronized XdsClient getObject() {
if (xdsClient == null) {
checkState(
refCount == 0,
"Bug: refCount should be zero while xdsClient is null");
xdsClient = xdsClientFactory.createXdsClient();
}
refCount++;
return xdsClient;
}

/**
* See {@link RefCountedXdsClientObjectPool}.
*/
@Override
public synchronized XdsClient returnObject(Object object) {
checkState(
object == xdsClient,
"Bug: the returned object '%s' does not match current XdsClient '%s'",
object,
xdsClient);

refCount--;
checkState(refCount >= 0, "Bug: refCount of XdsClient less than 0");
if (refCount == 0) {
xdsClient.shutdown();
xdsClient = null;
}

return null;
}
}

static final class XdsChannel {
private final ManagedChannel managedChannel;
private final boolean useProtocolV3;

@VisibleForTesting
XdsChannel(ManagedChannel managedChannel, boolean useProtocolV3) {
this.managedChannel = managedChannel;
this.useProtocolV3 = useProtocolV3;
Expand Down
31 changes: 7 additions & 24 deletions xds/src/main/java/io/grpc/xds/XdsNameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.grpc.SynchronizationContext;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.Bootstrapper.BootstrapInfo;
import io.grpc.xds.EnvoyProtoData.ClusterWeight;
import io.grpc.xds.EnvoyProtoData.Route;
import io.grpc.xds.EnvoyProtoData.RouteAction;
Expand All @@ -43,7 +42,6 @@
import io.grpc.xds.XdsClient.LdsUpdate;
import io.grpc.xds.XdsClient.RdsResourceWatcher;
import io.grpc.xds.XdsClient.RdsUpdate;
import io.grpc.xds.XdsClient.XdsChannel;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.Collection;
Expand Down Expand Up @@ -79,8 +77,6 @@ final class XdsNameResolver extends NameResolver {
private final String authority;
private final ServiceConfigParser serviceConfigParser;
private final SynchronizationContext syncContext;
private final Bootstrapper bootstrapper;
private final XdsChannelFactory channelFactory;
private final XdsClientPoolFactory xdsClientPoolFactory;
private final ThreadSafeRandom random;
private final ConcurrentMap<String, AtomicInteger> clusterRefs = new ConcurrentHashMap<>();
Expand All @@ -92,28 +88,19 @@ final class XdsNameResolver extends NameResolver {
private XdsClient xdsClient;
private ResolveState resolveState;

XdsNameResolver(String name,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext,
XdsClientPoolFactory xdsClientPoolFactory) {
this(name, serviceConfigParser, syncContext, Bootstrapper.getInstance(),
XdsChannelFactory.getInstance(), xdsClientPoolFactory, ThreadSafeRandomImpl.instance);
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext) {
this(name, serviceConfigParser, syncContext, SharedXdsClientPoolProvider.getDefaultProvider(),
ThreadSafeRandomImpl.instance);
}

@VisibleForTesting
XdsNameResolver(
String name,
ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext,
Bootstrapper bootstrapper,
XdsChannelFactory channelFactory,
XdsClientPoolFactory xdsClientPoolFactory,
XdsNameResolver(String name, ServiceConfigParser serviceConfigParser,
SynchronizationContext syncContext, XdsClientPoolFactory xdsClientPoolFactory,
ThreadSafeRandom random) {
authority = GrpcUtil.checkAuthority(checkNotNull(name, "name"));
this.serviceConfigParser = checkNotNull(serviceConfigParser, "serviceConfigParser");
this.syncContext = checkNotNull(syncContext, "syncContext");
this.bootstrapper = checkNotNull(bootstrapper, "bootstrapper");
this.channelFactory = checkNotNull(channelFactory, "channelFactory");
this.xdsClientPoolFactory = checkNotNull(xdsClientPoolFactory, "xdsClientPoolFactory");
this.random = checkNotNull(random, "random");
logger = XdsLogger.withLogId(InternalLogId.allocate("xds-resolver", name));
Expand All @@ -128,17 +115,13 @@ public String getServiceAuthority() {
@Override
public void start(Listener2 listener) {
this.listener = checkNotNull(listener, "listener");
BootstrapInfo bootstrapInfo;
XdsChannel channel;
try {
bootstrapInfo = bootstrapper.readBootstrap();
channel = channelFactory.createChannel(bootstrapInfo.getServers());
xdsClientPool = xdsClientPoolFactory.getXdsClientPool();
} catch (Exception e) {
listener.onError(
Status.UNAVAILABLE.withDescription("Failed to initialize xDS").withCause(e));
return;
}
xdsClientPool = xdsClientPoolFactory.newXdsClientObjectPool(bootstrapInfo, channel);
xdsClient = xdsClientPool.getObject();
resolveState = new ResolveState();
resolveState.start();
Expand Down

0 comments on commit e491174

Please sign in to comment.