From ece7f8221a14d0ec61c31d55e0b04d01bfa061ba Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 10 Jun 2022 12:40:34 +0200 Subject: [PATCH 1/8] embedded time changes --- .../AbstractScheduledEventExecutor.java | 50 +++++++++++----- .../util/concurrent/GlobalEventExecutor.java | 9 ++- .../util/concurrent/ScheduledFutureTask.java | 27 +++------ .../concurrent/SingleThreadEventExecutor.java | 28 ++++----- .../concurrent/ScheduledFutureTaskTest.java | 2 +- .../channel/embedded/EmbeddedChannel.java | 13 +++++ .../channel/embedded/EmbeddedEventLoop.java | 53 ++++++++++++++++- .../channel/embedded/EmbeddedChannelTest.java | 58 +++++++++++++++++++ 8 files changed, 191 insertions(+), 49 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 7f86e55d566..006f83dc983 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -38,7 +38,9 @@ public int compare(ScheduledFutureTask o1, ScheduledFutureTask o2) { } }; - static final Runnable WAKEUP_TASK = new Runnable() { + private static final long START_TIME = System.nanoTime(); + + static final Runnable WAKEUP_TASK = new Runnable() { @Override public void run() { } // Do nothing }; @@ -54,8 +56,30 @@ protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); } + /** + * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()} + * for two reasons: + * + * + */ + protected long getCurrentTimeNanos() { + return defaultCurrentTimeNanos(); + } + + /** + * @deprecated Use the non-static {@link #getCurrentTimeNanos()} instead. + */ + @Deprecated protected static long nanoTime() { - return ScheduledFutureTask.nanoTime(); + return defaultCurrentTimeNanos(); + } + + static long defaultCurrentTimeNanos() { + return System.nanoTime() - START_TIME; } /** @@ -65,7 +89,7 @@ protected static long nanoTime() { * @return the number of nano seconds from now {@code deadlineNanos} would expire. */ protected static long deadlineToDelayNanos(long deadlineNanos) { - return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos); + return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos); } /** @@ -73,7 +97,7 @@ protected static long deadlineToDelayNanos(long deadlineNanos) { * @return initial value used for delay and computations based upon a monatomic time source. */ protected static long initialNanoTime() { - return ScheduledFutureTask.initialNanoTime(); + return START_TIME; } PriorityQueue> scheduledTaskQueue() { @@ -116,12 +140,12 @@ protected void cancelScheduledTasks() { * @see #pollScheduledTask(long) */ protected final Runnable pollScheduledTask() { - return pollScheduledTask(nanoTime()); + return pollScheduledTask(getCurrentTimeNanos()); } /** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. - * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}. + * You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}. */ protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); @@ -162,7 +186,7 @@ final ScheduledFutureTask peekScheduledTask() { */ protected final boolean hasScheduledTasks() { ScheduledFutureTask scheduledTask = peekScheduledTask(); - return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); + return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos(); } @Override @@ -177,7 +201,7 @@ public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) return schedule(new ScheduledFutureTask( this, command, - deadlineNanos(unit.toNanos(delay)))); + deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)))); } @Override @@ -189,7 +213,7 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUni } validateScheduled0(delay, unit); - return schedule(new ScheduledFutureTask(this, callable, deadlineNanos(unit.toNanos(delay)))); + return schedule(new ScheduledFutureTask(this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)))); } @Override @@ -208,7 +232,7 @@ public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDela validateScheduled0(period, unit); return schedule(new ScheduledFutureTask( - this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period))); + this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period))); } @Override @@ -228,7 +252,7 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialD validateScheduled0(delay, unit); return schedule(new ScheduledFutureTask( - this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay))); + this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay))); } @SuppressWarnings("deprecation") @@ -291,7 +315,7 @@ final void removeScheduled(final ScheduledFutureTask task) { * to wake the {@link EventExecutor} thread if required. * * @param deadlineNanos deadline of the to-be-scheduled task - * relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()} * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise */ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { @@ -301,7 +325,7 @@ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { /** * See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false. * - * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()} + * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()} * @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise */ protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index e110bf66b76..9c5cfea3d34 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -53,7 +53,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im public void run() { // NOOP } - }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); + }, null), + // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method + // could be overridden leading to unsafe initialization here! + ScheduledFutureTask.deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), + -SCHEDULE_QUIET_PERIOD_INTERVAL + ); // because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this // can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not @@ -117,7 +122,7 @@ Runnable takeTask() { } private void fetchFromScheduledTaskQueue() { - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { taskQueue.add(scheduledTask); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index ab39b86aeaf..df1afacdf2d 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -25,22 +25,12 @@ @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode { - private static final long START_TIME = System.nanoTime(); - - static long nanoTime() { - return System.nanoTime() - START_TIME; - } - - static long deadlineNanos(long delay) { - long deadlineNanos = nanoTime() + delay; + static long deadlineNanos(long nanoTime, long delay) { + long deadlineNanos = nanoTime + delay; // Guard against overflow return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; } - static long initialNanoTime() { - return START_TIME; - } - // set once when added to priority queue private long id; @@ -109,22 +99,21 @@ void setConsumed() { // Optimization to avoid checking system clock again // after deadline has passed and task has been dequeued if (periodNanos == 0) { - assert nanoTime() >= deadlineNanos; + assert scheduledExecutor().getCurrentTimeNanos() >= deadlineNanos; deadlineNanos = 0L; } } public long delayNanos() { - return deadlineToDelayNanos(deadlineNanos()); + return delayNanos(scheduledExecutor().getCurrentTimeNanos()); } - static long deadlineToDelayNanos(long deadlineNanos) { - return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime()); + static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) { + return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos); } public long delayNanos(long currentTimeNanos) { - return deadlineNanos == 0L ? 0L - : Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME)); + return deadlineToDelayNanos(currentTimeNanos, deadlineNanos); } @Override @@ -178,7 +167,7 @@ public void run() { if (periodNanos > 0) { deadlineNanos += periodNanos; } else { - deadlineNanos = nanoTime() - periodNanos; + deadlineNanos = scheduledExecutor().getCurrentTimeNanos() - periodNanos; } if (!isCancelled()) { scheduledExecutor().scheduledTaskQueue().add(this); diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java index 0e68abd66d7..6c7020ee234 100644 --- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java @@ -280,7 +280,7 @@ private boolean fetchFromScheduledTaskQueue() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return true; } - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { @@ -301,7 +301,7 @@ private boolean executeExpiredScheduledTasks() { if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { return false; } - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { return false; @@ -378,7 +378,7 @@ protected boolean runAllTasks() { } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks. if (ranAtLeastOne) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } afterRunningAllTasks(); return ranAtLeastOne; @@ -403,7 +403,7 @@ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts) } while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts); if (drainAttempt > 0) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } afterRunningAllTasks(); @@ -463,7 +463,7 @@ protected boolean runAllTasks(long timeoutNanos) { return false; } - final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0; + final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0; long runTasks = 0; long lastExecutionTime; for (;;) { @@ -474,7 +474,7 @@ protected boolean runAllTasks(long timeoutNanos) { // Check timeout every 64 tasks because nanoTime() is relatively expensive. // XXX: Hard-coded value - will make it configurable if it is really a problem. if ((runTasks & 0x3F) == 0) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); if (lastExecutionTime >= deadline) { break; } @@ -482,7 +482,7 @@ protected boolean runAllTasks(long timeoutNanos) { task = pollTask(); if (task == null) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); break; } } @@ -502,6 +502,8 @@ protected void afterRunningAllTasks() { } * Returns the amount of time left until the scheduled task with the closest dead line is executed. */ protected long delayNanos(long currentTimeNanos) { + currentTimeNanos -= initialNanoTime(); + ScheduledFutureTask scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; @@ -511,14 +513,14 @@ protected long delayNanos(long currentTimeNanos) { } /** - * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the next + * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next * closest scheduled task should run. */ @UnstableApi protected long deadlineNanos() { ScheduledFutureTask scheduledTask = peekScheduledTask(); if (scheduledTask == null) { - return nanoTime() + SCHEDULE_PURGE_INTERVAL; + return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL; } return scheduledTask.deadlineNanos(); } @@ -531,7 +533,7 @@ protected long deadlineNanos() { * checks. */ protected void updateLastExecutionTime() { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } /** @@ -609,7 +611,7 @@ private boolean runShutdownHooks() { } if (ran) { - lastExecutionTime = ScheduledFutureTask.nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } return ran; @@ -755,7 +757,7 @@ protected boolean confirmShutdown() { cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { - gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); + gracefulShutdownStartTime = getCurrentTimeNanos(); } if (runAllTasks() || runShutdownHooks()) { @@ -774,7 +776,7 @@ protected boolean confirmShutdown() { return false; } - final long nanoTime = ScheduledFutureTask.nanoTime(); + final long nanoTime = getCurrentTimeNanos(); if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; diff --git a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java index 248a0887733..5c6def23f19 100644 --- a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java +++ b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java @@ -22,6 +22,6 @@ public class ScheduledFutureTaskTest { @Test public void testDeadlineNanosNotOverflow() { - Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos(Long.MAX_VALUE)); + Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos(AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); } } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 7d776cfb529..244d9bc213b 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -19,6 +19,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.TimeUnit; import io.netty.channel.AbstractChannel; import io.netty.channel.Channel; @@ -635,6 +636,18 @@ private void recordException(Throwable cause) { } } + public void advanceTimeBy(long duration, TimeUnit unit) { + embeddedEventLoop().advanceTimeBy(unit.toNanos(duration)); + } + + public void freezeTime() { + embeddedEventLoop().freezeTime(); + } + + public void unfreezeTime() { + embeddedEventLoop().unfreezeTime(); + } + /** * Checks for the presence of an {@link Exception}. */ diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index c1ffc7a314e..6c699b5b09d 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -30,6 +30,22 @@ import java.util.concurrent.TimeUnit; final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop { + /** + * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time + * is frozen, this variable is unused. + * + * Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called, + * {@link #getCurrentTimeNanos()} matches the default behavior. + */ + private long startTime = initialNanoTime(); + /** + * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused. + */ + private long frozenTimestamp = 0; + /** + * Whether time is currently frozen. + */ + private boolean timeFrozen = false; private final Queue tasks = new ArrayDeque(2); @@ -60,7 +76,7 @@ void runTasks() { } long runScheduledTasks() { - long time = AbstractScheduledEventExecutor.nanoTime(); + long time = getCurrentTimeNanos(); for (;;) { Runnable task = pollScheduledTask(time); if (task == null) { @@ -75,6 +91,41 @@ long nextScheduledTask() { return nextScheduledTaskNano(); } + @Override + protected long getCurrentTimeNanos() { + if (timeFrozen) { + return frozenTimestamp; + } else { + return System.nanoTime() - startTime; + } + } + + void advanceTimeBy(long nanos) { + if (timeFrozen) { + frozenTimestamp += nanos; + } else { + // startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos + startTime -= nanos; + } + } + + void freezeTime() { + if (!timeFrozen) { + frozenTimestamp = getCurrentTimeNanos(); + timeFrozen = true; + } + } + + void unfreezeTime() { + if (timeFrozen) { + // we want getCurrentTimeNanos to continue right where frozenTimestamp left off: + // getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp + // then solve for startTime + startTime = System.nanoTime() - frozenTimestamp; + timeFrozen = false; + } + } + @Override protected void cancelScheduledTasks() { super.cancelScheduledTasks(); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index fd279ed5ca6..9b08c8efdba 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -32,6 +32,7 @@ import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; +import io.netty.util.concurrent.ScheduledFuture; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -676,6 +677,63 @@ void testRunPendingTasksForNotRegisteredChannel() { } } + @Test + @Timeout(30) // generous timeout, just make sure we don't actually wait for the full 10 mins... + void testAdvanceTime() { + EmbeddedChannel channel = new EmbeddedChannel(); + Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + ScheduledFuture future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES); + ScheduledFuture future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES); + + channel.runPendingTasks(); + assertFalse(future10.isDone()); + assertFalse(future20.isDone()); + + channel.advanceTimeBy(10, TimeUnit.MINUTES); + channel.runPendingTasks(); + assertTrue(future10.isDone()); + assertFalse(future20.isDone()); + } + + @Test + @Timeout(30) // generous timeout, just make sure we don't actually wait for the full 10 mins... + void testFreezeTime() { + EmbeddedChannel channel = new EmbeddedChannel(); + Runnable runnable = new Runnable() { + @Override + public void run() { + } + }; + + channel.freezeTime(); + // this future will complete after 10min + ScheduledFuture future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES); + // this future will complete after 10min + 1ns + ScheduledFuture future101 = channel.eventLoop().schedule(runnable, TimeUnit.MINUTES.toNanos(10) + 1, TimeUnit.NANOSECONDS); + // this future will complete after 20min + ScheduledFuture future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES); + + channel.runPendingTasks(); + assertFalse(future10.isDone()); + assertFalse(future101.isDone()); + assertFalse(future20.isDone()); + + channel.advanceTimeBy(10, TimeUnit.MINUTES); + channel.runPendingTasks(); + assertTrue(future10.isDone()); + assertFalse(future101.isDone()); + assertFalse(future20.isDone()); + + channel.unfreezeTime(); + channel.runPendingTasks(); + assertTrue(future101.isDone()); + assertFalse(future20.isDone()); + } + private static void release(ByteBuf... buffers) { for (ByteBuf buffer : buffers) { if (buffer.refCnt() > 0) { From 98277fcf0f8c801d0875d3485e72533d79fe80fa Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 10 Jun 2022 14:29:24 +0200 Subject: [PATCH 2/8] deadline benchmark --- .../ScheduledFutureTaskDeadlineBenchmark.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java diff --git a/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java b/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java new file mode 100644 index 00000000000..bcb8ae67896 --- /dev/null +++ b/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java @@ -0,0 +1,64 @@ +/* + * Copyright 2022 The Netty Project + * + * The Netty Project licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ +package io.netty.util.concurrent; + +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.microbench.util.AbstractMicrobenchmark; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.TimeUnit; + +@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS) +@State(Scope.Benchmark) +public class ScheduledFutureTaskDeadlineBenchmark extends AbstractMicrobenchmark { + @State(Scope.Thread) + public static class ThreadState { + + AbstractScheduledEventExecutor eventLoop; + ScheduledFutureTask future; + + @Setup(Level.Trial) + public void reset() { + eventLoop = (AbstractScheduledEventExecutor) new NioEventLoopGroup(1).next(); + future = (ScheduledFutureTask) eventLoop.schedule(new Runnable() { + @Override + public void run() { + } + }, 100, TimeUnit.DAYS); + } + + @TearDown(Level.Trial) + public void shutdown() { + future.cancel(true); + eventLoop.parent().shutdownGracefully().awaitUninterruptibly(); + } + } + + @Benchmark + @Threads(1) + public long requestDeadline(final ThreadState threadState) { + return threadState.future.delayNanos(); + } +} From ff5aefb725398ebecf6d28f1a74d843c851d85de Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 10 Jun 2022 14:44:24 +0200 Subject: [PATCH 3/8] checkstyle --- .../netty/util/concurrent/AbstractScheduledEventExecutor.java | 3 ++- .../io/netty/util/concurrent/ScheduledFutureTaskTest.java | 3 ++- .../java/io/netty/channel/embedded/EmbeddedEventLoop.java | 4 ++-- .../java/io/netty/channel/embedded/EmbeddedChannelTest.java | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 006f83dc983..30f9e173ea6 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -213,7 +213,8 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUni } validateScheduled0(delay, unit); - return schedule(new ScheduledFutureTask(this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)))); + return schedule(new ScheduledFutureTask( + this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)))); } @Override diff --git a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java index 5c6def23f19..4b6a52a3944 100644 --- a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java +++ b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java @@ -22,6 +22,7 @@ public class ScheduledFutureTaskTest { @Test public void testDeadlineNanosNotOverflow() { - Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos(AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); + Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos( + AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); } } diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 6c699b5b09d..31a49167c69 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -41,11 +41,11 @@ final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements /** * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused. */ - private long frozenTimestamp = 0; + private long frozenTimestamp; /** * Whether time is currently frozen. */ - private boolean timeFrozen = false; + private boolean timeFrozen; private final Queue tasks = new ArrayDeque(2); diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java index 9b08c8efdba..4999ea82013 100644 --- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java +++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java @@ -713,7 +713,8 @@ public void run() { // this future will complete after 10min ScheduledFuture future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES); // this future will complete after 10min + 1ns - ScheduledFuture future101 = channel.eventLoop().schedule(runnable, TimeUnit.MINUTES.toNanos(10) + 1, TimeUnit.NANOSECONDS); + ScheduledFuture future101 = channel.eventLoop().schedule(runnable, + TimeUnit.MINUTES.toNanos(10) + 1, TimeUnit.NANOSECONDS); // this future will complete after 20min ScheduledFuture future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES); From bd3d6fd6be69811ee7cab4b5bc58736ce055b480 Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 10 Jun 2022 15:49:46 +0200 Subject: [PATCH 4/8] javadoc --- .../netty/channel/embedded/EmbeddedChannel.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java index 244d9bc213b..f27327e5af2 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java @@ -636,14 +636,30 @@ private void recordException(Throwable cause) { } } + /** + * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute + * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called). + */ public void advanceTimeBy(long duration, TimeUnit unit) { embeddedEventLoop().advanceTimeBy(unit.toNanos(duration)); } + /** + * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on + * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to + * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute. + */ public void freezeTime() { embeddedEventLoop().freezeTime(); } + /** + * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where + * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()} + * was called, it will run ten minutes after this method is called again (assuming no + * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using + * {@link #runScheduledPendingTasks()}). + */ public void unfreezeTime() { embeddedEventLoop().unfreezeTime(); } From 5003827c3b0226080bb85693852f08e2cc6ebf20 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Fri, 10 Jun 2022 15:59:59 +0200 Subject: [PATCH 5/8] Update transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java Co-authored-by: Norman Maurer --- .../java/io/netty/channel/embedded/EmbeddedEventLoop.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index 31a49167c69..cf1a17f265c 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -95,9 +95,7 @@ long nextScheduledTask() { protected long getCurrentTimeNanos() { if (timeFrozen) { return frozenTimestamp; - } else { - return System.nanoTime() - startTime; - } + return System.nanoTime() - startTime; } void advanceTimeBy(long nanos) { From c33fdf11bb126c5e99f807e25efec79b79953c8c Mon Sep 17 00:00:00 2001 From: yawkat Date: Fri, 10 Jun 2022 16:08:54 +0200 Subject: [PATCH 6/8] missing brace from web merge --- .../main/java/io/netty/channel/embedded/EmbeddedEventLoop.java | 1 + 1 file changed, 1 insertion(+) diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java index cf1a17f265c..b8b73d9f8bd 100644 --- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java @@ -95,6 +95,7 @@ long nextScheduledTask() { protected long getCurrentTimeNanos() { if (timeFrozen) { return frozenTimestamp; + } return System.nanoTime() - startTime; } From 2080c9916ecd9de22f27dc15827891689eeb723c Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Fri, 10 Jun 2022 22:33:26 +0200 Subject: [PATCH 7/8] move deadlineNanos --- .../AbstractScheduledEventExecutor.java | 8 ++++-- .../util/concurrent/GlobalEventExecutor.java | 2 +- .../util/concurrent/ScheduledFutureTask.java | 6 ---- .../AbstractScheduledEventExecutorTest.java | 7 +++++ .../concurrent/ScheduledFutureTaskTest.java | 28 ------------------- 5 files changed, 14 insertions(+), 37 deletions(-) delete mode 100644 common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java index 30f9e173ea6..ffabe723e33 100644 --- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java @@ -19,8 +19,6 @@ import io.netty.util.internal.ObjectUtil; import io.netty.util.internal.PriorityQueue; -import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos; - import java.util.Comparator; import java.util.Queue; import java.util.concurrent.Callable; @@ -82,6 +80,12 @@ static long defaultCurrentTimeNanos() { return System.nanoTime() - START_TIME; } + static long deadlineNanos(long nanoTime, long delay) { + long deadlineNanos = nanoTime + delay; + // Guard against overflow + return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; + } + /** * Given an arbitrary deadline {@code deadlineNanos}, calculate the number of nano seconds from now * {@code deadlineNanos} would expire. diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index 9c5cfea3d34..6c203a2f606 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -56,7 +56,7 @@ public void run() { }, null), // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method // could be overridden leading to unsafe initialization here! - ScheduledFutureTask.deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), + AbstractScheduledEventExecutor.deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL ); diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java index df1afacdf2d..23fbe5a1e77 100644 --- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java +++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java @@ -25,12 +25,6 @@ @SuppressWarnings("ComparableImplementedButEqualsNotOverridden") final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode { - static long deadlineNanos(long nanoTime, long delay) { - long deadlineNanos = nanoTime + delay; - // Guard against overflow - return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; - } - // set once when added to priority queue private long id; diff --git a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java index fca6ad2d27c..13c4447ba67 100644 --- a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -15,6 +15,7 @@ */ package io.netty.util.concurrent; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; @@ -117,6 +118,12 @@ public void execute() { }); } + @Test + public void testDeadlineNanosNotOverflow() { + Assertions.assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos( + AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); + } + private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor { @Override public boolean isShuttingDown() { diff --git a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java deleted file mode 100644 index 4b6a52a3944..00000000000 --- a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2018 The Netty Project - * - * The Netty Project licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package io.netty.util.concurrent; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Assertions; - -public class ScheduledFutureTaskTest { - - @Test - public void testDeadlineNanosNotOverflow() { - Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos( - AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); - } -} From cf17549c07f25611fe20420b55201157bc491b80 Mon Sep 17 00:00:00 2001 From: Jonas Konrad Date: Fri, 10 Jun 2022 22:34:15 +0200 Subject: [PATCH 8/8] don't use static qualifier --- .../main/java/io/netty/util/concurrent/GlobalEventExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java index 6c203a2f606..186af97f184 100644 --- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java @@ -56,7 +56,7 @@ public void run() { }, null), // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method // could be overridden leading to unsafe initialization here! - AbstractScheduledEventExecutor.deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), + deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL );