Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close eventfd shutdown/wakeup race by closely tracking epoll edges #9586

Merged
merged 3 commits into from Sep 23, 2019
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -36,7 +36,7 @@
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Math.min;

Expand All @@ -45,8 +45,6 @@
*/
class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");

static {
// Ensure JNI is initialized by the time this class is loaded by this time!
Expand Down Expand Up @@ -76,8 +74,8 @@ public int get() throws Exception {
return epollWaitNow();
}
};
@SuppressWarnings("unused") // AtomicIntegerFieldUpdater
private volatile int wakenUp;
private final AtomicInteger wakenUp = new AtomicInteger(1);
private boolean pendingWakeup;
private volatile int ioRatio = 50;

// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
Expand Down Expand Up @@ -180,7 +178,7 @@ NativeDatagramPacketArray cleanDatagramPacketArray() {

@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
Expand Down Expand Up @@ -314,6 +312,11 @@ private int epollBusyWait() throws IOException {
return Native.epollBusyWait(epollFd, events);
}

private int epollWaitTimeboxed() throws IOException {
// Wait with 1 second "safeguard" timeout
return Native.epollWait(epollFd, events, timerFd, 1, 0);
}

@Override
protected void run() {
for (;;) {
Expand All @@ -329,11 +332,35 @@ protected void run() {
break;

case SelectStrategy.SELECT:
if (wakenUp == 1) {
wakenUp = 0;
if (pendingWakeup) {
// We are going to be immediately woken so no need to reset wakenUp
// or check for timerfd adjustment.
strategy = epollWaitTimeboxed();
if (strategy != 0) {
break;
}
// We timed out so assume that we missed the write event due to an
// abnormally failed syscall (the write itself or a prior epoll_wait)
pendingWakeup = false;
if (hasTasks()) {
break;
}
// fall-through
}
if (!hasTasks()) {
strategy = epollWait();

// Ordered store is sufficient here since the only access outside this
// thread is a getAndSet in the wakeup() method
wakenUp.lazySet(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was lazySet safe here based on what @njhill said the other day?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@johnou good question. To distil the problem, say we have fields

final AtomicInteger x = new AtomicInteger(1), y = new AtomicInteger(0);

Thread 1 program order:

x.lazySet(0);
if (y.get() == 0) { print("sleep"); }

Thread 2 program order:

y.compareAndSet(0, 1); // (always succeeds)
if (x.compareAndSet(0, 1)) { print("wake"); }

Is it possible for "sleep" to be printed without "wake" also being printed? I now think the answer is yes, even more so after reading this. Which means that lazySet isn't safe, but would be good to get confirmation from others... @franz1981?? :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've said right: it isn't safe and set would be a better choice for a total ordered operation (among the others) :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franz1981 please review the rest as well

try {
if (!hasTasks()) {
strategy = epollWait();
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
pendingWakeup = true;
}
}
// fallthrough
default:
Expand Down Expand Up @@ -417,7 +444,9 @@ private void closeAll() {
private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue() || fd == timerFd.intValue()) {
if (fd == eventFd.intValue()) {
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
Expand Down Expand Up @@ -479,10 +508,23 @@ private void processReady(EpollEventArray events, int ready) {
@Override
protected void cleanup() {
try {
try {
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
// Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
while (pendingWakeup) {
try {
int count = epollWaitTimeboxed();
if (count == 0) {
// We timed-out so assume that the write we're expecting isn't coming
break;
}
for (int i = 0; i < count; i++) {
if (events.fd(i) == eventFd.intValue()) {
pendingWakeup = false;
break;
}
}
} catch (IOException ignore) {
// ignore
}
}
try {
eventFd.close();
Expand All @@ -494,6 +536,12 @@ protected void cleanup() {
} catch (IOException e) {
logger.warn("Failed to close the timer fd.", e);
}

try {
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
}
} finally {
// release native memory
if (iovArray != null) {
Expand Down