Skip to content

Commit

Permalink
Avoid unnecessary epoll event loop wake-ups (#9605)
Browse files Browse the repository at this point in the history
Motivation

The recently-introduced event loop scheduling hooks can be exploited by
the epoll transport to avoid waking the event loop when scheduling
future tasks if there is a timer already set to wake up sooner.

There is also a "default" timeout which will wake the event
loop after 1 second if there are no pending future tasks. The
performance impact of these wakeups themselves is likely negligible but
there's significant overhead in having to re-arm the timer every time
the event loop goes to sleep (see #7816). It's not 100% clear why this
timeout was there originally but we're sure it's no longer needed.

Modification

Combine the existing volatile wakenUp and non-volatile prevDeadlineNanos
fields into a single AtomicLong that stores the next scheduled wakeup
time while the event loop is in epoll_wait, and is -1 while it is awake.

Use this as a guard to debounce wakeups from both immediately-submitted
tasks and future scheduled tasks. The latter goes through the new
before/afterScheduledTaskSubmitted overrides, which request a wakeup only
when the new deadline occurs before the already-scheduled timer.

A similar optimization was already added to NioEventLoop, but it still
uses two separate volatiles. We should consider similar streamlining of
that in a future update.

Result

Fewer event loop wakeups when scheduling future tasks, greatly reduced
overhead when no future tasks are scheduled.
  • Loading branch information
njhill authored and normanmaurer committed Oct 12, 2019
1 parent ec8d43c commit 166caf9
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 32 deletions.
4 changes: 0 additions & 4 deletions transport-native-epoll/src/main/c/netty_epoll_native.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jl

// This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
return netty_epoll_native_epollWait(env, clazz, efd, address, len, 0);
}
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Math.min;

Expand All @@ -52,8 +52,6 @@ class EpollEventLoop extends SingleThreadEventLoop {
Epoll.ensureAvailability();
}

// Pick a number that no task could have previously used.
private long prevDeadlineNanos = nanoTime() - 1;
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
Expand All @@ -74,7 +72,12 @@ public int get() throws Exception {
return epollWaitNow();
}
};
private final AtomicInteger wakenUp = new AtomicInteger(1);

// nextWakeupNanos is:
// -1 when the event loop (EL) is awake
// Long.MAX_VALUE when the EL is waiting with no wakeup scheduled
// any other value T when the EL is waiting with a wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(-1L);
private boolean pendingWakeup;
private volatile int ioRatio = 50;

Expand Down Expand Up @@ -178,12 +181,24 @@ NativeDatagramPacketArray cleanDatagramPacketArray() {

@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
if (!inEventLoop && nextWakeupNanos.getAndSet(-1L) != -1L) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
}

/**
 * Compares the deadline of a scheduled task that is about to be submitted with the
 * wake-up time currently published in {@code nextWakeupNanos}.
 *
 * @param deadlineNanos deadline of the task being scheduled
 * @return {@code true} only if {@code deadlineNanos} is strictly earlier than the
 *         published wake-up time. NOTE(review): presumably the superclass uses this to
 *         decide whether the loop must be woken; confirm against the hook's contract.
 */
@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
    // While the loop is awake the field holds -1, so no deadline >= -1 compares as
    // earlier and the result is false; that state needs no separate branch.
    final long publishedWakeupNanos = nextWakeupNanos.get();
    return deadlineNanos < publishedWakeupNanos;
}

/**
 * Re-checks, after a scheduled task has been submitted, whether its deadline precedes
 * the wake-up time currently stored in {@code nextWakeupNanos}.
 *
 * @param deadlineNanos deadline of the task that was scheduled
 * @return {@code true} only if the stored wake-up time is strictly later than
 *         {@code deadlineNanos}. NOTE(review): presumably signals the superclass that a
 *         wake-up is required; confirm against the hook's contract.
 */
@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
    // The awake marker (-1) is never later than a deadline >= -1, so this yields false
    // in that state without any special-casing.
    return nextWakeupNanos.get() > deadlineNanos;
}

/**
* Register the given epoll with this {@link EventLoop}.
*/
Expand Down Expand Up @@ -288,22 +303,20 @@ public int registeredChannels() {
return channels.size();
}

private int epollWait() throws IOException {
int delaySeconds;
int delayNanos;
long curDeadlineNanos = deadlineNanos();
if (curDeadlineNanos == prevDeadlineNanos) {
delaySeconds = -1;
delayNanos = -1;
} else {
long totalDelay = delayNanos(System.nanoTime());
prevDeadlineNanos = curDeadlineNanos;
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
private int epollWait(long deadlineNanos) throws IOException {
if (deadlineNanos == Long.MAX_VALUE) {
return Native.epollWait(epollFd, events, timerFd, Integer.MAX_VALUE, 0); // disarm timer
}
long totalDelay = deadlineToDelayNanos(deadlineNanos);
int delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
int delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
}

/**
 * Calls the three-argument {@code Native.epollWait} on this loop's epoll descriptor and
 * event array without passing the timerFd or any timeout values.
 *
 * @return the value returned by {@code Native.epollWait}
 * @throws IOException propagated from {@code Native.epollWait}
 */
private int epollWaitNoTimerChange() throws IOException {
    // NOTE(review): the 'false' flag presumably selects a blocking wait (as opposed to
    // the 'true' used by epollWaitNow()); confirm against Native.epollWait.
    final int ready = Native.epollWait(epollFd, events, false);
    return ready;
}

/**
 * Calls the three-argument {@code Native.epollWait} on this loop's epoll descriptor and
 * event array with the flag set to {@code true}.
 *
 * @return the value returned by {@code Native.epollWait}
 * @throws IOException propagated from {@code Native.epollWait}
 */
private int epollWaitNow() throws IOException {
    // NOTE(review): 'true' presumably requests an immediate, non-blocking poll, as the
    // method name suggests; confirm against Native.epollWait.
    final int ready = Native.epollWait(epollFd, events, true);
    return ready;
}
Expand All @@ -319,6 +332,7 @@ private int epollWaitTimeboxed() throws IOException {

@Override
protected void run() {
long prevDeadlineNanos = Long.MAX_VALUE;
for (;;) {
try {
processPendingChannelFlags();
Expand Down Expand Up @@ -349,15 +363,26 @@ protected void run() {
// fall-through
}

wakenUp.set(0);
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = Long.MAX_VALUE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = epollWait();
if (curDeadlineNanos == prevDeadlineNanos) {
// No timer activity needed
strategy = epollWaitNoTimerChange();
} else {
// Timerfd needs to be re-armed or disarmed
prevDeadlineNanos = curDeadlineNanos;
strategy = epollWait(curDeadlineNanos);
}
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
if (nextWakeupNanos.get() == -1L || nextWakeupNanos.getAndSet(-1L) == -1L) {
pendingWakeup = true;
}
}
Expand All @@ -368,8 +393,8 @@ protected void run() {
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(events, strategy);
if (strategy > 0 && processReady(events, strategy)) {
prevDeadlineNanos = Long.MAX_VALUE;
}
} finally {
// Ensure we always run tasks.
Expand All @@ -379,8 +404,8 @@ protected void run() {
final long ioStartTime = System.nanoTime();

try {
if (strategy > 0) {
processReady(events, strategy);
if (strategy > 0 && processReady(events, strategy)) {
prevDeadlineNanos = Long.MAX_VALUE;
}
} finally {
// Ensure we always run tasks.
Expand Down Expand Up @@ -434,15 +459,15 @@ private void closeAll() {
}
}

private void processReady(EpollEventArray events, int ready) {
// Returns true if a timerFd event was encountered
private boolean processReady(EpollEventArray events, int ready) {
boolean timerFired = false;
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
timerFired = true;
} else {
final long ev = events.events(i);

Expand Down Expand Up @@ -496,6 +521,7 @@ private void processReady(EpollEventArray events, int ready) {
}
}
}
return timerFired;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ public static FileDescriptor newEpollCreate() {
@Deprecated
public static int epollWait(FileDescriptor epollFd, EpollEventArray events, FileDescriptor timerFd,
int timeoutSec, int timeoutNs) throws IOException {
if (timeoutSec == 0 && timeoutNs == 0) {
// Zero timeout => poll (aka return immediately)
return epollWait(epollFd, events, 0);
}
if (timeoutSec == Integer.MAX_VALUE) {
// Max timeout => wait indefinitely: disarm timerfd first
timeoutSec = 0;
timeoutNs = 0;
}
int ready = epollWait0(epollFd.intValue(), events.memoryAddress(), events.length(), timerFd.intValue(),
timeoutSec, timeoutNs);
if (ready < 0) {
Expand Down

0 comments on commit 166caf9

Please sign in to comment.