From c18fc2bc68c08805b2553336a3bf02f0c8c50972 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Mon, 13 Jun 2022 08:44:56 +0200 Subject: [PATCH] Allow controlling time flow for EmbeddedEventLoop (#12459) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: Tests using EmbeddedEventLoop can run faster if they can "advance time" so that scheduled tasks (e.g. timeouts) run earlier. Additionally, "freeze time" functionality can improve reliability of such tests. Modification: - Introduce a protected method `AbstractScheduledEventExecutor.getCurrentTimeNanos` that replaces the previous static `nanoTime` method (now deprecated). Replace usages of `nanoTime` with the new method. - Override `getCurrentTimeNanos` with the new time control (freeze, unfreeze, advanceBy) features in `EmbeddedEventLoop`. - Add a microbenchmark that tests one of the sites that seemed most likely to see negative performance impact by the change (`ScheduledFutureTask.delayNanos`). Result: Fixes #12433. Local runs of the `ScheduleFutureTaskBenchmark` microbenchmark shows no evidence for performance impact (within error bounds of each other): ``` before: Benchmark (num) Mode Cnt Score Error Units ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop 100000 thrpt 20 132.437 ± 15.116 ops/s ScheduleFutureTaskBenchmark.scheduleLots 100000 thrpt 20 694.475 ± 8.184 ops/s ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop 100000 thrpt 20 88.037 ± 4.013 ops/s after: Benchmark (num) Mode Cnt Score Error Units ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop 100000 thrpt 20 149.629 ± 7.514 ops/s ScheduleFutureTaskBenchmark.scheduleLots 100000 thrpt 20 688.954 ± 7.831 ops/s ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop 100000 thrpt 20 85.426 ± 1.104 ops/s ``` The new `ScheduleFutureTaskDeadlineBenchmark` shows some performance degradation: ``` before: Benchmark Mode Cnt Score Error Units ScheduleFutureTaskDeadlineBenchmark.requestDeadline thrpt 20 60726336.795 ± 280054.533 ops/s after: Benchmark Mode Cnt Score Error Units ScheduleFutureTaskDeadlineBenchmark.requestDeadline thrpt 20 56948231.480 ± 188264.092 ops/s ``` The difference is small, but it's there, so I investigated this further using jitwatch. Looking at the generated assembly, the call to `getCurrentTimeNanos` is devirtualized and inlined in the absence of `EmbeddedEventLoop`, so the code is mostly identical. However there is the added getfield and checkcast for the executor, which probably explains the discrepancy. In my opinion this is acceptable, because the performance impact is not severe, this use is likely the worst case (virtual call through `scheduledExecutorService()`), and it is never as hot as in this benchmark. Note that if an `EmbeddedEventLoop` is present in the application, the performance impact is likely substantially higher, because this would necessitate a virtual call. However this is not an issue for production applications, and the affected code is still not very hot. Co-authored-by: Norman Maurer --- .../AbstractScheduledEventExecutor.java | 59 ++++++++++++----- .../util/concurrent/GlobalEventExecutor.java | 9 ++- .../util/concurrent/ScheduledFutureTask.java | 29 ++------- .../concurrent/SingleThreadEventExecutor.java | 28 ++++---- .../AbstractScheduledEventExecutorTest.java | 7 ++ .../concurrent/ScheduledFutureTaskTest.java | 27 -------- .../ScheduledFutureTaskDeadlineBenchmark.java | 64 +++++++++++++++++++ .../channel/embedded/EmbeddedChannel.java | 29 +++++++++ .../channel/embedded/EmbeddedEventLoop.java | 52 ++++++++++++++- .../channel/embedded/EmbeddedChannelTest.java | 59 +++++++++++++++++ 10 files changed, 282 insertions(+), 81 deletions(-) delete mode 100644 common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java create mode 100644 microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 7f86e55d566..ffabe723e33 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -19,8 +19,6 @@ import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PriorityQueue; -import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos; - import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; @@ -38,7 +36,9 @@ public int compare(ScheduledFutureTask o1, ScheduledFutureTask o2) { } }; - static final Runnable WAKEUP_TASK = new Runnable() { + private static final long START_TIME = System.nanoTime(); + + static final Runnable WAKEUP_TASK = new Runnable() { @Override public void run() { } // Do nothing }; @@ -54,8 +54,36 @@ protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); } + /** + * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()} + * for two reasons: + * + * + */ + protected long getCurrentTimeNanos() { + return defaultCurrentTimeNanos(); + } + + /** + * @deprecated Use the non-static {@link #getCurrentTimeNanos()} instead. + */ + @Deprecated protected static long nanoTime() { - return ScheduledFutureTask.nanoTime(); + return defaultCurrentTimeNanos(); + } + + static long defaultCurrentTimeNanos() { + return System.nanoTime() - START_TIME; + } + + static long deadlineNanos(long nanoTime, long delay) { + long deadlineNanos = nanoTime + delay; + // Guard against overflow + return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; } /** @@ -65,7 +93,7 @@ protected static long nanoTime() { * @return the number of nano seconds from now {@code deadlineNanos} would expire. */ protected static long deadlineToDelayNanos(long deadlineNanos) { - return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos); + return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos); } /** @@ -73,7 +101,7 @@ protected static long deadlineToDelayNanos(long deadlineNanos) { * @return initial value used for delay and computations based upon a monatomic time source. */ protected static long initialNanoTime() { - return ScheduledFutureTask.initialNanoTime(); + return START_TIME; } PriorityQueue> scheduledTaskQueue() { @@ -116,12 +144,12 @@ protected void cancelScheduledTasks() { * @see #pollScheduledTask(long) */ protected final Runnable pollScheduledTask() { - return pollScheduledTask(nanoTime()); + return pollScheduledTask(getCurrentTimeNanos()); } /** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. - * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. + * You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}. */ protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); @@ -162,7 +190,7 @@ final ScheduledFutureTask peekScheduledTask() { */ protected final boolean hasScheduledTasks() { ScheduledFutureTask scheduledTask = peekScheduledTask(); - return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); + return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos(); } @Override @@ -177,7 +205,7 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) return schedule(new ScheduledFutureTask( this, command, - deadlineNanos(unit.toNanos(delay)))); + deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)))); } @Override @@ -189,7 +217,8 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUni } validateScheduled0(delay, unit); - return schedule(new ScheduledFutureTask(this, callable, deadlineNanos(unit.toNanos(delay)))); + return schedule(new ScheduledFutureTask( + this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)))); } @Override @@ -208,7 +237,7 @@ public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDela validateScheduled0(period, unit); return schedule(new ScheduledFutureTask( - this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override @@ -228,7 +257,7 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask( - this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay))); } @SuppressWarnings("deprecation") @@ -291,7 +320,7 @@ final void removeScheduled(final ScheduledFutureTask task) { * to wake the {@link EventExecutor} thread if required. * * @param deadlineNanos deadline of the to-be-scheduled task - * relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()} * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise */ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { @@ -301,7 +330,7 @@ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { /** * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. * - * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()} * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise */ protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index e110bf66b76..186af97f184 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -53,7 +53,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im public void run() { // NOOP } - }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); + }, null), + // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method + // could be overridden leading to unsafe initialization here! + deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), + -SCHEDULE_QUIET_PERIOD_INTERVAL + ); // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not @@ -117,7 +122,7 @@ Runnable takeTask() { } private void fetchFromScheduledTaskQueue() { - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { taskQueue.add(scheduledTask); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index ab39b86aeaf..23fbe5a1e77 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -25,22 +25,6 @@ @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode { - private static final long START_TIME = System.nanoTime(); - - static long nanoTime() { - return System.nanoTime() - START_TIME; - } - - static long deadlineNanos(long delay) { - long deadlineNanos = nanoTime() + delay; - // Guard against overflow - return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; - } - - static long initialNanoTime() { - return START_TIME; - } - // set once when added to priority queue private long id; @@ -109,22 +93,21 @@ void setConsumed() { // Optimization to avoid checking system clock again // after deadline has passed and task has been dequeued if (periodNanos == 0) { - assert nanoTime() >= deadlineNanos; + assert scheduledExecutor().getCurrentTimeNanos() >= deadlineNanos; deadlineNanos = 0L; } } public long delayNanos() { - return deadlineToDelayNanos(deadlineNanos()); + return delayNanos(scheduledExecutor().getCurrentTimeNanos()); } - static long deadlineToDelayNanos(long deadlineNanos) { - return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime()); + static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) { + return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos); } public long delayNanos(long currentTimeNanos) { - return deadlineNanos == 0L ? 0L - : Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME)); + return deadlineToDelayNanos(currentTimeNanos, deadlineNanos); } @Override @@ -178,7 +161,7 @@ public void run() { if (periodNanos > 0) { deadlineNanos += periodNanos; } else { - deadlineNanos = nanoTime() - periodNanos; + deadlineNanos = scheduledExecutor().getCurrentTimeNanos() - periodNanos; } if (!isCancelled()) { scheduledExecutor().scheduledTaskQueue().add(this); diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 0e68abd66d7..6c7020ee234 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -280,7 +280,7 @@ private boolean fetchFromScheduledTaskQueue() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { @@ -301,7 +301,7 @@ private boolean executeExpiredScheduledTasks() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return false; } - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return false; @@ -378,7 +378,7 @@ protected boolean runAllTasks() { } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } afterRunningAllTasks(); return ranAtLeastOne; @@ -403,7 +403,7 @@ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts); if (drainAttempt > 0) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } afterRunningAllTasks(); @@ -463,7 +463,7 @@ protected boolean runAllTasks(long timeoutNanos) { return false; } - final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0; + final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0; long runTasks = 0; long lastExecutionTime; for (;;) { @@ -474,7 +474,7 @@ protected boolean runAllTasks(long timeoutNanos) { // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); if (lastExecutionTime >= deadline) { break; } @@ -482,7 +482,7 @@ protected boolean runAllTasks(long timeoutNanos) { task = pollTask(); if (task == null) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); break; } } @@ -502,6 +502,8 @@ protected void afterRunningAllTasks() { } * Returns the amount of time left until the scheduled task with the closest dead line is executed. */ protected long delayNanos(long currentTimeNanos) { + currentTimeNanos -= initialNanoTime(); + ScheduledFutureTask scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; @@ -511,14 +513,14 @@ protected long delayNanos(long currentTimeNanos) { } /** - * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the next + * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next * closest scheduled task should run. */ @UnstableApi protected long deadlineNanos() { ScheduledFutureTask scheduledTask = peekScheduledTask(); if (scheduledTask == null) { - return nanoTime() + SCHEDULE_PURGE_INTERVAL; + return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL; } return scheduledTask.deadlineNanos(); } @@ -531,7 +533,7 @@ protected long deadlineNanos() { * checks. */ protected void updateLastExecutionTime() { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } /** @@ -609,7 +611,7 @@ private boolean runShutdownHooks() { } if (ran) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } return ran; @@ -755,7 +757,7 @@ protected boolean confirmShutdown() { cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { - gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); + gracefulShutdownStartTime = getCurrentTimeNanos(); } if (runAllTasks() || runShutdownHooks()) { @@ -774,7 +776,7 @@ protected boolean confirmShutdown() { return false; } - final long nanoTime = ScheduledFutureTask.nanoTime(); + final long nanoTime = getCurrentTimeNanos(); if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; diff --git a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java index fca6ad2d27c..13c4447ba67 100644 --- a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; @@ -117,6 +118,12 @@ public void execute() { }); } + @Test + public void testDeadlineNanosNotOverflow() { + Assertions.assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos( + AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); + } + private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor { @Override public boolean isShuttingDown() { diff --git a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java deleted file mode 100644 index 248a0887733..00000000000 --- a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2018 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Assertions; - -public class ScheduledFutureTaskTest { - - @Test - public void testDeadlineNanosNotOverflow() { - Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos(Long.MAX_VALUE)); - } -} diff --git a/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java b/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java new file mode 100644 index 00000000000..bcb8ae67896 --- /dev/null +++ b/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.microbench.util.AbstractMicrobenchmark; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.TimeUnit; + +@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS) +@State(Scope.Benchmark) +public class ScheduledFutureTaskDeadlineBenchmark extends AbstractMicrobenchmark { + @State(Scope.Thread) + public static class ThreadState { + + AbstractScheduledEventExecutor eventLoop; + ScheduledFutureTask future; + + @Setup(Level.Trial) + public void reset() { + eventLoop = (AbstractScheduledEventExecutor) new NioEventLoopGroup(1).next(); + future = (ScheduledFutureTask) eventLoop.schedule(new Runnable() { + @Override + public void run() { + } + }, 100, TimeUnit.DAYS); + } + + @TearDown(Level.Trial) + public void shutdown() { + future.cancel(true); + eventLoop.parent().shutdownGracefully().awaitUninterruptibly(); + } + } + + @Benchmark + @Threads(1) + public long requestDeadline(final ThreadState threadState) { + return threadState.future.delayNanos(); + } +} diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 7d776cfb529..f27327e5af2 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -19,6 +19,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.TimeUnit; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; @@ -635,6 +636,34 @@ private void recordException(Throwable cause) { } } + /** + * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute + * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called). + */ + public void advanceTimeBy(long duration, TimeUnit unit) { + embeddedEventLoop().advanceTimeBy(unit.toNanos(duration)); + } + + /** + * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on + * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to + * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute. + */ + public void freezeTime() { + embeddedEventLoop().freezeTime(); + } + + /** + * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where + * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()} + * was called, it will run ten minutes after this method is called again (assuming no + * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using + * {@link #runScheduledPendingTasks()}). + */ + public void unfreezeTime() { + embeddedEventLoop().unfreezeTime(); + } + /** * Checks for the presence of an {@link Exception}. */ diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index c1ffc7a314e..b8b73d9f8bd 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -30,6 +30,22 @@ import java.util.concurrent.TimeUnit; final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop { + /** + * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time + * is frozen, this variable is unused. + * + * Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called, + * {@link #getCurrentTimeNanos()} matches the default behavior. + */ + private long startTime = initialNanoTime(); + /** + * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused. + */ + private long frozenTimestamp; + /** + * Whether time is currently frozen. + */ + private boolean timeFrozen; private final Queue tasks = new ArrayDeque(2); @@ -60,7 +76,7 @@ void runTasks() { } long runScheduledTasks() { - long time = AbstractScheduledEventExecutor.nanoTime(); + long time = getCurrentTimeNanos(); for (;;) { Runnable task = pollScheduledTask(time); if (task == null) { @@ -75,6 +91,40 @@ long nextScheduledTask() { return nextScheduledTaskNano(); } + @Override + protected long getCurrentTimeNanos() { + if (timeFrozen) { + return frozenTimestamp; + } + return System.nanoTime() - startTime; + } + + void advanceTimeBy(long nanos) { + if (timeFrozen) { + frozenTimestamp += nanos; + } else { + // startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos + startTime -= nanos; + } + } + + void freezeTime() { + if (!timeFrozen) { + frozenTimestamp = getCurrentTimeNanos(); + timeFrozen = true; + } + } + + void unfreezeTime() { + if (timeFrozen) { + // we want getCurrentTimeNanos to continue right where frozenTimestamp left off: + // getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp + // then solve for startTime + startTime = System.nanoTime() - frozenTimestamp; + timeFrozen = false; + } + } + @Override protected void cancelScheduledTasks() { super.cancelScheduledTasks(); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index fd279ed5ca6..4999ea82013 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -32,6 +32,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -676,6 +677,64 @@ void testRunPendingTasksForNotRegisteredChannel() { } } + @Test + @Timeout(30) // generous timeout, just make sure we don't actually wait for the full 10 mins... + void testAdvanceTime() { + EmbeddedChannel channel = new EmbeddedChannel(); + Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + ScheduledFuture future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES); + ScheduledFuture future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES); + + channel.runPendingTasks(); + assertFalse(future10.isDone()); + assertFalse(future20.isDone()); + + channel.advanceTimeBy(10, TimeUnit.MINUTES); + channel.runPendingTasks(); + assertTrue(future10.isDone()); + assertFalse(future20.isDone()); + } + + @Test + @Timeout(30) // generous timeout, just make sure we don't actually wait for the full 10 mins... + void testFreezeTime() { + EmbeddedChannel channel = new EmbeddedChannel(); + Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + + channel.freezeTime(); + // this future will complete after 10min + ScheduledFuture future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES); + // this future will complete after 10min + 1ns + ScheduledFuture future101 = channel.eventLoop().schedule(runnable, + TimeUnit.MINUTES.toNanos(10) + 1, TimeUnit.NANOSECONDS); + // this future will complete after 20min + ScheduledFuture future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES); + + channel.runPendingTasks(); + assertFalse(future10.isDone()); + assertFalse(future101.isDone()); + assertFalse(future20.isDone()); + + channel.advanceTimeBy(10, TimeUnit.MINUTES); + channel.runPendingTasks(); + assertTrue(future10.isDone()); + assertFalse(future101.isDone()); + assertFalse(future20.isDone()); + + channel.unfreezeTime(); + channel.runPendingTasks(); + assertTrue(future101.isDone()); + assertFalse(future20.isDone()); + } + private static void release(ByteBuf... buffers) { for (ByteBuf buffer : buffers) { if (buffer.refCnt() > 0) {