Skip to content

Commit

Permalink
Allow controlling time flow for EmbeddedEventLoop (netty#12459)
Browse files Browse the repository at this point in the history
Motivation:

Tests using EmbeddedEventLoop can run faster if they can "advance time" so that scheduled tasks (e.g. timeouts) run earlier. Additionally, "freeze time" functionality can improve reliability of such tests.

Modification:

- Introduce a protected method `AbstractScheduledEventExecutor.getCurrentTimeNanos` that replaces the previous static `nanoTime` method (now deprecated). Replace usages of `nanoTime` with the new method.
- Override `getCurrentTimeNanos` with the new time control (freeze, unfreeze, advanceBy) features in `EmbeddedEventLoop`.
- Add a microbenchmark that tests one of the sites that seemed most likely to see negative performance impact by the change (`ScheduledFutureTask.delayNanos`).

Result:

Fixes  netty#12433. 

Local runs of the `ScheduleFutureTaskBenchmark` microbenchmark shows no evidence for performance impact (within error bounds of each other):

```
before:
Benchmark                                                   (num)   Mode  Cnt    Score    Error  Units
ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop  100000  thrpt   20  132.437 ± 15.116  ops/s
ScheduleFutureTaskBenchmark.scheduleLots                   100000  thrpt   20  694.475 ±  8.184  ops/s
ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop        100000  thrpt   20   88.037 ±  4.013  ops/s
after:
Benchmark                                                   (num)   Mode  Cnt    Score   Error  Units
ScheduleFutureTaskBenchmark.scheduleCancelLotsOutsideLoop  100000  thrpt   20  149.629 ± 7.514  ops/s
ScheduleFutureTaskBenchmark.scheduleLots                   100000  thrpt   20  688.954 ± 7.831  ops/s
ScheduleFutureTaskBenchmark.scheduleLotsOutsideLoop        100000  thrpt   20   85.426 ± 1.104  ops/s
```

The new `ScheduleFutureTaskDeadlineBenchmark` shows some performance degradation:

```
before:
Benchmark                                             Mode  Cnt         Score        Error  Units
ScheduleFutureTaskDeadlineBenchmark.requestDeadline  thrpt   20  60726336.795 ± 280054.533  ops/s
after:
Benchmark                                             Mode  Cnt         Score        Error  Units
ScheduleFutureTaskDeadlineBenchmark.requestDeadline  thrpt   20  56948231.480 ± 188264.092  ops/s
```

The difference is small, but it's there, so I investigated this further using jitwatch. Looking at the generated assembly, the call to `getCurrentTimeNanos` is devirtualized and inlined in the absence of `EmbeddedEventLoop`, so the code is mostly identical. However there is the added getfield and checkcast for the executor, which probably explains the discrepancy.

In my opinion this is acceptable, because the performance impact is not severe, this use is likely the worst case (virtual call through `scheduledExecutorService()`), and it is never as hot as in this benchmark.

Note that if an `EmbeddedEventLoop` is present in the application, the performance impact is likely substantially higher, because this would necessitate a virtual call. However this is not an issue for production applications, and the affected code is still not very hot.

Co-authored-by: Norman Maurer <norman_maurer@apple.com>
  • Loading branch information
2 people authored and 夏无影 committed Jul 8, 2022
1 parent 1691abe commit bb9455b
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 81 deletions.
Expand Up @@ -19,8 +19,6 @@
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PriorityQueue;

import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
Expand All @@ -38,7 +36,9 @@ public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
}
};

static final Runnable WAKEUP_TASK = new Runnable() {
private static final long START_TIME = System.nanoTime();

static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() { } // Do nothing
};
Expand All @@ -54,8 +54,36 @@ protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}

/**
* Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
* for two reasons:
*
* <ul>
* <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
* <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
* for testing purposes.</li>
* </ul>
*/
protected long getCurrentTimeNanos() {
return defaultCurrentTimeNanos();
}

/**
* @deprecated Use the non-static {@link #getCurrentTimeNanos()} instead.
*/
@Deprecated
protected static long nanoTime() {
return ScheduledFutureTask.nanoTime();
return defaultCurrentTimeNanos();
}

static long defaultCurrentTimeNanos() {
return System.nanoTime() - START_TIME;
}

static long deadlineNanos(long nanoTime, long delay) {
long deadlineNanos = nanoTime + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

/**
Expand All @@ -65,15 +93,15 @@ protected static long nanoTime() {
* @return the number of nano seconds from now {@code deadlineNanos} would expire.
*/
protected static long deadlineToDelayNanos(long deadlineNanos) {
return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
}

/**
* The initial value used for delay and computations based upon a monatomic time source.
* @return initial value used for delay and computations based upon a monatomic time source.
*/
protected static long initialNanoTime() {
return ScheduledFutureTask.initialNanoTime();
return START_TIME;
}

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
Expand Down Expand Up @@ -116,12 +144,12 @@ protected void cancelScheduledTasks() {
* @see #pollScheduledTask(long)
*/
protected final Runnable pollScheduledTask() {
return pollScheduledTask(nanoTime());
return pollScheduledTask(getCurrentTimeNanos());
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
* You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Expand Down Expand Up @@ -162,7 +190,7 @@ final ScheduledFutureTask<?> peekScheduledTask() {
*/
protected final boolean hasScheduledTasks() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
}

@Override
Expand All @@ -177,7 +205,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
return schedule(new ScheduledFutureTask<Void>(
this,
command,
deadlineNanos(unit.toNanos(delay))));
deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
}

@Override
Expand All @@ -189,7 +217,8 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}
validateScheduled0(delay, unit);

return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
return schedule(new ScheduledFutureTask<V>(
this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
}

@Override
Expand All @@ -208,7 +237,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
validateScheduled0(period, unit);

return schedule(new ScheduledFutureTask<Void>(
this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
}

@Override
Expand All @@ -228,7 +257,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
validateScheduled0(delay, unit);

return schedule(new ScheduledFutureTask<Void>(
this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -291,7 +320,7 @@ final void removeScheduled(final ScheduledFutureTask<?> task) {
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
* relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
Expand All @@ -301,7 +330,7 @@ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
Expand Down
Expand Up @@ -53,7 +53,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
public void run() {
// NOOP
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
}, null),
// note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method
// could be overridden leading to unsafe initialization here!
deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
-SCHEDULE_QUIET_PERIOD_INTERVAL
);

// because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
Expand Down Expand Up @@ -117,7 +122,7 @@ Runnable takeTask() {
}

private void fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
taskQueue.add(scheduledTask);
Expand Down
Expand Up @@ -25,22 +25,6 @@

@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
return System.nanoTime() - START_TIME;
}

static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

static long initialNanoTime() {
return START_TIME;
}

// set once when added to priority queue
private long id;

Expand Down Expand Up @@ -109,22 +93,21 @@ void setConsumed() {
// Optimization to avoid checking system clock again
// after deadline has passed and task has been dequeued
if (periodNanos == 0) {
assert nanoTime() >= deadlineNanos;
assert scheduledExecutor().getCurrentTimeNanos() >= deadlineNanos;
deadlineNanos = 0L;
}
}

public long delayNanos() {
return deadlineToDelayNanos(deadlineNanos());
return delayNanos(scheduledExecutor().getCurrentTimeNanos());
}

static long deadlineToDelayNanos(long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos);
}

public long delayNanos(long currentTimeNanos) {
return deadlineNanos == 0L ? 0L
: Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME));
return deadlineToDelayNanos(currentTimeNanos, deadlineNanos);
}

@Override
Expand Down Expand Up @@ -178,7 +161,7 @@ public void run() {
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
deadlineNanos = scheduledExecutor().getCurrentTimeNanos() - periodNanos;
}
if (!isCancelled()) {
scheduledExecutor().scheduledTaskQueue().add(this);
Expand Down
Expand Up @@ -280,7 +280,7 @@ private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
Expand All @@ -301,7 +301,7 @@ private boolean executeExpiredScheduledTasks() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return false;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return false;
Expand Down Expand Up @@ -378,7 +378,7 @@ protected boolean runAllTasks() {
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
return ranAtLeastOne;
Expand All @@ -403,7 +403,7 @@ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts)
} while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);

if (drainAttempt > 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();

Expand Down Expand Up @@ -463,7 +463,7 @@ protected boolean runAllTasks(long timeoutNanos) {
return false;
}

final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
Expand All @@ -474,15 +474,15 @@ protected boolean runAllTasks(long timeoutNanos) {
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
break;
}
}
Expand All @@ -502,6 +502,8 @@ protected void afterRunningAllTasks() { }
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/
protected long delayNanos(long currentTimeNanos) {
currentTimeNanos -= initialNanoTime();

ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
Expand All @@ -511,14 +513,14 @@ protected long delayNanos(long currentTimeNanos) {
}

/**
* Returns the absolute point in time (relative to {@link #nanoTime()}) at which the next
* Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
* closest scheduled task should run.
*/
@UnstableApi
protected long deadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return nanoTime() + SCHEDULE_PURGE_INTERVAL;
return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.deadlineNanos();
}
Expand All @@ -531,7 +533,7 @@ protected long deadlineNanos() {
* checks.
*/
protected void updateLastExecutionTime() {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}

/**
Expand Down Expand Up @@ -609,7 +611,7 @@ private boolean runShutdownHooks() {
}

if (ran) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}

return ran;
Expand Down Expand Up @@ -755,7 +757,7 @@ protected boolean confirmShutdown() {
cancelScheduledTasks();

if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
gracefulShutdownStartTime = getCurrentTimeNanos();
}

if (runAllTasks() || runShutdownHooks()) {
Expand All @@ -774,7 +776,7 @@ protected boolean confirmShutdown() {
return false;
}

final long nanoTime = ScheduledFutureTask.nanoTime();
final long nanoTime = getCurrentTimeNanos();

if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

Expand Down Expand Up @@ -117,6 +118,12 @@ public void execute() {
});
}

@Test
public void testDeadlineNanosNotOverflow() {
Assertions.assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(
AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE));
}

private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor {
@Override
public boolean isShuttingDown() {
Expand Down

0 comments on commit bb9455b

Please sign in to comment.