From 4a8bc5e2e019242c25d0b87424d91027f43e5240 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 13 Jun 2022 19:49:25 +0200 Subject: [PATCH] Allow controlling time flow for EmbeddedEventLoop (#12459) (#12464) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: Tests using EmbeddedEventLoop can run faster if they can "advance time" so that scheduled tasks (e.g. timeouts) run earlier. Additionally, "freeze time" functionality can improve reliability of such tests. Modification: - Introduce a protected method `AbstractScheduledEventExecutor.getCurrentTimeNanos` that replaces the previous static `nanoTime` method (now deprecated). Replace usages of `nanoTime` with the new method. - Override `getCurrentTimeNanos` with the new time control (freeze, unfreeze, advanceBy) features in `EmbeddedEventLoop`. - Add a microbenchmark that tests one of the sites that seemed most likely to see negative performance impact by the change (`ScheduledFutureTask.delayNanos`). Result: Fixes #12433. Local runs of the `ScheduleFutureTaskBenchmark` microbenchmark shows no evidence for performance impact (within error bounds of each other): ``` before: Benchmark (num) Mode Cnt Score Error Units ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop 100000 thrpt 20 132.437 ± 15.116 ops/s ScheduleFutureTaskBenchmark.scheduleLots 100000 thrpt 20 694.475 ± 8.184 ops/s ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop 100000 thrpt 20 88.037 ± 4.013 ops/s after: Benchmark (num) Mode Cnt Score Error Units ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop 100000 thrpt 20 149.629 ± 7.514 ops/s ScheduleFutureTaskBenchmark.scheduleLots 100000 thrpt 20 688.954 ± 7.831 ops/s ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop 100000 thrpt 20 85.426 ± 1.104 ops/s ``` The new `ScheduleFutureTaskDeadlineBenchmark` shows some performance degradation: ``` before: Benchmark Mode Cnt Score Error Units ScheduleFutureTaskDeadlineBenchmark.requestDeadline thrpt 20 60726336.795 ± 280054.533 ops/s after: Benchmark Mode Cnt Score Error Units ScheduleFutureTaskDeadlineBenchmark.requestDeadline thrpt 20 56948231.480 ± 188264.092 ops/s ``` The difference is small, but it's there, so I investigated this further using jitwatch. Looking at the generated assembly, the call to `getCurrentTimeNanos` is devirtualized and inlined in the absence of `EmbeddedEventLoop`, so the code is mostly identical. However there is the added getfield and checkcast for the executor, which probably explains the discrepancy. In my opinion this is acceptable, because the performance impact is not severe, this use is likely the worst case (virtual call through `scheduledExecutorService()`), and it is never as hot as in this benchmark. Note that if an `EmbeddedEventLoop` is present in the application, the performance impact is likely substantially higher, because this would necessitate a virtual call. However this is not an issue for production applications, and the affected code is still not very hot. Co-authored-by: Norman Maurer Co-authored-by: Jonas Konrad --- .../AbstractScheduledEventExecutor.java | 52 ++++++++++++++----- .../util/concurrent/GlobalEventExecutor.java | 22 ++++---- .../RunnableScheduledFutureAdapter.java | 10 ++-- .../concurrent/SingleThreadEventExecutor.java | 13 ++--- .../AbstractScheduledEventExecutorTest.java | 3 +- .../channel/embedded/EmbeddedChannel.java | 32 ++++++++++++ .../channel/embedded/EmbeddedEventLoop.java | 52 ++++++++++++++++++- 7 files changed, 151 insertions(+), 33 deletions(-) diff --git a/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java index 415ff204bf8..83c8ec1c894 100644 --- a/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java @@ -50,16 +50,39 @@ protected AbstractScheduledEventExecutor() { * {@link System#nanoTime()}. */ public static long nanoTime() { - return System.nanoTime() - START_TIME; + return defaultCurrentTimeNanos(); + } + + /** + * The initial value used for delay and computations based upon a monatomic time source. + * @return initial value used for delay and computations based upon a monatomic time source. + */ + protected static long initialNanoTime() { + return START_TIME; } /** - * The deadline (in nanoseconds) for a given delay (in nanoseconds). + * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()} + * for two reasons: + * + *
    + *
  • We apply a fixed offset to the {@link System#nanoTime() nanoTime}
  • + *
  • Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time + * for testing purposes.
  • + *
*/ - static long deadlineNanos(long delay) { - long deadlineNanos = nanoTime() + delay; + protected long getCurrentTimeNanos() { + return defaultCurrentTimeNanos(); + } + + static long defaultCurrentTimeNanos() { + return System.nanoTime() - START_TIME; + } + + static long deadlineNanos(long nanoTime, long delay) { + long deadlineNanos = nanoTime + delay; // Guard against overflow - return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos; + return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; } PriorityQueue> scheduledTaskQueue() { @@ -102,12 +125,12 @@ protected final void cancelScheduledTasks() { * @see #pollScheduledTask(long) */ protected final RunnableScheduledFuture pollScheduledTask() { - return pollScheduledTask(nanoTime()); + return pollScheduledTask(getCurrentTimeNanos()); } /** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link - * #nanoTime()} to retrieve the correct {@code nanoTime}. + * #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}. *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ @@ -138,7 +161,7 @@ protected final long nextScheduledTaskNano() { if (scheduledTask == null) { return -1; } - return Math.max(0, scheduledTask.deadlineNanos() - nanoTime()); + return Math.max(0, scheduledTask.deadlineNanos() - getCurrentTimeNanos()); } final RunnableScheduledFuture peekScheduledTask() { @@ -158,7 +181,7 @@ protected final boolean hasScheduledTasks() { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); - return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); + return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos(); } @Override @@ -169,7 +192,7 @@ public Future schedule(Runnable command, long delay, TimeUnit unit) { delay = 0; } RunnableScheduledFuture task = newScheduledTaskFor( - callable(command, null), deadlineNanos(unit.toNanos(delay)), 0); + callable(command, null), deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0); return schedule(task); } @@ -180,7 +203,8 @@ public Future schedule(Callable callable, long delay, TimeUnit unit) { if (delay < 0) { delay = 0; } - RunnableScheduledFuture task = newScheduledTaskFor(callable, deadlineNanos(unit.toNanos(delay)), 0); + RunnableScheduledFuture task = newScheduledTaskFor( + callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0); return schedule(task); } @@ -198,7 +222,8 @@ public Future scheduleAtFixedRate(Runnable command, long initialDelay, lon } RunnableScheduledFuture task = newScheduledTaskFor( - callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); + callable(command, null), + deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)); return schedule(task); } @@ -216,7 +241,8 @@ public Future scheduleWithFixedDelay(Runnable command, long initialDelay, } RunnableScheduledFuture task = newScheduledTaskFor( - callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); + callable(command, null), + deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)); return schedule(task); } diff --git a/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java index 2e5b58ef6c0..ea66df56872 100644 --- a/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java @@ -45,17 +45,11 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final RunnableScheduledFutureAdapter QUIET_PERIOD_TASK; + private final RunnableScheduledFutureAdapter quietPeriodTask; public static final GlobalEventExecutor INSTANCE; static { INSTANCE = new GlobalEventExecutor(); - QUIET_PERIOD_TASK = new RunnableScheduledFutureAdapter<>( - INSTANCE, INSTANCE.newPromise(), Executors.callable(() -> { - // NOOP - }, null), deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); - - INSTANCE.scheduledTaskQueue().add(QUIET_PERIOD_TASK); } private final BlockingQueue taskQueue = new LinkedBlockingQueue<>(); @@ -75,6 +69,16 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private GlobalEventExecutor() { threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory( DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this); + quietPeriodTask = new RunnableScheduledFutureAdapter<>( + this, newPromise(), Executors.callable(() -> { + // NOOP + }, null), + // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise + // the method could be overridden leading to unsafe initialization here! + deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), + -SCHEDULE_QUIET_PERIOD_INTERVAL); + + scheduledTaskQueue().add(quietPeriodTask); } /** @@ -122,7 +126,7 @@ private Runnable takeTask() { } private void fetchFromScheduledTaskQueue() { - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { taskQueue.add(scheduledTask); @@ -243,7 +247,7 @@ public void run() { logger.warn("Unexpected exception from the global event executor: ", t); } - if (task != QUIET_PERIOD_TASK) { + if (task != quietPeriodTask) { continue; } } diff --git a/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java b/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java index b04e96dcb1f..601e26d4dc8 100644 --- a/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java +++ b/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java @@ -64,12 +64,16 @@ public long deadlineNanos() { @Override public long delayNanos() { - return Math.max(0, deadlineNanos() - AbstractScheduledEventExecutor.nanoTime()); + return delayNanos(executor.getCurrentTimeNanos()); } @Override public long delayNanos(long currentTimeNanos) { - return Math.max(0, deadlineNanos() - (currentTimeNanos - AbstractScheduledEventExecutor.START_TIME)); + return deadlineToDelayNanos(currentTimeNanos, deadlineNanos); + } + + private static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) { + return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos); } @Override @@ -127,7 +131,7 @@ public void run() { if (p > 0) { deadlineNanos += p; } else { - deadlineNanos = AbstractScheduledEventExecutor.nanoTime() - p; + deadlineNanos = executor.getCurrentTimeNanos() - p; } if (!isCancelled()) { executor.schedule(this); diff --git a/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java index d72bff3f734..f7dafc04812 100644 --- a/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java @@ -250,7 +250,7 @@ protected final Runnable takeTask() { } private boolean fetchFromScheduledTaskQueue() { - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); RunnableScheduledFuture scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { @@ -379,6 +379,7 @@ protected int runAllTasks(int maxTasks) { */ protected final long delayNanos(long currentTimeNanos) { assert inEventLoop(); + currentTimeNanos -= START_TIME; RunnableScheduledFuture scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; @@ -388,7 +389,7 @@ protected final long delayNanos(long currentTimeNanos) { } /** - * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the the next + * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()} ()}) at which the next * closest scheduled task should run. * * This method must be called from the {@link EventExecutor} thread. @@ -397,7 +398,7 @@ protected final long deadlineNanos() { assert inEventLoop(); RunnableScheduledFuture scheduledTask = peekScheduledTask(); if (scheduledTask == null) { - return nanoTime() + SCHEDULE_PURGE_INTERVAL; + return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL; } return scheduledTask.deadlineNanos(); } @@ -412,7 +413,7 @@ protected final long deadlineNanos() { */ protected final void updateLastExecutionTime() { assert inEventLoop(); - lastExecutionTime = nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } /** @@ -599,7 +600,7 @@ boolean confirmShutdown0() { cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { - gracefulShutdownStartTime = nanoTime(); + gracefulShutdownStartTime = getCurrentTimeNanos(); } if (runAllTasks() || runShutdownHooks()) { @@ -618,7 +619,7 @@ boolean confirmShutdown0() { return false; } - final long nanoTime = nanoTime(); + final long nanoTime = getCurrentTimeNanos(); if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; diff --git a/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java index e3dc272ca4d..44bbd4efad3 100644 --- a/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -102,7 +102,8 @@ public void testScheduleWithFixedDelayNegative() { @Test public void testDeadlineNanosNotOverflow() { - assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos( + AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); } private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor { diff --git a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java index 4e8472be9d6..a7bda5ab227 100644 --- a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java @@ -42,6 +42,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.TimeUnit; import static io.netty5.util.internal.PlatformDependent.throwException; import static java.util.Objects.requireNonNull; @@ -614,6 +615,37 @@ private void recordException(Throwable cause) { } } + private EmbeddedEventLoop embeddedEventLoop() { + return (EmbeddedEventLoop) executor(); + } + /** + * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute + * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called). + */ + public void advanceTimeBy(long duration, TimeUnit unit) { + embeddedEventLoop().advanceTimeBy(unit.toNanos(duration)); + } + + /** + * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on + * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to + * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute. + */ + public void freezeTime() { + embeddedEventLoop().freezeTime(); + } + + /** + * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where + * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()} + * was called, it will run ten minutes after this method is called again (assuming no + * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using + * {@link #runScheduledPendingTasks()}). + */ + public void unfreezeTime() { + embeddedEventLoop().unfreezeTime(); + } + /** * Checks for the presence of an {@link Exception}. */ diff --git a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java index 24816d2cf1d..bab2fd99592 100644 --- a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java @@ -28,6 +28,22 @@ import static java.util.Objects.requireNonNull; final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop { + /** + * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time + * is frozen, this variable is unused. + * + * Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called, + * {@link #getCurrentTimeNanos()} matches the default behavior. + */ + private long startTime = initialNanoTime(); + /** + * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused. + */ + private long frozenTimestamp; + /** + * Whether time is currently frozen. + */ + private boolean timeFrozen; private final Queue tasks = new ArrayDeque<>(2); boolean running; @@ -91,7 +107,7 @@ void runTasks() { } long runScheduledTasks() { - long time = AbstractScheduledEventExecutor.nanoTime(); + long time = getCurrentTimeNanos(); boolean wasRunning = running; try { for (;;) { @@ -123,6 +139,40 @@ void cancelScheduled() { } } + @Override + protected long getCurrentTimeNanos() { + if (timeFrozen) { + return frozenTimestamp; + } + return System.nanoTime() - startTime; + } + + void advanceTimeBy(long nanos) { + if (timeFrozen) { + frozenTimestamp += nanos; + } else { + // startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos + startTime -= nanos; + } + } + + void freezeTime() { + if (!timeFrozen) { + frozenTimestamp = getCurrentTimeNanos(); + timeFrozen = true; + } + } + + void unfreezeTime() { + if (timeFrozen) { + // we want getCurrentTimeNanos to continue right where frozenTimestamp left off: + // getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp + // then solve for startTime + startTime = System.nanoTime() - frozenTimestamp; + timeFrozen = false; + } + } + @Override public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException();