Skip to content

Commit

Permalink
Allow controlling time flow for EmbeddedEventLoop (#12459) (#12464)
Browse files Browse the repository at this point in the history
Motivation:

Tests using EmbeddedEventLoop can run faster if they can "advance time" so that scheduled tasks (e.g. timeouts) run earlier. Additionally, "freeze time" functionality can improve reliability of such tests.

Modification:

- Introduce a protected method `AbstractScheduledEventExecutor.getCurrentTimeNanos` that replaces the previous static `nanoTime` method (now deprecated). Replace usages of `nanoTime` with the new method.
- Override `getCurrentTimeNanos` with the new time control (freeze, unfreeze, advanceBy) features in `EmbeddedEventLoop`.
- Add a microbenchmark that tests one of the sites that seemed most likely to see negative performance impact by the change (`ScheduledFutureTask.delayNanos`).

Result:

Fixes  #12433.

Local runs of the `ScheduleFutureTaskBenchmark` microbenchmark shows no evidence for performance impact (within error bounds of each other):

```
before:
Benchmark                                                   (num)   Mode  Cnt    Score    Error  Units
ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop  100000  thrpt   20  132.437 ± 15.116  ops/s
ScheduleFutureTaskBenchmark.scheduleLots                   100000  thrpt   20  694.475 ±  8.184  ops/s
ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop        100000  thrpt   20   88.037 ±  4.013  ops/s
after:
Benchmark                                                   (num)   Mode  Cnt    Score   Error  Units
ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop  100000  thrpt   20  149.629 ± 7.514  ops/s
ScheduleFutureTaskBenchmark.scheduleLots                   100000  thrpt   20  688.954 ± 7.831  ops/s
ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop        100000  thrpt   20   85.426 ± 1.104  ops/s
```

The new `ScheduleFutureTaskDeadlineBenchmark` shows some performance degradation:

```
before:
Benchmark                                             Mode  Cnt         Score        Error  Units
ScheduleFutureTaskDeadlineBenchmark.requestDeadline  thrpt   20  60726336.795 ± 280054.533  ops/s
after:
Benchmark                                             Mode  Cnt         Score        Error  Units
ScheduleFutureTaskDeadlineBenchmark.requestDeadline  thrpt   20  56948231.480 ± 188264.092  ops/s
```

The difference is small, but it's there, so I investigated this further using jitwatch. Looking at the generated assembly, the call to `getCurrentTimeNanos` is devirtualized and inlined in the absence of `EmbeddedEventLoop`, so the code is mostly identical. However there is the added getfield and checkcast for the executor, which probably explains the discrepancy.

In my opinion this is acceptable, because the performance impact is not severe, this use is likely the worst case (virtual call through `scheduledExecutorService()`), and it is never as hot as in this benchmark.

Note that if an `EmbeddedEventLoop` is present in the application, the performance impact is likely substantially higher, because this would necessitate a virtual call. However this is not an issue for production applications, and the affected code is still not very hot.

Co-authored-by: Norman Maurer <norman_maurer@apple.com>
Co-authored-by: Jonas Konrad <me@yawk.at>
  • Loading branch information
normanmaurer and yawkat committed Jun 13, 2022
1 parent 63e4f36 commit 4a8bc5e
Show file tree
Hide file tree
Showing 7 changed files with 151 additions and 33 deletions.
Expand Up @@ -50,16 +50,39 @@ protected AbstractScheduledEventExecutor() {
* {@link System#nanoTime()}.
*/
public static long nanoTime() {
return System.nanoTime() - START_TIME;
return defaultCurrentTimeNanos();
}

/**
* The initial value used for delay and computations based upon a monatomic time source.
* @return initial value used for delay and computations based upon a monatomic time source.
*/
protected static long initialNanoTime() {
return START_TIME;
}

/**
* The deadline (in nanoseconds) for a given delay (in nanoseconds).
* Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
* for two reasons:
*
* <ul>
* <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
* <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
* for testing purposes.</li>
* </ul>
*/
static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
protected long getCurrentTimeNanos() {
return defaultCurrentTimeNanos();
}

static long defaultCurrentTimeNanos() {
return System.nanoTime() - START_TIME;
}

static long deadlineNanos(long nanoTime, long delay) {
long deadlineNanos = nanoTime + delay;
// Guard against overflow
return deadlineNanos < 0? Long.MAX_VALUE : deadlineNanos;
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

PriorityQueue<RunnableScheduledFutureNode<?>> scheduledTaskQueue() {
Expand Down Expand Up @@ -102,12 +125,12 @@ protected final void cancelScheduledTasks() {
* @see #pollScheduledTask(long)
*/
protected final RunnableScheduledFuture<?> pollScheduledTask() {
return pollScheduledTask(nanoTime());
return pollScheduledTask(getCurrentTimeNanos());
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}. You should use {@link
* #nanoTime()} to retrieve the correct {@code nanoTime}.
* #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
* <p>
* This method MUST be called only when {@link #inEventLoop()} is {@code true}.
*/
Expand Down Expand Up @@ -138,7 +161,7 @@ protected final long nextScheduledTaskNano() {
if (scheduledTask == null) {
return -1;
}
return Math.max(0, scheduledTask.deadlineNanos() - nanoTime());
return Math.max(0, scheduledTask.deadlineNanos() - getCurrentTimeNanos());
}

final RunnableScheduledFuture<?> peekScheduledTask() {
Expand All @@ -158,7 +181,7 @@ protected final boolean hasScheduledTasks() {
assert inEventLoop();
Queue<RunnableScheduledFutureNode<?>> scheduledTaskQueue = this.scheduledTaskQueue;
RunnableScheduledFutureNode<?> scheduledTask = scheduledTaskQueue == null? null : scheduledTaskQueue.peek();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
}

@Override
Expand All @@ -169,7 +192,7 @@ public Future<Void> schedule(Runnable command, long delay, TimeUnit unit) {
delay = 0;
}
RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(delay)), 0);
callable(command, null), deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
return schedule(task);
}

Expand All @@ -180,7 +203,8 @@ public <V> Future<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
if (delay < 0) {
delay = 0;
}
RunnableScheduledFuture<V> task = newScheduledTaskFor(callable, deadlineNanos(unit.toNanos(delay)), 0);
RunnableScheduledFuture<V> task = newScheduledTaskFor(
callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay)), 0);
return schedule(task);
}

Expand All @@ -198,7 +222,8 @@ public Future<Void> scheduleAtFixedRate(Runnable command, long initialDelay, lon
}

RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period));
callable(command, null),
deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period));
return schedule(task);
}

Expand All @@ -216,7 +241,8 @@ public Future<Void> scheduleWithFixedDelay(Runnable command, long initialDelay,
}

RunnableScheduledFuture<Void> task = newScheduledTaskFor(
callable(command, null), deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay));
callable(command, null),
deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay));
return schedule(task);
}

Expand Down
Expand Up @@ -45,17 +45,11 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im

private static final long SCHEDULE_QUIET_PERIOD_INTERVAL = TimeUnit.SECONDS.toNanos(1);

private static final RunnableScheduledFutureAdapter<Void> QUIET_PERIOD_TASK;
private final RunnableScheduledFutureAdapter<Void> quietPeriodTask;
public static final GlobalEventExecutor INSTANCE;

static {
INSTANCE = new GlobalEventExecutor();
QUIET_PERIOD_TASK = new RunnableScheduledFutureAdapter<>(
INSTANCE, INSTANCE.newPromise(), Executors.callable(() -> {
// NOOP
}, null), deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);

INSTANCE.scheduledTaskQueue().add(QUIET_PERIOD_TASK);
}

private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
Expand All @@ -75,6 +69,16 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
private GlobalEventExecutor() {
threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory(
DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
quietPeriodTask = new RunnableScheduledFutureAdapter<>(
this, newPromise(), Executors.callable(() -> {
// NOOP
}, null),
// note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise
// the method could be overridden leading to unsafe initialization here!
deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
-SCHEDULE_QUIET_PERIOD_INTERVAL);

scheduledTaskQueue().add(quietPeriodTask);
}

/**
Expand Down Expand Up @@ -122,7 +126,7 @@ private Runnable takeTask() {
}

private void fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
taskQueue.add(scheduledTask);
Expand Down Expand Up @@ -243,7 +247,7 @@ public void run() {
logger.warn("Unexpected exception from the global event executor: ", t);
}

if (task != QUIET_PERIOD_TASK) {
if (task != quietPeriodTask) {
continue;
}
}
Expand Down
Expand Up @@ -64,12 +64,16 @@ public long deadlineNanos() {

@Override
public long delayNanos() {
return Math.max(0, deadlineNanos() - AbstractScheduledEventExecutor.nanoTime());
return delayNanos(executor.getCurrentTimeNanos());
}

@Override
public long delayNanos(long currentTimeNanos) {
return Math.max(0, deadlineNanos() - (currentTimeNanos - AbstractScheduledEventExecutor.START_TIME));
return deadlineToDelayNanos(currentTimeNanos, deadlineNanos);
}

private static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos);
}

@Override
Expand Down Expand Up @@ -127,7 +131,7 @@ public void run() {
if (p > 0) {
deadlineNanos += p;
} else {
deadlineNanos = AbstractScheduledEventExecutor.nanoTime() - p;
deadlineNanos = executor.getCurrentTimeNanos() - p;
}
if (!isCancelled()) {
executor.schedule(this);
Expand Down
Expand Up @@ -250,7 +250,7 @@ protected final Runnable takeTask() {
}

private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
RunnableScheduledFuture<?> scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
Expand Down Expand Up @@ -379,6 +379,7 @@ protected int runAllTasks(int maxTasks) {
*/
protected final long delayNanos(long currentTimeNanos) {
assert inEventLoop();
currentTimeNanos -= START_TIME;
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
Expand All @@ -388,7 +389,7 @@ protected final long delayNanos(long currentTimeNanos) {
}

/**
* Returns the absolute point in time (relative to {@link #nanoTime()}) at which the the next
* Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()} ()}) at which the next
* closest scheduled task should run.
*
* This method must be called from the {@link EventExecutor} thread.
Expand All @@ -397,7 +398,7 @@ protected final long deadlineNanos() {
assert inEventLoop();
RunnableScheduledFuture<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return nanoTime() + SCHEDULE_PURGE_INTERVAL;
return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.deadlineNanos();
}
Expand All @@ -412,7 +413,7 @@ protected final long deadlineNanos() {
*/
protected final void updateLastExecutionTime() {
assert inEventLoop();
lastExecutionTime = nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}

/**
Expand Down Expand Up @@ -599,7 +600,7 @@ boolean confirmShutdown0() {
cancelScheduledTasks();

if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = nanoTime();
gracefulShutdownStartTime = getCurrentTimeNanos();
}

if (runAllTasks() || runShutdownHooks()) {
Expand All @@ -618,7 +619,7 @@ boolean confirmShutdown0() {
return false;
}

final long nanoTime = nanoTime();
final long nanoTime = getCurrentTimeNanos();

if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
Expand Down
Expand Up @@ -102,7 +102,8 @@ public void testScheduleWithFixedDelayNegative() {

@Test
public void testDeadlineNanosNotOverflow() {
assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(Long.MAX_VALUE));
assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(
AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE));
}

private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor {
Expand Down
Expand Up @@ -42,6 +42,7 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import static io.netty5.util.internal.PlatformDependent.throwException;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -614,6 +615,37 @@ private void recordException(Throwable cause) {
}
}

private EmbeddedEventLoop embeddedEventLoop() {
return (EmbeddedEventLoop) executor();
}
/**
* Advance the clock of the event loop of this channel by the given duration. Any scheduled tasks will execute
* sooner by the given time (but {@link #runScheduledPendingTasks()} still needs to be called).
*/
public void advanceTimeBy(long duration, TimeUnit unit) {
embeddedEventLoop().advanceTimeBy(unit.toNanos(duration));
}

/**
* Freeze the clock of this channel's event loop. Any scheduled tasks that are not already due will not run on
* future {@link #runScheduledPendingTasks()} calls. While the event loop is frozen, it is still possible to
* {@link #advanceTimeBy(long, TimeUnit) advance time} manually so that scheduled tasks execute.
*/
public void freezeTime() {
embeddedEventLoop().freezeTime();
}

/**
* Unfreeze an event loop that was {@link #freezeTime() frozen}. Time will continue at the point where
* {@link #freezeTime()} stopped it: if a task was scheduled ten minutes in the future and {@link #freezeTime()}
* was called, it will run ten minutes after this method is called again (assuming no
* {@link #advanceTimeBy(long, TimeUnit)} calls, and assuming pending scheduled tasks are run at that time using
* {@link #runScheduledPendingTasks()}).
*/
public void unfreezeTime() {
embeddedEventLoop().unfreezeTime();
}

/**
* Checks for the presence of an {@link Exception}.
*/
Expand Down
Expand Up @@ -28,6 +28,22 @@
import static java.util.Objects.requireNonNull;

final class EmbeddedEventLoop extends AbstractScheduledEventExecutor implements EventLoop {
/**
* When time is not {@link #timeFrozen frozen}, the base time to subtract from {@link System#nanoTime()}. When time
* is frozen, this variable is unused.
*
* Initialized to {@link #initialNanoTime()} so that until one of the time mutator methods is called,
* {@link #getCurrentTimeNanos()} matches the default behavior.
*/
private long startTime = initialNanoTime();
/**
* When time is frozen, the timestamp returned by {@link #getCurrentTimeNanos()}. When unfrozen, this is unused.
*/
private long frozenTimestamp;
/**
* Whether time is currently frozen.
*/
private boolean timeFrozen;

private final Queue<Runnable> tasks = new ArrayDeque<>(2);
boolean running;
Expand Down Expand Up @@ -91,7 +107,7 @@ void runTasks() {
}

long runScheduledTasks() {
long time = AbstractScheduledEventExecutor.nanoTime();
long time = getCurrentTimeNanos();
boolean wasRunning = running;
try {
for (;;) {
Expand Down Expand Up @@ -123,6 +139,40 @@ void cancelScheduled() {
}
}

@Override
protected long getCurrentTimeNanos() {
if (timeFrozen) {
return frozenTimestamp;
}
return System.nanoTime() - startTime;
}

void advanceTimeBy(long nanos) {
if (timeFrozen) {
frozenTimestamp += nanos;
} else {
// startTime is subtracted from nanoTime, so increasing the startTime will advance getCurrentTimeNanos
startTime -= nanos;
}
}

void freezeTime() {
if (!timeFrozen) {
frozenTimestamp = getCurrentTimeNanos();
timeFrozen = true;
}
}

void unfreezeTime() {
if (timeFrozen) {
// we want getCurrentTimeNanos to continue right where frozenTimestamp left off:
// getCurrentTimeNanos = nanoTime - startTime = frozenTimestamp
// then solve for startTime
startTime = System.nanoTime() - frozenTimestamp;
timeFrozen = false;
}
}

@Override
public Future<Void> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
Expand Down

0 comments on commit 4a8bc5e

Please sign in to comment.