diff --git a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java index 40c25ac6335..69b46dfbe8e 100644 --- a/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java +++ b/transport-native-epoll/src/main/java/io/netty/channel/epoll/EpollEventLoop.java @@ -36,7 +36,9 @@ import java.util.BitSet; import java.util.Queue; import java.util.concurrent.Executor; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static java.lang.Math.min; @@ -52,8 +54,17 @@ class EpollEventLoop extends SingleThreadEventLoop { Epoll.ensureAvailability(); } - // Pick a number that no task could have previously used. - private long prevDeadlineNanos = nanoTime() - 1; + /** + * When in epollWait(), this mirrors the currently-set deadline of the timerFd. A negative value + * means that the event loop is awake, which blocks rescheduling activity by other threads. + * It is restored to the real timerFd expiry time again prior to entering epollWait(). + * + * Note that we use deadline instead of delay because deadline is just a fixed number but delay requires interacting + * with the time source (e.g. calling System.nanoTime()) which can be expensive. + */ + private final AtomicLong nextDeadlineNanos = new AtomicLong(-1L); + private final AtomicInteger wakenUp = new AtomicInteger(1); + private boolean pendingWakeup; private final FileDescriptor epollFd; private final FileDescriptor eventFd; private final FileDescriptor timerFd; @@ -74,12 +85,9 @@ public int get() throws Exception { return epollWaitNow(); } }; - private final AtomicInteger wakenUp = new AtomicInteger(1); - private boolean pendingWakeup; - private volatile int ioRatio = 50; // See http://man7.org/linux/man-pages/man2/timerfd_create.2.html. 
private static final long MAX_SCHEDULED_TIMERFD_NS = 999999999; EpollEventLoop(EventLoopGroup parent, Executor executor, int maxEvents, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, @@ -176,6 +184,92 @@ NativeDatagramPacketArray cleanDatagramPacketArray() { return datagramPacketArray; } + @Override + protected boolean beforeScheduledTaskSubmitted(long deadlineNanos) { + return false; // don't wake event loop + } + + @Override + protected boolean afterScheduledTaskSubmitted(long deadlineNanos) { + try { + trySetTimerFd(deadlineNanos); + } catch (IOException e) { + throw new RejectedExecutionException(e); + } + return false; // don't wake event loop + } + + @Override + protected boolean runAllTasks() { + // This method is overridden to ensure that all the expired scheduled tasks are executed during shutdown, and + // any other execute all scenarios in the base class. + return runScheduledAndExecutorTasks(4); + } + + // Called only from *outside* the event loop + private void trySetTimerFd(long candidateNextDeadline) throws IOException { + for (;;) { + long nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline <= candidateNextDeadline) { + // This includes case where nextDeadline is negative (event loop is awake) + return; + } + if (nextDeadlineNanos.compareAndSet(nextDeadline, candidateNextDeadline)) { + // We must serialize calls to setTimerFd to avoid the set of a later deadline + // racing with a sooner one and overwriting it. 
A second check of nextDeadlineNanos + // is made within the sync block to avoid having the CAS within the sync + synchronized (nextDeadlineNanos) { + nextDeadline = nextDeadlineNanos.get(); + if (nextDeadline == candidateNextDeadline || + (nextDeadline + Long.MAX_VALUE + 1) == candidateNextDeadline) { + setTimerFd(deadlineToDelayNanos(candidateNextDeadline)); + } + } + return; + } + } + } + + // Called only within synchronized(nextDeadlineNanos) + private void setTimerFd(long candidateNextDelayNanos) throws IOException { + if (candidateNextDelayNanos > 0) { + final int delaySeconds = (int) min(candidateNextDelayNanos / 1000000000L, Integer.MAX_VALUE); + final int delayNanos = (int) min(candidateNextDelayNanos - delaySeconds * 1000000000L, + MAX_SCHEDULED_TIMERFD_NS); + Native.timerFdSetTime(timerFd.intValue(), delaySeconds, delayNanos); + } else { + // Setting the timer to 0, 0 will disarm it, so we have a few options: + // 1. Set the timer wakeup to 1ns (1 system call). + // 2. Use the eventFd to force a wakeup and disarm the timer (2 system calls). + // For now we are using option (1) because there are fewer system calls, and we will correctly reset the + // nextDeadlineNanos state when the EventLoop processes the timer wakeup. 
+ Native.timerFdSetTime(timerFd.intValue(), 0, 1); + } + } + + // Called only from the event loop + private boolean checkScheduleTaskQueueForNewDelay(long timerFdDeadline) throws IOException { + assert nextDeadlineNanos.get() < 0; + final long nextTaskDeadlineNanos = nextScheduledTaskDeadlineNanos(); + if (nextTaskDeadlineNanos == -1L || nextTaskDeadlineNanos >= timerFdDeadline) { + // Just restore to preexisting timerFd value, update not needed + nextDeadlineNanos.set(timerFdDeadline); + } else { + synchronized (nextDeadlineNanos) { + long delayNanos = deadlineToDelayNanos(nextTaskDeadlineNanos); + if (delayNanos <= 0L) { + // Abort since the deadline already passed (don't enter epollWait + // and just continue the EL to pick up the now-expired task) + return false; + } + // Shorter delay required than current timerFd setting, update it + setTimerFd(delayNanos); + nextDeadlineNanos.set(nextTaskDeadlineNanos); + } + } + return true; + } + @Override protected void wakeup(boolean inEventLoop) { if (!inEventLoop && wakenUp.getAndSet(1) == 0) { @@ -265,43 +359,13 @@ private static Queue newTaskQueue0(int maxPendingTasks) { : PlatformDependent.newMpscQueue(maxPendingTasks); } - /** - * Returns the percentage of the desired amount of time spent for I/O in the event loop. - */ - public int getIoRatio() { - return ioRatio; - } - - /** - * Sets the percentage of the desired amount of time spent for I/O in the event loop. The default value is - * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks. 
- */ - public void setIoRatio(int ioRatio) { - if (ioRatio <= 0 || ioRatio > 100) { - throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); - } - this.ioRatio = ioRatio; - } - @Override public int registeredChannels() { return channels.size(); } private int epollWait() throws IOException { - int delaySeconds; - int delayNanos; - long curDeadlineNanos = deadlineNanos(); - if (curDeadlineNanos == prevDeadlineNanos) { - delaySeconds = -1; - delayNanos = -1; - } else { - long totalDelay = delayNanos(System.nanoTime()); - prevDeadlineNanos = curDeadlineNanos; - delaySeconds = (int) min(totalDelay / 1000000000L, Integer.MAX_VALUE); - delayNanos = (int) min(totalDelay - delaySeconds * 1000000000L, MAX_SCHEDULED_TIMERFD_NS); - } - return Native.epollWait(epollFd, events, timerFd, delaySeconds, delayNanos); + return Native.epollWait(epollFd, events, false); } private int epollWaitNow() throws IOException { @@ -319,6 +383,7 @@ private int epollWaitTimeboxed() throws IOException { @Override protected void run() { + long timerFdDeadline = Long.MAX_VALUE; for (;;) { try { processPendingChannelFlags(); @@ -348,10 +413,14 @@ protected void run() { } // fall-through } - + boolean deadlineFieldReset = false; wakenUp.set(0); try { - if (!hasTasks()) { + // When we are in the EventLoop we don't bother setting the timerFd for each + // scheduled task, but instead defer the processing until the end of the EventLoop + // (next wait) to reduce the timerFd modifications. 
+ deadlineFieldReset = checkScheduleTaskQueueForNewDelay(timerFdDeadline); + if (deadlineFieldReset && !hasTasks()) { strategy = epollWait(); } } finally { @@ -360,33 +429,28 @@ protected void run() { if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) { pendingWakeup = true; } + if (deadlineFieldReset) { + // This getAndAdd will change the raw value of nextDeadlineNanos to be negative + // which will block any *new* timerFd mods by other threads while also "preserving" + // its last value to avoid disrupting a possibly-concurrent setTimerFd call + // (so that we can know the timerFd really did/will get updated to the read value). + timerFdDeadline = nextDeadlineNanos.getAndAdd(Long.MAX_VALUE + 1); + // The value of nextDeadlineNanos is now guaranteed to be negative + } } // fallthrough default: } - final int ioRatio = this.ioRatio; - if (ioRatio == 100) { - try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. - runAllTasks(); - } - } else { - final long ioStartTime = System.nanoTime(); - - try { - if (strategy > 0) { - processReady(events, strategy); - } - } finally { - // Ensure we always run tasks. - final long ioTime = System.nanoTime() - ioStartTime; - runAllTasks(ioTime * (100 - ioRatio) / ioRatio); + try { + if (processReady(events, strategy)) { + // Polled events include timerFd expiry; conservatively assume that no timer is set + timerFdDeadline = Long.MAX_VALUE; } + } finally { + runAllTasks(); + // No need to drainScheduledQueue() after the fact, because all in event loop scheduling results + // in direct addition to the scheduled priority queue. 
} if (allowGrowing && strategy == events.length()) { //increase the size of the array as we needed the whole space for the events @@ -440,15 +504,15 @@ private void closeAll() { } } - private void processReady(EpollEventArray events, int ready) { - for (int i = 0; i < ready; i ++) { + // Returns true if a timerFd event was encountered + private boolean processReady(EpollEventArray events, int ready) { + boolean timerFired = false; + for (int i = 0; i < ready; ++i) { final int fd = events.fd(i); if (fd == eventFd.intValue()) { pendingWakeup = false; } else if (fd == timerFd.intValue()) { - // Just ignore as we use ET mode for the eventfd and timerfd. - // - // See also https://stackoverflow.com/a/12492308/1074097 + timerFired = true; } else { final long ev = events.events(i); @@ -502,6 +566,7 @@ private void processReady(EpollEventArray events, int ready) { } } } + return timerFired; } @Override