diff --git a/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java b/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java index 415ff204bf8..83c8ec1c894 100644 --- a/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java +++ b/common/src/main/java/io/netty5/util/concurrent/AbstractScheduledEventExecutor.java @@ -50,16 +50,39 @@ protected AbstractScheduledEventExecutor() { * {@link System#nanoTime()}. */ public static long nanoTime() { - return System.nanoTime() - START_TIME; + return defaultCurrentTimeNanos(); + } + + /** + * The initial value used for delay and computations based upon a monatomic time source. + * @return initial value used for delay and computations based upon a monatomic time source. + */ + protected static long initialNanoTime() { + return START_TIME; } /** - * The deadline (in nanoseconds) for a given delay (in nanoseconds). + * Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()} + * for two reasons: + * + * */ - static long deadlineNanos(long delay) { - long deadlineNanos = nanoTime() + delay; + protected long getCurrentTimeNanos() { + return defaultCurrentTimeNanos(); + } + + static long defaultCurrentTimeNanos() { + return System.nanoTime() - START_TIME; + } + + static long deadlineNanos(long nanoTime, long delay) { + long deadlineNanos = nanoTime + delay; // Guard against overflow - return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos; + return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos; } PriorityQueue> scheduledTaskQueue() { @@ -102,12 +125,12 @@ protected final void cancelScheduledTasks() { * @see #pollScheduledTask(long) */ protected final RunnableScheduledFuture pollScheduledTask() { - return pollScheduledTask(nanoTime()); + return pollScheduledTask(getCurrentTimeNanos()); } /** * Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link - * #nanoTime()} to retrieve the correct {@code nanoTime}. + * #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}. *

* This method MUST be called only when {@link #inEventLoop()} is {@code true}. */ @@ -138,7 +161,7 @@ protected final long nextScheduledTaskNano() { if (scheduledTask == null) { return -1; } - return Math.max(0, scheduledTask.deadlineNanos() - nanoTime()); + return Math.max(0, scheduledTask.deadlineNanos() - getCurrentTimeNanos()); } final RunnableScheduledFuture peekScheduledTask() { @@ -158,7 +181,7 @@ protected final boolean hasScheduledTasks() { assert inEventLoop(); Queue> scheduledTaskQueue = this.scheduledTaskQueue; RunnableScheduledFutureNode scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek(); - return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime(); + return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos(); } @Override @@ -169,7 +192,7 @@ public Future schedule(Runnable command, long delay, TimeUnit unit) { delay = 0; } RunnableScheduledFuture task = newScheduledTaskFor( - callable(command, null), deadlineNanos(unit.toNanos(delay)), 0); + callable(command, null), deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0); return schedule(task); } @@ -180,7 +203,8 @@ public Future schedule(Callable callable, long delay, TimeUnit unit) { if (delay < 0) { delay = 0; } - RunnableScheduledFuture task = newScheduledTaskFor(callable, deadlineNanos(unit.toNanos(delay)), 0); + RunnableScheduledFuture task = newScheduledTaskFor( + callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0); return schedule(task); } @@ -198,7 +222,8 @@ public Future scheduleAtFixedRate(Runnable command, long initialDelay, lon } RunnableScheduledFuture task = newScheduledTaskFor( - callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)); + callable(command, null), + deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)); return schedule(task); } @@ -216,7 +241,8 @@ public Future scheduleWithFixedDelay(Runnable command, long initialDelay, } RunnableScheduledFuture task = newScheduledTaskFor( - callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)); + callable(command, null), + deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)); return schedule(task); } diff --git a/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java b/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java index 2e5b58ef6c0..ea66df56872 100644 --- a/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java +++ b/common/src/main/java/io/netty5/util/concurrent/GlobalEventExecutor.java @@ -45,17 +45,11 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1); - private static final RunnableScheduledFutureAdapter QUIET_PERIOD_TASK; + private final RunnableScheduledFutureAdapter quietPeriodTask; public static final GlobalEventExecutor INSTANCE; static { INSTANCE = new GlobalEventExecutor(); - QUIET_PERIOD_TASK = new RunnableScheduledFutureAdapter<>( - INSTANCE, INSTANCE.newPromise(), Executors.callable(() -> { - // NOOP - }, null), deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL); - - INSTANCE.scheduledTaskQueue().add(QUIET_PERIOD_TASK); } private final BlockingQueue taskQueue = new LinkedBlockingQueue<>(); @@ -75,6 +69,16 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im private GlobalEventExecutor() { threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory( DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this); + quietPeriodTask = new RunnableScheduledFutureAdapter<>( + this, newPromise(), Executors.callable(() -> { + // NOOP + }, null), + // note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise + // the method could be overridden leading to unsafe initialization here! + deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL), + -SCHEDULE_QUIET_PERIOD_INTERVAL); + + scheduledTaskQueue().add(quietPeriodTask); } /** @@ -122,7 +126,7 @@ private Runnable takeTask() { } private void fetchFromScheduledTaskQueue() { - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { taskQueue.add(scheduledTask); @@ -243,7 +247,7 @@ public void run() { logger.warn("Unexpected exception from the global event executor: ", t); } - if (task != QUIET_PERIOD_TASK) { + if (task != quietPeriodTask) { continue; } } diff --git a/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java b/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java index b04e96dcb1f..601e26d4dc8 100644 --- a/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java +++ b/common/src/main/java/io/netty5/util/concurrent/RunnableScheduledFutureAdapter.java @@ -64,12 +64,16 @@ public long deadlineNanos() { @Override public long delayNanos() { - return Math.max(0, deadlineNanos() - AbstractScheduledEventExecutor.nanoTime()); + return delayNanos(executor.getCurrentTimeNanos()); } @Override public long delayNanos(long currentTimeNanos) { - return Math.max(0, deadlineNanos() - (currentTimeNanos - AbstractScheduledEventExecutor.START_TIME)); + return deadlineToDelayNanos(currentTimeNanos, deadlineNanos); + } + + private static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) { + return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos); } @Override @@ -127,7 +131,7 @@ public void run() { if (p > 0) { deadlineNanos += p; } else { - deadlineNanos = AbstractScheduledEventExecutor.nanoTime() - p; + deadlineNanos = executor.getCurrentTimeNanos() - p; } if (!isCancelled()) { executor.schedule(this); diff --git a/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java b/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java index d72bff3f734..f7dafc04812 100644 --- a/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java +++ b/common/src/main/java/io/netty5/util/concurrent/SingleThreadEventExecutor.java @@ -250,7 +250,7 @@ protected final Runnable takeTask() { } private boolean fetchFromScheduledTaskQueue() { - long nanoTime = AbstractScheduledEventExecutor.nanoTime(); + long nanoTime = getCurrentTimeNanos(); RunnableScheduledFuture scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { @@ -379,6 +379,7 @@ protected int runAllTasks(int maxTasks) { */ protected final long delayNanos(long currentTimeNanos) { assert inEventLoop(); + currentTimeNanos -= START_TIME; RunnableScheduledFuture scheduledTask = peekScheduledTask(); if (scheduledTask == null) { return SCHEDULE_PURGE_INTERVAL; @@ -388,7 +389,7 @@ protected final long delayNanos(long currentTimeNanos) { } /** - * Returns the absolute point in time (relative to {@link #nanoTime()}) at which the the next + * Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()} ()}) at which the next * closest scheduled task should run. * * This method must be called from the {@link EventExecutor} thread. @@ -397,7 +398,7 @@ protected final long deadlineNanos() { assert inEventLoop(); RunnableScheduledFuture scheduledTask = peekScheduledTask(); if (scheduledTask == null) { - return nanoTime() + SCHEDULE_PURGE_INTERVAL; + return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL; } return scheduledTask.deadlineNanos(); } @@ -412,7 +413,7 @@ protected final long deadlineNanos() { */ protected final void updateLastExecutionTime() { assert inEventLoop(); - lastExecutionTime = nanoTime(); + lastExecutionTime = getCurrentTimeNanos(); } /** @@ -599,7 +600,7 @@ boolean confirmShutdown0() { cancelScheduledTasks(); if (gracefulShutdownStartTime == 0) { - gracefulShutdownStartTime = nanoTime(); + gracefulShutdownStartTime = getCurrentTimeNanos(); } if (runAllTasks() || runShutdownHooks()) { @@ -618,7 +619,7 @@ boolean confirmShutdown0() { return false; } - final long nanoTime = nanoTime(); + final long nanoTime = getCurrentTimeNanos(); if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) { return true; diff --git a/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java b/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java index e3dc272ca4d..44bbd4efad3 100644 --- a/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java +++ b/common/src/test/java/io/netty5/util/concurrent/AbstractScheduledEventExecutorTest.java @@ -102,7 +102,8 @@ public void testScheduleWithFixedDelayNegative() { @Test public void testDeadlineNanosNotOverflow() { - assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(Long.MAX_VALUE)); + assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos( + AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE)); } private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor { diff --git a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java index 4e8472be9d6..a7bda5ab227 100644 --- a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java +++ b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedChannel.java @@ -42,6 +42,7 @@ import java.nio.channels.ClosedChannelException; import java.util.ArrayDeque; import java.util.Queue; +import java.util.concurrent.TimeUnit; import static io.netty5.util.internal.PlatformDependent.throwException; import static java.util.Objects.requireNonNull; @@ -614,6 +615,37 @@ private void recordException(Throwable cause) { } } + private EmbeddedEventLoop embeddedEventLoop() { + return (EmbeddedEventLoop) executor(); + } + /** + * Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute + * sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called). + */ + public void advanceTimeBy(long duration, TimeUnit unit) { + embeddedEventLoop().advanceTimeBy(unit.toNanos(duration)); + } + + /** + * Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on + * future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to + * {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute. + */ + public void freezeTime() { + embeddedEventLoop().freezeTime(); + } + + /** + * Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where + * {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()} + * was called, it will run ten minutes after this method is called again (assuming no + * {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using + * {@link #runScheduledPendingTasks()}). + */ + public void unfreezeTime() { + embeddedEventLoop().unfreezeTime(); + } + /** * Checks for the presence of an {@link Exception}. */ diff --git a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java index 24816d2cf1d..bab2fd99592 100644 --- a/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java +++ b/transport/src/main/java/io/netty5/channel/embedded/EmbeddedEventLoop.java @@ -28,6 +28,22 @@ import static java.util.Objects.requireNonNull; final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop { + /** + * When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time + * is frozen, this variable is unused. + * + * Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called, + * {@link #getCurrentTimeNanos()} matches the default behavior. + */ + private long startTime = initialNanoTime(); + /** + * When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused. + */ + private long frozenTimestamp; + /** + * Whether time is currently frozen. + */ + private boolean timeFrozen; private final Queue tasks = new ArrayDeque<>(2); boolean running; @@ -91,7 +107,7 @@ void runTasks() { } long runScheduledTasks() { - long time = AbstractScheduledEventExecutor.nanoTime(); + long time = getCurrentTimeNanos(); boolean wasRunning = running; try { for (;;) { @@ -123,6 +139,40 @@ void cancelScheduled() { } } + @Override + protected long getCurrentTimeNanos() { + if (timeFrozen) { + return frozenTimestamp; + } + return System.nanoTime() - startTime; + } + + void advanceTimeBy(long nanos) { + if (timeFrozen) { + frozenTimestamp += nanos; + } else { + // startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos + startTime -= nanos; + } + } + + void freezeTime() { + if (!timeFrozen) { + frozenTimestamp = getCurrentTimeNanos(); + timeFrozen = true; + } + } + + void unfreezeTime() { + if (timeFrozen) { + // we want getCurrentTimeNanos to continue right where frozenTimestamp left off: + // getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp + // then solve for startTime + startTime = System.nanoTime() - frozenTimestamp; + timeFrozen = false; + } + } + @Override public Future shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { throw new UnsupportedOperationException();