Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

okhttp: add maxConnectionAge and maxConnectionAgeGrace #9649

Merged
merged 1 commit into from Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 44 additions & 0 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java
Expand Up @@ -68,6 +68,9 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder<OkHttpSer

static final long MAX_CONNECTION_IDLE_NANOS_DISABLED = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_IDLE_NANO = TimeUnit.SECONDS.toNanos(1L);
static final long MAX_CONNECTION_AGE_NANOS_DISABLED = Long.MAX_VALUE;
static final long MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE = Long.MAX_VALUE;
private static final long MIN_MAX_CONNECTION_AGE_NANO = TimeUnit.SECONDS.toNanos(1L);

private static final long AS_LARGE_AS_INFINITE = TimeUnit.DAYS.toNanos(1000L);
private static final ObjectPool<Executor> DEFAULT_TRANSPORT_EXECUTOR_POOL =
Expand Down Expand Up @@ -120,6 +123,8 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia
long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED;
boolean permitKeepAliveWithoutCalls;
long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5);
long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;

@VisibleForTesting
OkHttpServerBuilder(
Expand Down Expand Up @@ -209,6 +214,45 @@ public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit ti
return this;
}

/**
* Sets a custom max connection age, connection lasting longer than which will be gracefully
* terminated. An unreasonably small value might be increased. A random jitter of +/-10% will be
* added to it. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable
* max connection age.
*/
@Override
public OkHttpServerBuilder maxConnectionAge(long maxConnectionAge, TimeUnit timeUnit) {
checkArgument(maxConnectionAge > 0L, "max connection age must be positive: %s",
maxConnectionAge);
maxConnectionAgeInNanos = timeUnit.toNanos(maxConnectionAge);
if (maxConnectionAgeInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED;
}
if (maxConnectionAgeInNanos < MIN_MAX_CONNECTION_AGE_NANO) {
maxConnectionAgeInNanos = MIN_MAX_CONNECTION_AGE_NANO;
}
return this;
}

/**
* Sets a custom grace time for the graceful connection termination. Once the max connection age
* is reached, RPCs have the grace time to complete. RPCs that do not complete in time will be
* cancelled, allowing the connection to terminate. {@code Long.MAX_VALUE} nano seconds or an
* unreasonably large value are considered infinite.
*
* @see #maxConnectionAge(long, TimeUnit)
*/
@Override
public OkHttpServerBuilder maxConnectionAgeGrace(long maxConnectionAgeGrace, TimeUnit timeUnit) {
checkArgument(maxConnectionAgeGrace >= 0L, "max connection age grace must be non-negative: %s",
maxConnectionAgeGrace);
maxConnectionAgeGraceInNanos = timeUnit.toNanos(maxConnectionAgeGrace);
if (maxConnectionAgeGraceInNanos >= AS_LARGE_AS_INFINITE) {
maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE;
}
return this;
}

/**
* Sets a time waiting for read activity after sending a keepalive ping. If the time expires
* without any read activity on the connection, the connection is considered dead. An unreasonably
Expand Down
26 changes: 25 additions & 1 deletion okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Expand Up @@ -16,6 +16,7 @@

package io.grpc.okhttp;

import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED;
import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED;

import com.google.common.base.Preconditions;
Expand All @@ -31,6 +32,7 @@
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveEnforcer;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.LogExceptionRunnable;
import io.grpc.internal.MaxConnectionIdleManager;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SerializingExecutor;
Expand Down Expand Up @@ -96,6 +98,7 @@ final class OkHttpServerTransport implements ServerTransport,
private Attributes attributes;
private KeepAliveManager keepAliveManager;
private MaxConnectionIdleManager maxConnectionIdleManager;
private ScheduledFuture<?> maxConnectionAgeMonitor;
private final KeepAliveEnforcer keepAliveEnforcer;

private final Object lock = new Object();
Expand Down Expand Up @@ -223,6 +226,15 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService);
}

if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
long maxConnectionAgeInNanos =
(long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos);
maxConnectionAgeMonitor = scheduledExecutorService.schedule(
new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)),
maxConnectionAgeInNanos,
TimeUnit.NANOSECONDS);
}

transportExecutor.execute(
new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false)));
} catch (Error | IOException | RuntimeException ex) {
Expand All @@ -238,6 +250,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount

@Override
public void shutdown() {
shutdown(TimeUnit.SECONDS.toNanos(1L));
}

private void shutdown(Long graceTimeInNanos) {
synchronized (lock) {
if (gracefulShutdown || abruptShutdown) {
return;
Expand All @@ -251,7 +267,7 @@ public void shutdown() {
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
// the client is malicious.
secondGoawayTimer = scheduledExecutorService.schedule(
this::triggerGracefulSecondGoaway, 1, TimeUnit.SECONDS);
this::triggerGracefulSecondGoaway, graceTimeInNanos, TimeUnit.NANOSECONDS);
frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]);
frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING);
frameWriter.flush();
Expand Down Expand Up @@ -348,6 +364,10 @@ private void terminated() {
if (maxConnectionIdleManager != null) {
maxConnectionIdleManager.onTransportTermination();
}

if (maxConnectionAgeMonitor != null) {
maxConnectionAgeMonitor.cancel(false);
}
transportExecutor = config.transportExecutorPool.returnObject(transportExecutor);
scheduledExecutorService =
config.scheduledExecutorServicePool.returnObject(scheduledExecutorService);
Expand Down Expand Up @@ -479,6 +499,8 @@ static final class Config {
final long maxConnectionIdleNanos;
final boolean permitKeepAliveWithoutCalls;
final long permitKeepAliveTimeInNanos;
final long maxConnectionAgeInNanos;
final long maxConnectionAgeGraceInNanos;

public Config(
OkHttpServerBuilder builder,
Expand All @@ -501,6 +523,8 @@ public Config(
maxConnectionIdleNanos = builder.maxConnectionIdleInNanos;
permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls;
permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos;
maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos;
maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos;
}
}

Expand Down
21 changes: 21 additions & 0 deletions okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java
Expand Up @@ -152,6 +152,27 @@ public void startThenShutdown() throws Exception {
shutdownAndTerminate(/*lastStreamId=*/ 0);
}

@Test
public void maxConnectionAge() throws Exception {
serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS)
.maxConnectionAgeGrace(1, TimeUnit.SECONDS);
initTransport();
handshake();
clientFrameWriter.headers(1, Arrays.asList(
HTTP_SCHEME_HEADER,
METHOD_HEADER,
new Header(Header.TARGET_AUTHORITY, "example.com:80"),
new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"),
CONTENT_TYPE_HEADER,
TE_HEADER));
clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList(
new Header("some-client-sent-trailer", "trailer-value")));
pingPong();
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5
fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1));
verifyGracefulShutdown(1);
}

@Test
public void maxConnectionIdleTimer() throws Exception {
initTransport();
Expand Down