From 407520f78efac864aab7eef5ae275b697ccb1912 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 20 Oct 2022 17:58:16 -0700 Subject: [PATCH] okhttp: add maxConnectionAge and maxConnectionAgeGrace --- .../io/grpc/okhttp/OkHttpServerBuilder.java | 44 +++++++++++++++++++ .../io/grpc/okhttp/OkHttpServerTransport.java | 26 ++++++++++- .../okhttp/OkHttpServerTransportTest.java | 21 +++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java index d3ea82894b0..f0e8bf41ff9 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerBuilder.java @@ -68,6 +68,9 @@ public final class OkHttpServerBuilder extends ForwardingServerBuilder DEFAULT_TRANSPORT_EXECUTOR_POOL = @@ -120,6 +123,8 @@ public static OkHttpServerBuilder forPort(SocketAddress address, ServerCredentia long maxConnectionIdleInNanos = MAX_CONNECTION_IDLE_NANOS_DISABLED; boolean permitKeepAliveWithoutCalls; long permitKeepAliveTimeInNanos = TimeUnit.MINUTES.toNanos(5); + long maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED; + long maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; @VisibleForTesting OkHttpServerBuilder( @@ -209,6 +214,45 @@ public OkHttpServerBuilder maxConnectionIdle(long maxConnectionIdle, TimeUnit ti return this; } + /** + * Sets a custom max connection age, connection lasting longer than which will be gracefully + * terminated. An unreasonably small value might be increased. A random jitter of +/-10% will be + * added to it. {@code Long.MAX_VALUE} nano seconds or an unreasonably large value will disable + * max connection age. + */ + @Override + public OkHttpServerBuilder maxConnectionAge(long maxConnectionAge, TimeUnit timeUnit) { + checkArgument(maxConnectionAge > 0L, "max connection age must be positive: %s", + maxConnectionAge); + maxConnectionAgeInNanos = timeUnit.toNanos(maxConnectionAge); + if (maxConnectionAgeInNanos >= AS_LARGE_AS_INFINITE) { + maxConnectionAgeInNanos = MAX_CONNECTION_AGE_NANOS_DISABLED; + } + if (maxConnectionAgeInNanos < MIN_MAX_CONNECTION_AGE_NANO) { + maxConnectionAgeInNanos = MIN_MAX_CONNECTION_AGE_NANO; + } + return this; + } + + /** + * Sets a custom grace time for the graceful connection termination. Once the max connection age + * is reached, RPCs have the grace time to complete. RPCs that do not complete in time will be + * cancelled, allowing the connection to terminate. {@code Long.MAX_VALUE} nano seconds or an + * unreasonably large value are considered infinite. + * + * @see #maxConnectionAge(long, TimeUnit) + */ + @Override + public OkHttpServerBuilder maxConnectionAgeGrace(long maxConnectionAgeGrace, TimeUnit timeUnit) { + checkArgument(maxConnectionAgeGrace >= 0L, "max connection age grace must be non-negative: %s", + maxConnectionAgeGrace); + maxConnectionAgeGraceInNanos = timeUnit.toNanos(maxConnectionAgeGrace); + if (maxConnectionAgeGraceInNanos >= AS_LARGE_AS_INFINITE) { + maxConnectionAgeGraceInNanos = MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE; + } + return this; + } + /** * Sets a time waiting for read activity after sending a keepalive ping. If the time expires * without any read activity on the connection, the connection is considered dead. An unreasonably diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java index f6099bec17a..1fd98079ede 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java @@ -16,6 +16,7 @@ package io.grpc.okhttp; +import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED; import static io.grpc.okhttp.OkHttpServerBuilder.MAX_CONNECTION_IDLE_NANOS_DISABLED; import com.google.common.base.Preconditions; @@ -31,6 +32,7 @@ import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveEnforcer; import io.grpc.internal.KeepAliveManager; +import io.grpc.internal.LogExceptionRunnable; import io.grpc.internal.MaxConnectionIdleManager; import io.grpc.internal.ObjectPool; import io.grpc.internal.SerializingExecutor; @@ -96,6 +98,7 @@ final class OkHttpServerTransport implements ServerTransport, private Attributes attributes; private KeepAliveManager keepAliveManager; private MaxConnectionIdleManager maxConnectionIdleManager; + private ScheduledFuture maxConnectionAgeMonitor; private final KeepAliveEnforcer keepAliveEnforcer; private final Object lock = new Object(); @@ -223,6 +226,15 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount maxConnectionIdleManager.start(this::shutdown, scheduledExecutorService); } + if (config.maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) { + long maxConnectionAgeInNanos = + (long) ((.9D + Math.random() * .2D) * config.maxConnectionAgeInNanos); + maxConnectionAgeMonitor = scheduledExecutorService.schedule( + new LogExceptionRunnable(() -> shutdown(config.maxConnectionAgeGraceInNanos)), + maxConnectionAgeInNanos, + TimeUnit.NANOSECONDS); + } + transportExecutor.execute( new FrameHandler(variant.newReader(Okio.buffer(Okio.source(socket)), false))); } catch (Error | IOException | RuntimeException ex) { @@ -238,6 +250,10 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount @Override public void shutdown() { + shutdown(TimeUnit.SECONDS.toNanos(1L)); + } + + private void shutdown(Long graceTimeInNanos) { synchronized (lock) { if (gracefulShutdown || abruptShutdown) { return; @@ -251,7 +267,7 @@ public void shutdown() { // we also set a timer to limit the upper bound in case the PING is excessively stalled or // the client is malicious. secondGoawayTimer = scheduledExecutorService.schedule( - this::triggerGracefulSecondGoaway, 1, TimeUnit.SECONDS); + this::triggerGracefulSecondGoaway, graceTimeInNanos, TimeUnit.NANOSECONDS); frameWriter.goAway(Integer.MAX_VALUE, ErrorCode.NO_ERROR, new byte[0]); frameWriter.ping(false, 0, GRACEFUL_SHUTDOWN_PING); frameWriter.flush(); @@ -348,6 +364,10 @@ private void terminated() { if (maxConnectionIdleManager != null) { maxConnectionIdleManager.onTransportTermination(); } + + if (maxConnectionAgeMonitor != null) { + maxConnectionAgeMonitor.cancel(false); + } transportExecutor = config.transportExecutorPool.returnObject(transportExecutor); scheduledExecutorService = config.scheduledExecutorServicePool.returnObject(scheduledExecutorService); @@ -479,6 +499,8 @@ static final class Config { final long maxConnectionIdleNanos; final boolean permitKeepAliveWithoutCalls; final long permitKeepAliveTimeInNanos; + final long maxConnectionAgeInNanos; + final long maxConnectionAgeGraceInNanos; public Config( OkHttpServerBuilder builder, @@ -501,6 +523,8 @@ public Config( maxConnectionIdleNanos = builder.maxConnectionIdleInNanos; permitKeepAliveWithoutCalls = builder.permitKeepAliveWithoutCalls; permitKeepAliveTimeInNanos = builder.permitKeepAliveTimeInNanos; + maxConnectionAgeInNanos = builder.maxConnectionAgeInNanos; + maxConnectionAgeGraceInNanos = builder.maxConnectionAgeGraceInNanos; } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java index a52045011ae..af9b7c12d54 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpServerTransportTest.java @@ -152,6 +152,27 @@ public void startThenShutdown() throws Exception { shutdownAndTerminate(/*lastStreamId=*/ 0); } + @Test + public void maxConnectionAge() throws Exception { + serverBuilder.maxConnectionAge(5, TimeUnit.SECONDS) + .maxConnectionAgeGrace(1, TimeUnit.SECONDS); + initTransport(); + handshake(); + clientFrameWriter.headers(1, Arrays.asList( + HTTP_SCHEME_HEADER, + METHOD_HEADER, + new Header(Header.TARGET_AUTHORITY, "example.com:80"), + new Header(Header.TARGET_PATH, "/com.example/SimpleService.doit"), + CONTENT_TYPE_HEADER, + TE_HEADER)); + clientFrameWriter.synStream(true, false, 1, -1, Arrays.asList( + new Header("some-client-sent-trailer", "trailer-value"))); + pingPong(); + fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(6)); // > 1.1 * 5 + fakeClock.forwardNanos(TimeUnit.SECONDS.toNanos(1)); + verifyGracefulShutdown(1); + } + @Test public void maxConnectionIdleTimer() throws Exception { initTransport();