Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow controlling time flow for EmbeddedEventLoop #12459

Merged
merged 8 commits into from Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,8 +19,6 @@
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.PriorityQueue;

import static io.netty.util.concurrent.ScheduledFutureTask.deadlineNanos;

import java.util.Comparator;
import java.util.Queue;
import java.util.concurrent.Callable;
Expand All @@ -38,7 +36,9 @@ public int compare(ScheduledFutureTask<?> o1, ScheduledFutureTask<?> o2) {
}
};

static final Runnable WAKEUP_TASK = new Runnable() {
private static final long START_TIME = System.nanoTime();

static final Runnable WAKEUP_TASK = new Runnable() {
@Override
public void run() { } // Do nothing
};
Expand All @@ -54,8 +54,36 @@ protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
super(parent);
}

/**
* Get the current time in nanoseconds by this executor's clock. This is not the same as {@link System#nanoTime()}
* for two reasons:
*
* <ul>
* <li>We apply a fixed offset to the {@link System#nanoTime() nanoTime}</li>
* <li>Implementations (in particular EmbeddedEventLoop) may use their own time source so they can control time
* for testing purposes.</li>
* </ul>
*/
protected long getCurrentTimeNanos() {
return defaultCurrentTimeNanos();
}

/**
* @deprecated Use the non-static {@link #getCurrentTimeNanos()} instead.
*/
@Deprecated
protected static long nanoTime() {
return ScheduledFutureTask.nanoTime();
return defaultCurrentTimeNanos();
}

static long defaultCurrentTimeNanos() {
return System.nanoTime() - START_TIME;
}

static long deadlineNanos(long nanoTime, long delay) {
long deadlineNanos = nanoTime + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

/**
Expand All @@ -65,15 +93,15 @@ protected static long nanoTime() {
* @return the number of nano seconds from now {@code deadlineNanos} would expire.
*/
protected static long deadlineToDelayNanos(long deadlineNanos) {
return ScheduledFutureTask.deadlineToDelayNanos(deadlineNanos);
return ScheduledFutureTask.deadlineToDelayNanos(defaultCurrentTimeNanos(), deadlineNanos);
}

/**
* The initial value used for delay and computations based upon a monatomic time source.
* @return initial value used for delay and computations based upon a monatomic time source.
*/
protected static long initialNanoTime() {
return ScheduledFutureTask.initialNanoTime();
return START_TIME;
}

PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
Expand Down Expand Up @@ -116,12 +144,12 @@ protected void cancelScheduledTasks() {
* @see #pollScheduledTask(long)
*/
protected final Runnable pollScheduledTask() {
return pollScheduledTask(nanoTime());
return pollScheduledTask(getCurrentTimeNanos());
}

/**
* Return the {@link Runnable} which is ready to be executed with the given {@code nanoTime}.
* You should use {@link #nanoTime()} to retrieve the correct {@code nanoTime}.
* You should use {@link #getCurrentTimeNanos()} to retrieve the correct {@code nanoTime}.
*/
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Expand Down Expand Up @@ -162,7 +190,7 @@ final ScheduledFutureTask<?> peekScheduledTask() {
*/
protected final boolean hasScheduledTasks() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null && scheduledTask.deadlineNanos() <= nanoTime();
return scheduledTask != null && scheduledTask.deadlineNanos() <= getCurrentTimeNanos();
}

@Override
Expand All @@ -177,7 +205,7 @@ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
return schedule(new ScheduledFutureTask<Void>(
this,
command,
deadlineNanos(unit.toNanos(delay))));
deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
}

@Override
Expand All @@ -189,7 +217,8 @@ public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUni
}
validateScheduled0(delay, unit);

return schedule(new ScheduledFutureTask<V>(this, callable, deadlineNanos(unit.toNanos(delay))));
return schedule(new ScheduledFutureTask<V>(
this, callable, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(delay))));
}

@Override
Expand All @@ -208,7 +237,7 @@ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDela
validateScheduled0(period, unit);

return schedule(new ScheduledFutureTask<Void>(
this, command, deadlineNanos(unit.toNanos(initialDelay)), unit.toNanos(period)));
this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), unit.toNanos(period)));
}

@Override
Expand All @@ -228,7 +257,7 @@ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialD
validateScheduled0(delay, unit);

return schedule(new ScheduledFutureTask<Void>(
this, command, deadlineNanos(unit.toNanos(initialDelay)), -unit.toNanos(delay)));
this, command, deadlineNanos(getCurrentTimeNanos(), unit.toNanos(initialDelay)), -unit.toNanos(delay)));
}

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -291,7 +320,7 @@ final void removeScheduled(final ScheduledFutureTask<?> task) {
* to wake the {@link EventExecutor} thread if required.
*
* @param deadlineNanos deadline of the to-be-scheduled task
* relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
Expand All @@ -301,7 +330,7 @@ protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
/**
* See {@link #beforeScheduledTaskSubmitted(long)}. Called only after that method returns false.
*
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#nanoTime()}
* @param deadlineNanos relative to {@link AbstractScheduledEventExecutor#getCurrentTimeNanos()}
* @return {@code true} if the {@link EventExecutor} thread should be woken, {@code false} otherwise
*/
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
Expand Down
Expand Up @@ -53,7 +53,12 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
public void run() {
// NOOP
}
}, null), ScheduledFutureTask.deadlineNanos(SCHEDULE_QUIET_PERIOD_INTERVAL), -SCHEDULE_QUIET_PERIOD_INTERVAL);
}, null),
// note: the getCurrentTimeNanos() call here only works because this is a final class, otherwise the method
// could be overridden leading to unsafe initialization here!
deadlineNanos(getCurrentTimeNanos(), SCHEDULE_QUIET_PERIOD_INTERVAL),
-SCHEDULE_QUIET_PERIOD_INTERVAL
);

// because the GlobalEventExecutor is a singleton, tasks submitted to it can come from arbitrary threads and this
// can trigger the creation of a thread from arbitrary thread groups; for this reason, the thread factory must not
Expand Down Expand Up @@ -117,7 +122,7 @@ Runnable takeTask() {
}

private void fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
taskQueue.add(scheduledTask);
Expand Down
Expand Up @@ -25,22 +25,6 @@

@SuppressWarnings("ComparableImplementedButEqualsNotOverridden")
final class ScheduledFutureTask<V> extends PromiseTask<V> implements ScheduledFuture<V>, PriorityQueueNode {
private static final long START_TIME = System.nanoTime();

static long nanoTime() {
return System.nanoTime() - START_TIME;
}

static long deadlineNanos(long delay) {
long deadlineNanos = nanoTime() + delay;
// Guard against overflow
return deadlineNanos < 0 ? Long.MAX_VALUE : deadlineNanos;
}

static long initialNanoTime() {
return START_TIME;
}

// set once when added to priority queue
private long id;

Expand Down Expand Up @@ -109,22 +93,21 @@ void setConsumed() {
// Optimization to avoid checking system clock again
// after deadline has passed and task has been dequeued
if (periodNanos == 0) {
assert nanoTime() >= deadlineNanos;
assert scheduledExecutor().getCurrentTimeNanos() >= deadlineNanos;
deadlineNanos = 0L;
}
}

public long delayNanos() {
return deadlineToDelayNanos(deadlineNanos());
return delayNanos(scheduledExecutor().getCurrentTimeNanos());
}

static long deadlineToDelayNanos(long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - nanoTime());
static long deadlineToDelayNanos(long currentTimeNanos, long deadlineNanos) {
return deadlineNanos == 0L ? 0L : Math.max(0L, deadlineNanos - currentTimeNanos);
}

public long delayNanos(long currentTimeNanos) {
return deadlineNanos == 0L ? 0L
: Math.max(0L, deadlineNanos() - (currentTimeNanos - START_TIME));
return deadlineToDelayNanos(currentTimeNanos, deadlineNanos);
}

@Override
Expand Down Expand Up @@ -178,7 +161,7 @@ public void run() {
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
deadlineNanos = scheduledExecutor().getCurrentTimeNanos() - periodNanos;
}
if (!isCancelled()) {
scheduledExecutor().scheduledTaskQueue().add(this);
Expand Down
Expand Up @@ -280,7 +280,7 @@ private boolean fetchFromScheduledTaskQueue() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return true;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
for (;;) {
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
Expand All @@ -301,7 +301,7 @@ private boolean executeExpiredScheduledTasks() {
if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {
return false;
}
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
long nanoTime = getCurrentTimeNanos();
Runnable scheduledTask = pollScheduledTask(nanoTime);
if (scheduledTask == null) {
return false;
Expand Down Expand Up @@ -378,7 +378,7 @@ protected boolean runAllTasks() {
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();
return ranAtLeastOne;
Expand All @@ -403,7 +403,7 @@ protected final boolean runScheduledAndExecutorTasks(final int maxDrainAttempts)
} while (ranAtLeastOneTask && ++drainAttempt < maxDrainAttempts);

if (drainAttempt > 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}
afterRunningAllTasks();

Expand Down Expand Up @@ -463,7 +463,7 @@ protected boolean runAllTasks(long timeoutNanos) {
return false;
}

final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
Expand All @@ -474,15 +474,15 @@ protected boolean runAllTasks(long timeoutNanos) {
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
break;
}
}
Expand All @@ -502,6 +502,8 @@ protected void afterRunningAllTasks() { }
* Returns the amount of time left until the scheduled task with the closest dead line is executed.
*/
protected long delayNanos(long currentTimeNanos) {
currentTimeNanos -= initialNanoTime();

ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
Expand All @@ -511,14 +513,14 @@ protected long delayNanos(long currentTimeNanos) {
}

/**
* Returns the absolute point in time (relative to {@link #nanoTime()}) at which the next
* Returns the absolute point in time (relative to {@link #getCurrentTimeNanos()}) at which the next
* closest scheduled task should run.
*/
@UnstableApi
protected long deadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return nanoTime() + SCHEDULE_PURGE_INTERVAL;
return getCurrentTimeNanos() + SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.deadlineNanos();
}
Expand All @@ -531,7 +533,7 @@ protected long deadlineNanos() {
* checks.
*/
protected void updateLastExecutionTime() {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}

/**
Expand Down Expand Up @@ -609,7 +611,7 @@ private boolean runShutdownHooks() {
}

if (ran) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
lastExecutionTime = getCurrentTimeNanos();
}

return ran;
Expand Down Expand Up @@ -755,7 +757,7 @@ protected boolean confirmShutdown() {
cancelScheduledTasks();

if (gracefulShutdownStartTime == 0) {
gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
gracefulShutdownStartTime = getCurrentTimeNanos();
}

if (runAllTasks() || runShutdownHooks()) {
Expand All @@ -774,7 +776,7 @@ protected boolean confirmShutdown() {
return false;
}

final long nanoTime = ScheduledFutureTask.nanoTime();
final long nanoTime = getCurrentTimeNanos();

if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
return true;
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.netty.util.concurrent;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;

Expand Down Expand Up @@ -117,6 +118,12 @@ public void execute() {
});
}

@Test
public void testDeadlineNanosNotOverflow() {
Assertions.assertEquals(Long.MAX_VALUE, AbstractScheduledEventExecutor.deadlineNanos(
AbstractScheduledEventExecutor.defaultCurrentTimeNanos(), Long.MAX_VALUE));
}

private static final class TestScheduledEventExecutor extends AbstractScheduledEventExecutor {
@Override
public boolean isShuttingDown() {
Expand Down