From 125dc27aaa7e3f46884cab2cc671ebdf20505498 Mon Sep 17 00:00:00 2001 From: nickhill Date: Fri, 20 Sep 2019 12:11:46 -0700 Subject: [PATCH] Reinstate decoupled timerfd logic in EpollEventLoop Motivation This reinstates the parts of 1fa7a5e697825bdd2f5ad7885b64749ede5c3192 and a22d4ba859b115d353b4cea1af581b987249adf6 that were reverted in 7f391426a2179d3c68e3a116567aa9a0aaf0524c (applies to EpollEventLoop class only), with some minor changes. It sounds like this still exhibits some testsuite failures even with the lazySet fixes, but I thought it would be good to have a PR with the "latest" version to continue to test/debug. Modification Changes are the same as before apart from a few added comments and the following in EpollEventLoop#checkScheduleTaskQueueForNewDelay: - Use set instead of lazySet to avoid race condition - Avoid arming timer and entering epoll wait if a task has already expired - Switch order of calling setTimerFd and updating the nextDeadlineNanos AtomicLong within the sync block The latter two are perf refinements and should not have contributed to the issues seen. 
Result Performance benefits from having timerfd updates decoupled from the event loop, as originally introduced by @Scottmitch --- .../netty/channel/epoll/EpollEventLoop.java | 195 ++++++++++++------ 1 file changed, 128 insertions(+), 67 deletions(-) diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 40c25ac6335..69b46dfbe8e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -36,7 +36,9 @@ import java.util.BitSet; import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static java.lang.Math.min; @@ -52,8 +54,17 @@ class EpollEventLoop extends SingleThreadEventLoop { Epoll.ensureAvailability(); } - // Pick a number that no task could have previously used. - private long prevDeadlineNanos = nanoTime() - 1; + /** + * When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value + * means that the event loop is awake, which blocks rescheduling activity by other threads. + * It is restored to the real timerFd expiry time again prior to entering epollWait(). + * + * Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting + * with the time source (e.g. calling System.nanoTime()) which can be expensive. 
+ */ + private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L); + private final AtomicInteger wakenUp = new AtomicInteger(1); + private boolean pendingWakeup; private final FileDescriptor epollFd; private final FileDescriptor eventFd; private final FileDescriptor timerFd; @@ -74,12 +85,6 @@ public int get() throws Exception { return epollWaitNow(); } }; - private final AtomicInteger wakenUp = new AtomicInteger(1); - private boolean pendingWakeup; - private volatile int ioRatio = 50; - - // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html. - private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999; EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, @@ -176,6 +181,91 @@ NativeDatagramPacketArray cleanDatagramPacketArray() { return datagramPacketArray; } + @Override + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return false; // don't wake event loop + } + + @Override + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + try { + trySetTimerFd(deadlineNanos); + } catch (IOException e) { + throw new RejectedExecutionException(e); + } + return false; // don't wake event loop + } + + @Override + protected boolean runAllTasks() { + // This method is overridden to ensure that all the expired scheduled tasks are executed during shutdown, and + // any other execute all scenarios in the base class. 
+ return runScheduledAndExecutorTasks(4); + } + + // Called only from *outside* the event loop + private void trySetTimerFd(long candidateNextDeadline) throws IOException { + for (;;) { + long nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline <= candidateNextDeadline) { + // This includes case where nextDeadline is negative (event loop is awake) + return; + } + if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) { + // We must serialize calls to setTimerFd to avoid the set of a later deadline + // racing with a sooner one and overwriting it. A second check of nextDeadlineNanos + // is made within the sync block to avoid having the CAS within the sync + synchronized (nextDeadlineNanos) { + nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline == candidateNextDeadline || + (nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) { + setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); + } + } + return; + } + } + } + + // Called only within synchronized(nextDeadlineNanos) + private void setTimerFd(long candidateNextDelayNanos) throws IOException { + if (candidateNextDelayNanos > 0) { + final int delaySeconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE); + final int delayNanos = (int) min(candidateNextDelayNanos - delaySeconds * 1000000000L, Integer.MAX_VALUE); + Native.timerFdSetTime(timerFd.intValue(), delaySeconds, delayNanos); + } else { + // Setting the timer to 0, 0 will disarm it, so we have a few options: + // 1. Set the timer wakeup to 1ns (1 system call). + // 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls). + // For now we are using option (1) because there are less system calls, and we will correctly reset the + // nextDeadlineNanos state when the EventLoop processes the timer wakeup. 
+ Native.timerFdSetTime(timerFd.intValue(), 0, 1); + } + } + + // Called only from the event loop + private boolean checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException { + assert nextDeadlineNanos.get() < 0; + final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos(); + if (nextTaskDeadlineNanos == -1L || nextTaskDeadlineNanos >= timerFdDeadline) { + // Just restore to preexisting timerFd value, update not needed + nextDeadlineNanos.set(timerFdDeadline); + } else { + synchronized (nextDeadlineNanos) { + long delayNanos = deadlineToDelayNanos(nextTaskDeadlineNanos); + if (delayNanos <= 0L) { + // Abort since the deadline already passed (don't enter epollWait + // and just continue the EL to pick up the now-expired task) + return false; + } + // Shorter delay required than current timerFd setting, update it + setTimerFd(delayNanos); + nextDeadlineNanos.set(nextTaskDeadlineNanos); + } + } + return true; + } + @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.getAndSet(1) == 0) { @@ -265,43 +355,13 @@ private static Queue newTaskQueue0(int maxPendingTasks) { : PlatformDependent.newMpscQueue(maxPendingTasks); } - /** - * Returns the percentage of the desired amount of time spent for I/O in the event loop. - */ - public int getIoRatio() { - return ioRatio; - } - - /** - * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is - * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. 
- */ - public void setIoRatio(int ioRatio) { - if (ioRatio <= 0 || ioRatio > 100) { - throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); - } - this.ioRatio = ioRatio; - } - @Override public int registeredChannels() { return channels.size(); } private int epollWait() throws IOException { - int delaySeconds; - int delayNanos; - long curDeadlineNanos = deadlineNanos(); - if (curDeadlineNanos == prevDeadlineNanos) { - delaySeconds = -1; - delayNanos = -1; - } else { - long totalDelay = delayNanos(System.nanoTime()); - prevDeadlineNanos = curDeadlineNanos; - delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE); - delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS); - } - return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos); + return Native.epollWait(epollFd, events, false); } private int epollWaitNow() throws IOException { @@ -319,6 +379,7 @@ private int epollWaitTimeboxed() throws IOException { @Override protected void run() { + long timerFdDeadline = Long.MAX_VALUE; for (;;) { try { processPendingChannelFlags(); @@ -348,10 +409,14 @@ protected void run() { } // fall-through } - + boolean deadlineFieldReset = false; wakenUp.set(0); try { - if (!hasTasks()) { + // When we are in the EventLoop we don't bother setting the timerFd for each + // scheduled task, but instead defer the processing until the end of the EventLoop + // (next wait) to reduce the timerFd modifications. 
+ deadlineFieldReset = checkScheduleTaskQueueForNewDelay(timerFdDeadline); + if (deadlineFieldReset && !hasTasks()) { strategy = epollWait(); } } finally { @@ -360,33 +425,28 @@ protected void run() { if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) { pendingWakeup = true; } + if (deadlineFieldReset) { + // This getAndAdd will change the raw value of nextDeadlineNanos to be negative + // which will block any *new* timerFd mods by other threads while also "preserving" + // its last value to avoid disrupting a possibly-concurrent setTimerFd call + // (so that we can know the timerFd really did/will get updated to the read value). + timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1); + // The value of nextDeadlineNanos is now guaranteed to be negative + } } // fallthrough default: } - final int ioRatio = this.ioRatio; - if (ioRatio == 100) { - try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. - runAllTasks(); - } - } else { - final long ioStartTime = System.nanoTime(); - - try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. - final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + try { + if (processReady(events, strategy)) { + // Polled events include timerFd expiry; conservatively assume that no timer is set + timerFdDeadline = Long.MAX_VALUE; } + } finally { + runAllTasks(); + // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results + // in direct addition to the scheduled priority queue. 
} if (allowGrowing && strategy == events.length()) { //increase the size of the array as we needed the whole space for the events @@ -440,15 +500,15 @@ private void closeAll() { } } - private void processReady(EpollEventArray events, int ready) { - for (int i = 0; i < ready; i ++) { + // Returns true if a timerFd event was encountered + private boolean processReady(EpollEventArray events, int ready) { + boolean timerFired = false; + for (int i = 0; i < ready; ++i) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { pendingWakeup = false; } else if (fd == timerFd.intValue()) { - // Just ignore as we use ET mode for the eventfd and timerfd. - // - // See also https://stackoverflow.com/a/12492308/1074097 + timerFired = true; } else { final long ev = events.events(i); @@ -502,6 +562,7 @@ private void processReady(EpollEventArray events, int ready) { } } } + return timerFired; } @Override