Skip to content

Commit

Permalink
xds: Drain old server connections on Listener updates
Browse files Browse the repository at this point in the history
This is necessary to make sure all connections are using the new
configuration.
  • Loading branch information
ejona86 committed Sep 20, 2021
1 parent 8b6e0e5 commit 4d5a19c
Show file tree
Hide file tree
Showing 10 changed files with 508 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_REF;
import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER;
import static io.grpc.xds.XdsServerWrapper.ATTR_SERVER_ROUTING_CONFIG;
import static io.grpc.xds.internal.sds.SdsProtocolNegotiators.ATTR_SERVER_SSL_CONTEXT_PROVIDER_SUPPLIER;

Expand All @@ -28,6 +29,7 @@
import io.grpc.Attributes;
import io.grpc.internal.ObjectPool;
import io.grpc.netty.GrpcHttp2ConnectionHandler;
import io.grpc.netty.InternalGracefulServerCloseCommand;
import io.grpc.netty.InternalProtocolNegotiationEvent;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.InternalProtocolNegotiator.ProtocolNegotiator;
Expand All @@ -40,6 +42,8 @@
import io.grpc.xds.XdsServerWrapper.ServerRoutingConfig;
import io.grpc.xds.internal.Matchers.CidrMatcher;
import io.grpc.xds.internal.sds.SslContextProviderSupplier;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -54,7 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -77,14 +81,16 @@ private FilterChainMatchingProtocolNegotiators() {
static final class FilterChainMatchingHandler extends ChannelInboundHandlerAdapter {

private final GrpcHttp2ConnectionHandler grpcHandler;
private final FilterChainSelector selector;
private final FilterChainSelectorManager filterChainSelectorManager;
private final ProtocolNegotiator delegate;

FilterChainMatchingHandler(
GrpcHttp2ConnectionHandler grpcHandler, FilterChainSelector selector,
GrpcHttp2ConnectionHandler grpcHandler,
FilterChainSelectorManager filterChainSelectorManager,
ProtocolNegotiator delegate) {
this.grpcHandler = checkNotNull(grpcHandler, "grpcHandler");
this.selector = checkNotNull(selector, "selector");
this.filterChainSelectorManager =
checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.delegate = checkNotNull(delegate, "delegate");
}

Expand All @@ -94,6 +100,19 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
super.userEventTriggered(ctx, evt);
return;
}
long drainGraceTime = 0;
TimeUnit drainGraceTimeUnit = null;
Long drainGraceNanosObj = grpcHandler.getEagAttributes().get(ATTR_DRAIN_GRACE_NANOS);
if (drainGraceNanosObj != null) {
drainGraceTime = drainGraceNanosObj;
drainGraceTimeUnit = TimeUnit.NANOSECONDS;
}
FilterChainSelectorManager.Closer closer = new FilterChainSelectorManager.Closer(
new GracefullyShutdownChannelRunnable(ctx.channel(), drainGraceTime, drainGraceTimeUnit));
FilterChainSelector selector = filterChainSelectorManager.register(closer);
ctx.channel().closeFuture().addListener(
new FilterChainSelectorManagerDeregister(filterChainSelectorManager, closer));
checkNotNull(selector, "selector");
SelectedConfig config = selector.select(
(InetSocketAddress) ctx.channel().localAddress(),
(InetSocketAddress) ctx.channel().remoteAddress());
Expand Down Expand Up @@ -354,10 +373,10 @@ public AsciiString scheme() {

@Override
public ChannelHandler newHandler(GrpcHttp2ConnectionHandler grpcHandler) {
AtomicReference<FilterChainSelector> filterChainSelectorRef =
grpcHandler.getEagAttributes().get(ATTR_FILTER_CHAIN_SELECTOR_REF);
checkNotNull(filterChainSelectorRef, "filterChainSelectorRef");
return new FilterChainMatchingHandler(grpcHandler, filterChainSelectorRef.get(),
FilterChainSelectorManager filterChainSelectorManager =
grpcHandler.getEagAttributes().get(ATTR_FILTER_CHAIN_SELECTOR_MANAGER);
checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
return new FilterChainMatchingHandler(grpcHandler, filterChainSelectorManager,
delegate.newNegotiator(offloadExecutorPool));
}

Expand All @@ -384,4 +403,42 @@ private SelectedConfig(ServerRoutingConfig routingConfig,
this.sslContextProviderSupplier = sslContextProviderSupplier;
}
}

private static class FilterChainSelectorManagerDeregister implements ChannelFutureListener {
private final FilterChainSelectorManager filterChainSelectorManager;
private final FilterChainSelectorManager.Closer closer;

public FilterChainSelectorManagerDeregister(
FilterChainSelectorManager filterChainSelectorManager,
FilterChainSelectorManager.Closer closer) {
this.filterChainSelectorManager =
checkNotNull(filterChainSelectorManager, "filterChainSelectorManager");
this.closer = checkNotNull(closer, "closer");
}

@Override public void operationComplete(ChannelFuture future) throws Exception {
filterChainSelectorManager.deregister(closer);
}
}

private static class GracefullyShutdownChannelRunnable implements Runnable {
private final Channel channel;
private final long drainGraceTime;
@Nullable
private final TimeUnit drainGraceTimeUnit;

public GracefullyShutdownChannelRunnable(
Channel channel, long drainGraceTime, @Nullable TimeUnit drainGraceTimeUnit) {
this.channel = checkNotNull(channel, "channel");
this.drainGraceTime = drainGraceTime;
this.drainGraceTimeUnit = drainGraceTimeUnit;
}

@Override public void run() {
Object gracefulCloseCommand = InternalGracefulServerCloseCommand.create(
"xds_drain", drainGraceTime, drainGraceTimeUnit);
channel.writeAndFlush(gracefulCloseCommand)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
}
95 changes: 95 additions & 0 deletions xds/src/main/java/io/grpc/xds/FilterChainSelectorManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Copyright 2021 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc.xds;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import java.util.Comparator;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.GuardedBy;

/**
* Maintains the current xDS selector and any resources using that selector. When the selector
* changes, old resources are closed to avoid old config usages.
*/
final class FilterChainSelectorManager {
private static final AtomicLong closerId = new AtomicLong();

private final Object lock = new Object();
@GuardedBy("lock")
private FilterChainSelector selector;
// Avoid HashSet since it does not decrease in size, forming a high water mark.
@GuardedBy("lock")
private TreeSet<Closer> closers = new TreeSet<Closer>(new CloserComparator());

public FilterChainSelector register(Closer closer) {
synchronized (lock) {
Preconditions.checkState(closers.add(closer), "closer already registered");
return selector;
}
}

public void deregister(Closer closer) {
synchronized (lock) {
closers.remove(closer);
}
}

/** Only safe to be called by code that is responsible for updating the selector. */
public FilterChainSelector getSelectorToUpdateSelector() {
synchronized (lock) {
return selector;
}
}

public void updateSelector(FilterChainSelector newSelector) {
TreeSet<Closer> oldClosers;
synchronized (lock) {
oldClosers = closers;
closers = new TreeSet<Closer>(closers.comparator());
selector = newSelector;
}
for (Closer closer : oldClosers) {
closer.closer.run();
}
}

@VisibleForTesting
int getRegisterCount() {
synchronized (lock) {
return closers.size();
}
}

public static final class Closer {
private final long id = closerId.getAndIncrement();
private final Runnable closer;

/** {@code closer} may be run multiple times. */
public Closer(Runnable closer) {
this.closer = Preconditions.checkNotNull(closer, "closer");
}
}

private static class CloserComparator implements Comparator<Closer> {
@Override public int compare(Closer c1, Closer c2) {
return Long.compare(c1.id, c2.id);
}
}
}
13 changes: 8 additions & 5 deletions xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import io.grpc.Internal;
import io.grpc.NameResolver;
import io.grpc.internal.ObjectPool;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.XdsNameResolverProvider.CallCounterProvider;
import io.grpc.xds.internal.sds.SslContextProviderSupplier;
import java.util.concurrent.atomic.AtomicReference;

/**
* Internal attributes used for xDS implementation. Do not use.
Expand Down Expand Up @@ -81,9 +79,14 @@ public final class InternalXdsAttributes {
* Filter chain match for network filters.
*/
@Grpc.TransportAttr
static final Attributes.Key<AtomicReference<FilterChainSelector>>
ATTR_FILTER_CHAIN_SELECTOR_REF = Attributes.Key.create(
"io.grpc.xds.InternalXdsAttributes.filterChainSelectorRef");
static final Attributes.Key<FilterChainSelectorManager>
ATTR_FILTER_CHAIN_SELECTOR_MANAGER = Attributes.Key.create(
"io.grpc.xds.InternalXdsAttributes.filterChainSelectorManager");

/** Grace time to use when draining. Null for an infinite grace time. */
@Grpc.TransportAttr
static final Attributes.Key<Long> ATTR_DRAIN_GRACE_NANOS =
Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.drainGraceTime");

private InternalXdsAttributes() {}
}
44 changes: 36 additions & 8 deletions xds/src/main/java/io/grpc/xds/XdsServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_REF;
import static io.grpc.xds.InternalXdsAttributes.ATTR_DRAIN_GRACE_NANOS;
import static io.grpc.xds.InternalXdsAttributes.ATTR_FILTER_CHAIN_SELECTOR_MANAGER;

import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.DoNotCall;
Expand All @@ -33,25 +35,28 @@
import io.grpc.netty.InternalNettyServerCredentials;
import io.grpc.netty.InternalProtocolNegotiator;
import io.grpc.netty.NettyServerBuilder;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector;
import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingNegotiatorServerFactory;
import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;

/**
* A version of {@link ServerBuilder} to create xDS managed servers.
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/7514")
public final class XdsServerBuilder extends ForwardingServerBuilder<XdsServerBuilder> {
private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000);

private final NettyServerBuilder delegate;
private final int port;
private XdsServingStatusListener xdsServingStatusListener;
private AtomicBoolean isServerBuilt = new AtomicBoolean(false);
private final FilterRegistry filterRegistry = FilterRegistry.getDefaultRegistry();
private XdsClientPoolFactory xdsClientPoolFactory =
SharedXdsClientPoolProvider.getDefaultProvider();
private long drainGraceTime = 10;
private TimeUnit drainGraceTimeUnit = TimeUnit.MINUTES;

private XdsServerBuilder(NettyServerBuilder nettyDelegate, int port) {
this.delegate = nettyDelegate;
Expand All @@ -74,6 +79,26 @@ public XdsServerBuilder xdsServingStatusListener(
return this;
}

/**
* Sets the grace time when draining connections with outdated configuration. When an xDS config
* update changes connection configuration, pre-existing connections stop accepting new RPCs to be
* replaced by new connections. RPCs on those pre-existing connections have the grace time to
* complete. RPCs that do not complete in time will be cancelled, allowing the connection to
* terminate. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value are considered
* infinite. The default is 10 minutes.
*/
public XdsServerBuilder drainGraceTime(long drainGraceTime, TimeUnit drainGraceTimeUnit) {
checkArgument(drainGraceTime >= 0, "drain grace time must be non-negative: %s",
drainGraceTime);
checkNotNull(drainGraceTimeUnit, "drainGraceTimeUnit");
if (drainGraceTimeUnit.toNanos(drainGraceTime) >= AS_LARGE_AS_INFINITE) {
drainGraceTimeUnit = null;
}
this.drainGraceTime = drainGraceTime;
this.drainGraceTimeUnit = drainGraceTimeUnit;
return this;
}

@DoNotCall("Unsupported. Use forPort(int, ServerCredentials) instead")
public static ServerBuilder<?> forPort(int port) {
throw new UnsupportedOperationException(
Expand All @@ -94,12 +119,15 @@ public static XdsServerBuilder forPort(int port, ServerCredentials serverCredent
@Override
public Server build() {
checkState(isServerBuilt.compareAndSet(false, true), "Server already built!");
AtomicReference<FilterChainSelector> filterChainSelectorRef = new AtomicReference<>();
InternalNettyServerBuilder.eagAttributes(delegate, Attributes.newBuilder()
.set(ATTR_FILTER_CHAIN_SELECTOR_REF, filterChainSelectorRef)
.build());
FilterChainSelectorManager filterChainSelectorManager = new FilterChainSelectorManager();
Attributes.Builder builder = Attributes.newBuilder()
.set(ATTR_FILTER_CHAIN_SELECTOR_MANAGER, filterChainSelectorManager);
if (drainGraceTimeUnit != null) {
builder.set(ATTR_DRAIN_GRACE_NANOS, drainGraceTimeUnit.toNanos(drainGraceTime));
}
InternalNettyServerBuilder.eagAttributes(delegate, builder.build());
return new XdsServerWrapper("0.0.0.0:" + port, delegate, xdsServingStatusListener,
filterChainSelectorRef, xdsClientPoolFactory, filterRegistry);
filterChainSelectorManager, xdsClientPoolFactory, filterRegistry);
}

@VisibleForTesting
Expand Down

0 comments on commit 4d5a19c

Please sign in to comment.