Skip to content

Commit

Permalink
Replace EmbeddedChannel's 'frozen time' feature with Ticker
Browse files Browse the repository at this point in the history
Motivation:

Netty currently lacks a standard abstraction for time source, as known
as 'ticker' or 'clock'. As a result, Netty relies on ad-hoc overridable
protected methods for 'time getter' methods, which gives hard time to
users who want to test the time-sensitive logic that involes Netty.

Modifications:

- Added `Ticker` and `MockTicker` and their default implementations.
- (Breaking) Updated `EmbeddedChannel` and `EmbeddedEventLoop` to use
  `Ticker` instead of its own time manipulation API.
  - Removed the old time manipulation API.
  - Note that I removed `freezeTime()` and `unfreezeTime()` because it
    can easily be replicated with `advance()`.

Result:

- Netty now has a better abstraction for scheduling and testing
  time-sensitive logic.
- A user can now specify system, mock or custom `Ticker` implementation
  when constructing an `EmbeddedChannel`, which is more flexible than
  the previous API.
  - (Breaking) The previous time-manipulation API in `EmbeddedChannel`
    has been removed in favor of the new API.
- Partially resolves #12827. However, this PR didn't update
  `IdleStateHandler` or any other time-sensitive logic in Netty, which
  is left as future work.
  • Loading branch information
trustin committed Jan 30, 2023
1 parent 68eba52 commit ec381a1
Show file tree
Hide file tree
Showing 14 changed files with 483 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
* Abstract base class for {@link EventExecutor}s that want to support scheduling.
*/
public abstract class AbstractScheduledEventExecutor extends AbstractEventExecutor {
static final long START_TIME = System.nanoTime();

private static final Comparator<RunnableScheduledFutureNode<?>> SCHEDULED_FUTURE_TASK_COMPARATOR =
Comparable::compareTo;
Expand All @@ -44,37 +43,10 @@ protected AbstractScheduledEventExecutor() {
}

/**
* The time elapsed since initialization of this class in nanoseconds. This may return a negative number just like
* {@link System#nanoTime()}.
* Return the {@link Ticker} that provides the time source.
*/
public static long nanoTime() {
return defaultCurrentTimeNanos();
}

/**
* The initial value used for delay and computations based upon a monatomic time source.
* @return initial value used for delay and computations based upon a monatomic time source.
*/
protected static long initialNanoTime() {
return START_TIME;
}

/**
* Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
* for two reasons:
*
* <ul>
* <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
* <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
* for testing purposes.</li>
* </ul>
*/
protected long getCurrentTimeNanos() {
return defaultCurrentTimeNanos();
}

static long defaultCurrentTimeNanos() {
return System.nanoTime() - START_TIME;
protected Ticker ticker() {
return Ticker.systemTicker();
}

static long deadlineNanos(long nanoTime, long delay) {
Expand Down Expand Up @@ -123,12 +95,12 @@ protected final void cancelScheduledTasks() {
* @see #pollScheduledTask(long)
*/
protected final RunnableScheduledFuture<?> pollScheduledTask() {
return pollScheduledTask(getCurrentTimeNanos());
return pollScheduledTask(ticker().nanoTime());
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link
* #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
* #ticker().nanoTime()} to retrieve the correct {@code nanoTime}.
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
Expand Down Expand Up @@ -159,7 +131,7 @@ protected final long nextScheduledTaskNano() {
if (scheduledTask == null) {
return -1;
}
return Math.max(0, scheduledTask.deadlineNanos() - getCurrentTimeNanos());
return Math.max(0, scheduledTask.deadlineNanos() - ticker().nanoTime());
}

final RunnableScheduledFuture<?> peekScheduledTask() {
Expand All @@ -179,7 +151,7 @@ protected final boolean hasScheduledTasks() {
assert inEventLoop();
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
return scheduledTask != null && scheduledTask.deadlineNanos() <= ticker().nanoTime();
}

@Override
Expand All @@ -190,7 +162,7 @@ public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
delay = 0;
}
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
callable(command, null), deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0);
return schedule(task);
}

Expand All @@ -202,7 +174,7 @@ public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
delay = 0;
}
RunnableScheduledFuture<V> task = newScheduledTaskFor(
callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
callable, deadlineNanos(ticker().nanoTime(), unit.toNanos(delay)), 0);
return schedule(task);
}

Expand All @@ -221,7 +193,7 @@ public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, lon

RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null),
deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period));
deadlineNanos(ticker().nanoTime(), unit.toNanos(initialDelay)), unit.toNanos(period));
return schedule(task);
}

Expand All @@ -240,7 +212,7 @@ public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay,

RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null),
deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay));
deadlineNanos(ticker().nanoTime(), unit.toNanos(initialDelay)), -unit.toNanos(delay));
return schedule(task);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty5.util.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static io.netty5.util.internal.ObjectUtil.checkPositiveOrZero;

/**
* The default {@link MockTicker} implementation.
*/
final class DefaultMockTicker implements MockTicker {

private final Lock lock = new ReentrantLock();
private final Condition cond = lock.newCondition();
private final AtomicLong nanoTime = new AtomicLong();

@Override
public long nanoTime() {
return nanoTime.get();
}

@Override
public void sleep(long delay, TimeUnit unit) throws InterruptedException {
checkPositiveOrZero(delay, "delay");
if (delay == 0) {
return;
}

final long startTimeNanos = nanoTime();
final long delayNanos = unit.toNanos(delay);
lock.lockInterruptibly();
try {
do {
cond.await();
} while (nanoTime() - startTimeNanos < delayNanos);
} finally {
lock.unlock();
}
}

@Override
public void advance(long amount, TimeUnit unit) {
checkPositiveOrZero(amount, "amount");
if (amount == 0) {
return;
}

final long amountNanos = unit.toNanos(amount);
lock.lock();
try {
nanoTime.addAndGet(amountNanos);
cond.signalAll();
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ private GlobalEventExecutor() {
this, newPromise(), Executors.callable(() -> {
// NOOP
}, null),
// note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise
// note: the ticker().nanoTime() call here only works because this is a final class, otherwise
// the method could be overridden leading to unsafe initialization here!
deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
deadlineNanos(ticker().nanoTime(), SCHEDULE_QUIET_PERIOD_INTERVAL),
-SCHEDULE_QUIET_PERIOD_INTERVAL);

scheduledTaskQueue().add(quietPeriodTask);
Expand Down Expand Up @@ -139,7 +139,7 @@ private Runnable takeTask() {
}

private void fetchFromScheduledTaskQueue() {
long nanoTime = getCurrentTimeNanos();
long nanoTime = ticker().nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
taskQueue.add(scheduledTask);
Expand Down
47 changes: 47 additions & 0 deletions common/src/main/java/io/netty5/util/concurrent/MockTicker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2023 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty5.util.concurrent;

import java.util.concurrent.TimeUnit;

/**
* A fake {@link Ticker} that allows the caller control the flow of time.
* This can be useful when you test time-sensitive logic without waiting for too long
* or introducing flakiness due to non-deterministic nature of system clock.
*/
public interface MockTicker extends Ticker {
@Override
default long initialNanoTime() {
return 0;
}

/**
* Advances the current {@link #nanoTime()} by the given amount of time.
*
* @param amount the amount of time to advance this ticker by.
* @param unit the {@link TimeUnit} of {@code amount}.
*/
void advance(long amount, TimeUnit unit);

/**
* Advances the current {@link #nanoTime()} by the given amount of time.
*
* @param amountMillis the number of milliseconds to advance this ticker by.
*/
default void advanceMillis(long amountMillis) {
advance(amountMillis, TimeUnit.MILLISECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public long deadlineNanos() {

@Override
public long delayNanos() {
return delayNanos(executor.getCurrentTimeNanos());
return delayNanos(executor.ticker().nanoTime());
}

@Override
Expand Down Expand Up @@ -128,7 +128,7 @@ public void run() {
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = executor.getCurrentTimeNanos() - p;
deadlineNanos = executor.ticker().nanoTime() - p;
}
if (!isCancelled()) {
executor.schedule(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ protected final Runnable takeTask() {
}

private boolean fetchFromScheduledTaskQueue() {
long nanoTime = getCurrentTimeNanos();
long nanoTime = ticker().nanoTime();
RunnableScheduledFuture<?> scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
Expand Down Expand Up @@ -379,7 +379,7 @@ protected int runAllTasks(int maxTasks) {
*/
protected final long delayNanos(long currentTimeNanos) {
assert inEventLoop();
currentTimeNanos -= START_TIME;
currentTimeNanos -= SystemTicker.START_TIME;
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
Expand All @@ -389,7 +389,7 @@ protected final long delayNanos(long currentTimeNanos) {
}

/**
* Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()} ()}) at which the next
* Returns the absolute point in time (relative to {@link #ticker().nanoTime()} ()}) at which the next
* closest scheduled task should run or {@code -1} if none is scheduled at the mment.
*
* This method must be called from the {@link EventExecutor} thread.
Expand All @@ -410,7 +410,7 @@ protected final long deadlineNanos() {
*/
protected final void updateLastExecutionTime() {
assert inEventLoop();
lastExecutionTime = getCurrentTimeNanos();
lastExecutionTime = ticker().nanoTime();
}

/**
Expand Down Expand Up @@ -597,7 +597,7 @@ boolean confirmShutdown0() {
cancelScheduledTasks();

if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = getCurrentTimeNanos();
gracefulShutdownStartTime = ticker().nanoTime();
}

if (runAllTasks() || runShutdownHooks()) {
Expand All @@ -616,7 +616,7 @@ boolean confirmShutdown0() {
return false;
}

final long nanoTime = getCurrentTimeNanos();
final long nanoTime = ticker().nanoTime();

if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
Expand Down
39 changes: 39 additions & 0 deletions common/src/main/java/io/netty5/util/concurrent/SystemTicker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2023 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty5.util.concurrent;

import java.util.concurrent.TimeUnit;

enum SystemTicker implements Ticker {
INSTANCE;

static final long START_TIME = System.nanoTime();

@Override
public long initialNanoTime() {
return START_TIME;
}

@Override
public long nanoTime() {
return System.nanoTime() - START_TIME;
}

@Override
public void sleep(long delay, TimeUnit unit) throws InterruptedException {
Thread.sleep(unit.toMillis(delay), (int) (delay % 1_000_000));

This comment has been minimized.

Copy link
@yawkat

yawkat Jan 30, 2023

Contributor

missing toNanos

}
}

0 comments on commit ec381a1

Please sign in to comment.