Skip to content

Commit

Permalink
Reinstate decoupled timerfd logic in EpollEventLoop
Browse files Browse the repository at this point in the history
Motivation

This reinstates the parts of 1fa7a5e
and a22d4ba that were reverted in
7f39142 (applies to EpollEventLoop
class only), with some minor changes.

It sounds like this still exhibits some testsuite failures even with the
lazySet fixes, but I thought it would be good to have a PR with the
"latest" version to continue to test/debug.

Modification

Changes are the same as before apart a few added comments and the
following in EpollEventLoop#checkScheduleTaskQueueForNewDelay:
- Use set instead of lazySet to avoid race condition
- Avoid arming timer and entering epoll wait if a task has already
expired
- Switch order of calling setTimerFd and updating the nextDeadlineNanos
AtomicLong within the sync block

The latter two are perf refinements and should not have contributed to
the issues seen.

Result

Performance benefits from having timerfd updates decoupled from the
event loop, as originally introduced by @Scottmitch
  • Loading branch information
njhill committed Sep 23, 2019
1 parent 76592db commit 125dc27
Showing 1 changed file with 128 additions and 67 deletions.
Expand Up @@ -36,7 +36,9 @@
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static java.lang.Math.min;

Expand All @@ -52,8 +54,17 @@ class EpollEventLoop extends SingleThreadEventLoop {
Epoll.ensureAvailability();
}

// Pick a number that no task could have previously used.
private long prevDeadlineNanos = nanoTime() - 1;
/**
* When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value
* means that the event loop is awake, which blocks rescheduling activity by other threads.
* It is restored to the real timerFd expiry time again prior to entering epollWait().
*
* Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting
* with the time source (e.g. calling System.nanoTime()) which can be expensive.
*/
private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L);
private final AtomicInteger wakenUp = new AtomicInteger(1);
private boolean pendingWakeup;
private final FileDescriptor epollFd;
private final FileDescriptor eventFd;
private final FileDescriptor timerFd;
Expand All @@ -74,12 +85,6 @@ public int get() throws Exception {
return epollWaitNow();
}
};
private final AtomicInteger wakenUp = new AtomicInteger(1);
private boolean pendingWakeup;
private volatile int ioRatio = 50;

// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999;

EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
Expand Down Expand Up @@ -176,6 +181,91 @@ NativeDatagramPacketArray cleanDatagramPacketArray() {
return datagramPacketArray;
}

@Override
protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) {
return false; // don't wake event loop
}

@Override
protected boolean afterScheduledTaskSubmitted(long deadlineNanos) {
try {
trySetTimerFd(deadlineNanos);
} catch (IOException e) {
throw new RejectedExecutionException(e);
}
return false; // don't wake event loop
}

@Override
protected boolean runAllTasks() {
// This method is overridden to ensure that all the expired scheduled tasks are executed during shutdown, and
// any other execute all scenarios in the base class.
return runScheduledAndExecutorTasks(4);
}

// Called only from *outside* the event loop
private void trySetTimerFd(long candidateNextDeadline) throws IOException {
for (;;) {
long nextDeadline = nextDeadlineNanos.get();
if (nextDeadline <= candidateNextDeadline) {
// This includes case where nextDeadline is negative (event loop is awake)
return;
}
if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) {
// We must serialize calls to setTimerFd to avoid the set of a later deadline
// racing with a sooner one and overwriting it. A second check of nextDeadlineNanos
// is made within the sync block to avoid having the CAS within the sync
synchronized (nextDeadlineNanos) {
nextDeadline = nextDeadlineNanos.get();
if (nextDeadline == candidateNextDeadline ||
(nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) {
setTimerFd(deadlineToDelayNanos(candidateNextDeadline));
}
}
return;
}
}
}

// Called only within synchronized(nextDeadlineNanos)
private void setTimerFd(long candidateNextDelayNanos) throws IOException {
if (candidateNextDelayNanos > 0) {
final int delaySeconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE);
final int delayNanos = (int) min(candidateNextDelayNanos - delaySeconds * 1000000000L, Integer.MAX_VALUE);
Native.timerFdSetTime(timerFd.intValue(), delaySeconds, delayNanos);
} else {
// Setting the timer to 0, 0 will disarm it, so we have a few options:
// 1. Set the timer wakeup to 1ns (1 system call).
// 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls).
// For now we are using option (1) because there are less system calls, and we will correctly reset the
// nextDeadlineNanos state when the EventLoop processes the timer wakeup.
Native.timerFdSetTime(timerFd.intValue(), 0, 1);
}
}

// Called only from the event loop
private boolean checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException {
assert nextDeadlineNanos.get() < 0;
final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (nextTaskDeadlineNanos == -1L || nextTaskDeadlineNanos >= timerFdDeadline) {
// Just restore to preexisting timerFd value, update not needed
nextDeadlineNanos.set(timerFdDeadline);
} else {
synchronized (nextDeadlineNanos) {
long delayNanos = deadlineToDelayNanos(nextTaskDeadlineNanos);
if (delayNanos <= 0L) {
// Abort since the deadline already passed (don't enter epollWait
// and just continue the EL to pick up the now-expired task)
return false;
}
// Shorter delay required than current timerFd setting, update it
setTimerFd(delayNanos);
nextDeadlineNanos.set(nextTaskDeadlineNanos);
}
}
return true;
}

@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
Expand Down Expand Up @@ -265,43 +355,13 @@ private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}

/**
* Returns the percentage of the desired amount of time spent for I/O in the event loop.
*/
public int getIoRatio() {
return ioRatio;
}

/**
* Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is
* {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
*/
public void setIoRatio(int ioRatio) {
if (ioRatio <= 0 || ioRatio > 100) {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
this.ioRatio = ioRatio;
}

@Override
public int registeredChannels() {
return channels.size();
}

private int epollWait() throws IOException {
int delaySeconds;
int delayNanos;
long curDeadlineNanos = deadlineNanos();
if (curDeadlineNanos == prevDeadlineNanos) {
delaySeconds = -1;
delayNanos = -1;
} else {
long totalDelay = delayNanos(System.nanoTime());
prevDeadlineNanos = curDeadlineNanos;
delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE);
delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS);
}
return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos);
return Native.epollWait(epollFd, events, false);
}

private int epollWaitNow() throws IOException {
Expand All @@ -319,6 +379,7 @@ private int epollWaitTimeboxed() throws IOException {

@Override
protected void run() {
long timerFdDeadline = Long.MAX_VALUE;
for (;;) {
try {
processPendingChannelFlags();
Expand Down Expand Up @@ -348,10 +409,14 @@ protected void run() {
}
// fall-through
}

boolean deadlineFieldReset = false;
wakenUp.set(0);
try {
if (!hasTasks()) {
// When we are in the EventLoop we don't bother setting the timerFd for each
// scheduled task, but instead defer the processing until the end of the EventLoop
// (next wait) to reduce the timerFd modifications.
deadlineFieldReset = checkScheduleTaskQueueForNewDelay(timerFdDeadline);
if (deadlineFieldReset && !hasTasks()) {
strategy = epollWait();
}
} finally {
Expand All @@ -360,33 +425,28 @@ protected void run() {
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
pendingWakeup = true;
}
if (deadlineFieldReset) {
// This getAndAdd will change the raw value of nextDeadlineNanos to be negative
// which will block any *new* timerFd mods by other threads while also "preserving"
// its last value to avoid disrupting a possibly-concurrent setTimerFd call
// (so that we can know the timerFd really did/will get updated to the read value).
timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1);
// The value of nextDeadlineNanos is now guaranteed to be negative
}
}
// fallthrough
default:
}

final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
if (strategy > 0) {
processReady(events, strategy);
}
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();

try {
if (strategy > 0) {
processReady(events, strategy);
}
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
try {
if (processReady(events, strategy)) {
// Polled events include timerFd expiry; conservatively assume that no timer is set
timerFdDeadline = Long.MAX_VALUE;
}
} finally {
runAllTasks();
// No need to drainScheduledQueue() after the fact, because all in event loop scheduling results
// in direct addition to the scheduled priority queue.
}
if (allowGrowing && strategy == events.length()) {
//increase the size of the array as we needed the whole space for the events
Expand Down Expand Up @@ -440,15 +500,15 @@ private void closeAll() {
}
}

private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
// Returns true if a timerFd event was encountered
private boolean processReady(EpollEventArray events, int ready) {
boolean timerFired = false;
for (int i = 0; i < ready; ++i) {
final int fd = events.fd(i);
if (fd == eventFd.intValue()) {
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
timerFired = true;
} else {
final long ev = events.events(i);

Expand Down Expand Up @@ -502,6 +562,7 @@ private void processReady(EpollEventArray events, int ready) {
}
}
}
return timerFired;
}

@Override
Expand Down

0 comments on commit 125dc27

Please sign in to comment.