diff --git a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java
index 7f86e55d566..ffabe723e33 100644
--- a/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java
+++ b/common/src/main/java/io/netty/util/concurrent/AbstractScheduledEventExecutor.java
@@ -19,8 +19,6 @@
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PriorityQueue;
-import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos;
-
import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
@@ -38,7 +36,9 @@ public int compare(ScheduledFutureTask> o1, ScheduledFutureTask> o2) {
}
};
- static final Runnable WAKEUP_TASK = new Runnable() {
+ private static final long START_TIME = System.nanoTime();
+
+ static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() { } // Do nothing
};
@@ -54,8 +54,36 @@ protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}
+ /**
+ * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
+ * for two reasons:
+ *
+ *
+ * - We apply a fixed offset to the {@link System#nanoTime() nanoTime}
+ * - Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
+ * for testing purposes.
+ *
+ */
+ protected long getCurrentTimeNanos() {
+ return defaultCurrentTimeNanos();
+ }
+
+ /**
+ * @deprecated Use the non-static {@link #getCurrentTimeNanos()} instead.
+ */
+ @Deprecated
protected static long nanoTime() {
- return ScheduledFutureTask.nanoTime();
+ return defaultCurrentTimeNanos();
+ }
+
+ static long defaultCurrentTimeNanos() {
+ return System.nanoTime() - START_TIME;
+ }
+
+ static long deadlineNanos(long nanoTime, long delay) {
+ long deadlineNanos = nanoTime + delay;
+ // Guard against overflow
+ return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}
/**
@@ -65,7 +93,7 @@ protected static long nanoTime() {
* @return the number of nano seconds from now {@code deadlineNanos} would expire.
*/
protected static long deadlineToDelayNanos(long deadlineNanos) {
- return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
+ return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
}
/**
@@ -73,7 +101,7 @@ protected static long deadlineToDelayNanos(long deadlineNanos) {
* @return initial value used for delay and computations based upon a monatomic time source.
*/
protected static long initialNanoTime() {
- return ScheduledFutureTask.initialNanoTime();
+ return START_TIME;
}
PriorityQueue> scheduledTaskQueue() {
@@ -116,12 +144,12 @@ protected void cancelScheduledTasks() {
* @see #pollScheduledTask(long)
*/
protected final Runnable pollScheduledTask() {
- return pollScheduledTask(nanoTime());
+ return pollScheduledTask(getCurrentTimeNanos());
}
/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
- * You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
+ * You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
@@ -162,7 +190,7 @@ final ScheduledFutureTask> peekScheduledTask() {
*/
protected final boolean hasScheduledTasks() {
ScheduledFutureTask> scheduledTask = peekScheduledTask();
- return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
+ return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
}
@Override
@@ -177,7 +205,7 @@ public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit)
return schedule(new ScheduledFutureTask(
this,
command,
- deadlineNanos(unit.toNanos(delay))));
+ deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
}
@Override
@@ -189,7 +217,8 @@ public ScheduledFuture schedule(Callable callable, long delay, TimeUni
}
validateScheduled0(delay, unit);
- return schedule(new ScheduledFutureTask(this, callable, deadlineNanos(unit.toNanos(delay))));
+ return schedule(new ScheduledFutureTask(
+ this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
}
@Override
@@ -208,7 +237,7 @@ public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDela
validateScheduled0(period, unit);
return schedule(new ScheduledFutureTask(
- this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
+ this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
}
@Override
@@ -228,7 +257,7 @@ public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialD
validateScheduled0(delay, unit);
return schedule(new ScheduledFutureTask(
- this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
+ this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}
@SuppressWarnings("deprecation")
@@ -291,7 +320,7 @@ final void removeScheduled(final ScheduledFutureTask> task) {
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
- * relative to {@link AbstractScheduledEventExecutor#nanoTime()}
+ * relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
@@ -301,7 +330,7 @@ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
- * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
+ * @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
diff --git a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java
index e110bf66b76..186af97f184 100644
--- a/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java
+++ b/common/src/main/java/io/netty/util/concurrent/GlobalEventExecutor.java
@@ -53,7 +53,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
public void run() {
// NOOP
}
- }, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
+ }, null),
+ // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method
+ // could be overridden leading to unsafe initialization here!
+ deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
+ -SCHEDULE_QUIET_PERIOD_INTERVAL
+ );
// because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
@@ -117,7 +122,7 @@ Runnable takeTask() {
}
private void fetchFromScheduledTaskQueue() {
- long nanoTime = AbstractScheduledEventExecutor.nanoTime();
+ long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
taskQueue.add(scheduledTask);
diff --git a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java
index ab39b86aeaf..23fbe5a1e77 100644
--- a/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java
+++ b/common/src/main/java/io/netty/util/concurrent/ScheduledFutureTask.java
@@ -25,22 +25,6 @@
@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask extends PromiseTask implements ScheduledFuture, PriorityQueueNode {
- private static final long START_TIME = System.nanoTime();
-
- static long nanoTime() {
- return System.nanoTime() - START_TIME;
- }
-
- static long deadlineNanos(long delay) {
- long deadlineNanos = nanoTime() + delay;
- // Guard against overflow
- return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
- }
-
- static long initialNanoTime() {
- return START_TIME;
- }
-
// set once when added to priority queue
private long id;
@@ -109,22 +93,21 @@ void setConsumed() {
// Optimization to avoid checking system clock again
// after deadline has passed and task has been dequeued
if (periodNanos == 0) {
- assert nanoTime() >= deadlineNanos;
+ assert scheduledExecutor().getCurrentTimeNanos() >= deadlineNanos;
deadlineNanos = 0L;
}
}
public long delayNanos() {
- return deadlineToDelayNanos(deadlineNanos());
+ return delayNanos(scheduledExecutor().getCurrentTimeNanos());
}
- static long deadlineToDelayNanos(long deadlineNanos) {
- return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
+ static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) {
+ return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos);
}
public long delayNanos(long currentTimeNanos) {
- return deadlineNanos == 0L ? 0L
- : Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME));
+ return deadlineToDelayNanos(currentTimeNanos, deadlineNanos);
}
@Override
@@ -178,7 +161,7 @@ public void run() {
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
- deadlineNanos = nanoTime() - periodNanos;
+ deadlineNanos = scheduledExecutor().getCurrentTimeNanos() - periodNanos;
}
if (!isCancelled()) {
scheduledExecutor().scheduledTaskQueue().add(this);
diff --git a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
index 0e68abd66d7..6c7020ee234 100644
--- a/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
+++ b/common/src/main/java/io/netty/util/concurrent/SingleThreadEventExecutor.java
@@ -280,7 +280,7 @@ private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
- long nanoTime = AbstractScheduledEventExecutor.nanoTime();
+ long nanoTime = getCurrentTimeNanos();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
@@ -301,7 +301,7 @@ private boolean executeExpiredScheduledTasks() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return false;
}
- long nanoTime = AbstractScheduledEventExecutor.nanoTime();
+ long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return false;
@@ -378,7 +378,7 @@ protected boolean runAllTasks() {
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
+ lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
return ranAtLeastOne;
@@ -403,7 +403,7 @@ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts)
} while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);
if (drainAttempt > 0) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
+ lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
@@ -463,7 +463,7 @@ protected boolean runAllTasks(long timeoutNanos) {
return false;
}
- final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
+ final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
@@ -474,7 +474,7 @@ protected boolean runAllTasks(long timeoutNanos) {
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
+ lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
@@ -482,7 +482,7 @@ protected boolean runAllTasks(long timeoutNanos) {
task = pollTask();
if (task == null) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
+ lastExecutionTime = getCurrentTimeNanos();
break;
}
}
@@ -502,6 +502,8 @@ protected void afterRunningAllTasks() { }
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/
protected long delayNanos(long currentTimeNanos) {
+ currentTimeNanos -= initialNanoTime();
+
ScheduledFutureTask> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
@@ -511,14 +513,14 @@ protected long delayNanos(long currentTimeNanos) {
}
/**
- * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the next
+ * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
* closest scheduled task should run.
*/
@UnstableApi
protected long deadlineNanos() {
ScheduledFutureTask> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
- return nanoTime() + SCHEDULE_PURGE_INTERVAL;
+ return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.deadlineNanos();
}
@@ -531,7 +533,7 @@ protected long deadlineNanos() {
* checks.
*/
protected void updateLastExecutionTime() {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
+ lastExecutionTime = getCurrentTimeNanos();
}
/**
@@ -609,7 +611,7 @@ private boolean runShutdownHooks() {
}
if (ran) {
- lastExecutionTime = ScheduledFutureTask.nanoTime();
+ lastExecutionTime = getCurrentTimeNanos();
}
return ran;
@@ -755,7 +757,7 @@ protected boolean confirmShutdown() {
cancelScheduledTasks();
if (gracefulShutdownStartTime == 0) {
- gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
+ gracefulShutdownStartTime = getCurrentTimeNanos();
}
if (runAllTasks() || runShutdownHooks()) {
@@ -774,7 +776,7 @@ protected boolean confirmShutdown() {
return false;
}
- final long nanoTime = ScheduledFutureTask.nanoTime();
+ final long nanoTime = getCurrentTimeNanos();
if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
diff --git a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java
index fca6ad2d27c..13c4447ba67 100644
--- a/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java
+++ b/common/src/test/java/io/netty/util/concurrent/AbstractScheduledEventExecutorTest.java
@@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
@@ -117,6 +118,12 @@ public void execute() {
});
}
+ @Test
+ public void testDeadlineNanosNotOverflow() {
+ Assertions.assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(
+ AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE));
+ }
+
private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor {
@Override
public boolean isShuttingDown() {
diff --git a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java b/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java
deleted file mode 100644
index 248a0887733..00000000000
--- a/common/src/test/java/io/netty/util/concurrent/ScheduledFutureTaskTest.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright 2018 The Netty Project
- *
- * The Netty Project licenses this file to you under the Apache License,
- * version 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at:
- *
- * https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations
- * under the License.
- */
-package io.netty.util.concurrent;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Assertions;
-
-public class ScheduledFutureTaskTest {
-
- @Test
- public void testDeadlineNanosNotOverflow() {
- Assertions.assertEquals(Long.MAX_VALUE, ScheduledFutureTask.deadlineNanos(Long.MAX_VALUE));
- }
-}
diff --git a/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java b/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java
new file mode 100644
index 00000000000..bcb8ae67896
--- /dev/null
+++ b/microbench/src/main/java/io/netty/util/concurrent/ScheduledFutureTaskDeadlineBenchmark.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2022 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package io.netty.util.concurrent;
+
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.microbench.util.AbstractMicrobenchmark;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.concurrent.TimeUnit;
+
+@Warmup(iterations = 5, time = 3, timeUnit = TimeUnit.SECONDS)
+@Measurement(iterations = 10, time = 3, timeUnit = TimeUnit.SECONDS)
+@State(Scope.Benchmark)
+public class ScheduledFutureTaskDeadlineBenchmark extends AbstractMicrobenchmark {
+ @State(Scope.Thread)
+ public static class ThreadState {
+
+ AbstractScheduledEventExecutor eventLoop;
+ ScheduledFutureTask> future;
+
+ @Setup(Level.Trial)
+ public void reset() {
+ eventLoop = (AbstractScheduledEventExecutor) new NioEventLoopGroup(1).next();
+ future = (ScheduledFutureTask>) eventLoop.schedule(new Runnable() {
+ @Override
+ public void run() {
+ }
+ }, 100, TimeUnit.DAYS);
+ }
+
+ @TearDown(Level.Trial)
+ public void shutdown() {
+ future.cancel(true);
+ eventLoop.parent().shutdownGracefully().awaitUninterruptibly();
+ }
+ }
+
+ @Benchmark
+ @Threads(1)
+ public long requestDeadline(final ThreadState threadState) {
+ return threadState.future.delayNanos();
+ }
+}
diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java
index 7d776cfb529..f27327e5af2 100644
--- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java
+++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedChannel.java
@@ -19,6 +19,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
+import java.util.concurrent.TimeUnit;
import io.netty.channel.AbstractChannel;
import io.netty.channel.Channel;
@@ -635,6 +636,34 @@ private void recordException(Throwable cause) {
}
}
+ /**
+ * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
+ * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
+ */
+ public void advanceTimeBy(long duration, TimeUnit unit) {
+ embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
+ }
+
+ /**
+ * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
+ * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
+ * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
+ */
+ public void freezeTime() {
+ embeddedEventLoop().freezeTime();
+ }
+
+ /**
+ * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
+ * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
+ * was called, it will run ten minutes after this method is called again (assuming no
+ * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
+ * {@link #runScheduledPendingTasks()}).
+ */
+ public void unfreezeTime() {
+ embeddedEventLoop().unfreezeTime();
+ }
+
/**
* Checks for the presence of an {@link Exception}.
*/
diff --git a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java
index c1ffc7a314e..b8b73d9f8bd 100644
--- a/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java
+++ b/transport/src/main/java/io/netty/channel/embedded/EmbeddedEventLoop.java
@@ -30,6 +30,22 @@
import java.util.concurrent.TimeUnit;
final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
+ /**
+ * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time
+ * is frozen, this variable is unused.
+ *
+ * Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called,
+ * {@link #getCurrentTimeNanos()} matches the default behavior.
+ */
+ private long startTime = initialNanoTime();
+ /**
+ * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused.
+ */
+ private long frozenTimestamp;
+ /**
+ * Whether time is currently frozen.
+ */
+ private boolean timeFrozen;
private final Queue tasks = new ArrayDeque(2);
@@ -60,7 +76,7 @@ void runTasks() {
}
long runScheduledTasks() {
- long time = AbstractScheduledEventExecutor.nanoTime();
+ long time = getCurrentTimeNanos();
for (;;) {
Runnable task = pollScheduledTask(time);
if (task == null) {
@@ -75,6 +91,40 @@ long nextScheduledTask() {
return nextScheduledTaskNano();
}
+ @Override
+ protected long getCurrentTimeNanos() {
+ if (timeFrozen) {
+ return frozenTimestamp;
+ }
+ return System.nanoTime() - startTime;
+ }
+
+ void advanceTimeBy(long nanos) {
+ if (timeFrozen) {
+ frozenTimestamp += nanos;
+ } else {
+ // startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos
+ startTime -= nanos;
+ }
+ }
+
+ void freezeTime() {
+ if (!timeFrozen) {
+ frozenTimestamp = getCurrentTimeNanos();
+ timeFrozen = true;
+ }
+ }
+
+ void unfreezeTime() {
+ if (timeFrozen) {
+ // we want getCurrentTimeNanos to continue right where frozenTimestamp left off:
+ // getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp
+ // then solve for startTime
+ startTime = System.nanoTime() - frozenTimestamp;
+ timeFrozen = false;
+ }
+ }
+
@Override
protected void cancelScheduledTasks() {
super.cancelScheduledTasks();
diff --git a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java
index fd279ed5ca6..4999ea82013 100644
--- a/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java
+++ b/transport/src/test/java/io/netty/channel/embedded/EmbeddedChannelTest.java
@@ -32,6 +32,7 @@
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
+import io.netty.util.concurrent.ScheduledFuture;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -676,6 +677,64 @@ void testRunPendingTasksForNotRegisteredChannel() {
}
}
+ @Test
+ @Timeout(30) // generous timeout, just make sure we don't actually wait for the full 10 mins...
+ void testAdvanceTime() {
+ EmbeddedChannel channel = new EmbeddedChannel();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ }
+ };
+ ScheduledFuture> future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES);
+ ScheduledFuture> future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES);
+
+ channel.runPendingTasks();
+ assertFalse(future10.isDone());
+ assertFalse(future20.isDone());
+
+ channel.advanceTimeBy(10, TimeUnit.MINUTES);
+ channel.runPendingTasks();
+ assertTrue(future10.isDone());
+ assertFalse(future20.isDone());
+ }
+
+ @Test
+ @Timeout(30) // generous timeout, just make sure we don't actually wait for the full 10 mins...
+ void testFreezeTime() {
+ EmbeddedChannel channel = new EmbeddedChannel();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ }
+ };
+
+ channel.freezeTime();
+ // this future will complete after 10min
+ ScheduledFuture> future10 = channel.eventLoop().schedule(runnable, 10, TimeUnit.MINUTES);
+ // this future will complete after 10min + 1ns
+ ScheduledFuture> future101 = channel.eventLoop().schedule(runnable,
+ TimeUnit.MINUTES.toNanos(10) + 1, TimeUnit.NANOSECONDS);
+ // this future will complete after 20min
+ ScheduledFuture> future20 = channel.eventLoop().schedule(runnable, 20, TimeUnit.MINUTES);
+
+ channel.runPendingTasks();
+ assertFalse(future10.isDone());
+ assertFalse(future101.isDone());
+ assertFalse(future20.isDone());
+
+ channel.advanceTimeBy(10, TimeUnit.MINUTES);
+ channel.runPendingTasks();
+ assertTrue(future10.isDone());
+ assertFalse(future101.isDone());
+ assertFalse(future20.isDone());
+
+ channel.unfreezeTime();
+ channel.runPendingTasks();
+ assertTrue(future101.isDone());
+ assertFalse(future20.isDone());
+ }
+
private static void release(ByteBuf... buffers) {
for (ByteBuf buffer : buffers) {
if (buffer.refCnt() > 0) {