Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close eventfd shutdown/wakeup race by closely tracking epoll edges #9586

Merged
merged 3 commits into from Sep 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
57 changes: 16 additions & 41 deletions transport-native-epoll/src/main/c/netty_epoll_native.c
Expand Up @@ -197,9 +197,8 @@ static void netty_epoll_native_timerFdSetTime(JNIEnv* env, jclass clazz, jint ti
}
}

static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jboolean immediatePoll) {
static jint netty_epoll_native_epollWait(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timeout) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
const int timeout = immediatePoll ? 0 : -1;
int result, err;

do {
Expand All @@ -213,47 +212,23 @@ static jint netty_epoll_native_epollWaitNoTimeout(JNIEnv* env, jclass clazz, jin

// This method is deprecated!
static jint netty_epoll_native_epollWait0(JNIEnv* env, jclass clazz, jint efd, jlong address, jint len, jint timerFd, jint tvSec, jint tvNsec) {
struct epoll_event *ev = (struct epoll_event*) (intptr_t) address;
int result, err;

if (tvSec == 0 && tvNsec == 0) {
// Zeros = poll (aka return immediately).
do {
result = epoll_wait(efd, ev, len, 0);
if (result >= 0) {
return result;
}
} while((err = errno) == EINTR);
} else {
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
}
do {
result = epoll_wait(efd, ev, len, -1);
if (result > 0) {
// Detect timeout, and preserve the epoll_wait API.
if (result == 1 && ev[0].data.fd == timerFd) {
// We assume that timerFD is in ET mode. So we must consume this event to ensure we are notified
// of future timer events because ET mode only notifies a single time until the event is consumed.
uint64_t timerFireCount;
// We don't care what the result is. We just want to consume the wakeup event and reset ET.
result = read(timerFd, &timerFireCount, sizeof(uint64_t));
return 0;
}
return result;
}
} while((err = errno) == EINTR);
return netty_epoll_native_epollWait(env, clazz, efd, address, len, 0);
}
return -err;
// only reschedule the timer if there is a newer event.
// -1 is a special value used by EpollEventLoop.
if (tvSec != ((jint) -1) && tvNsec != ((jint) -1)) {
struct itimerspec ts;
memset(&ts.it_interval, 0, sizeof(struct timespec));
ts.it_value.tv_sec = tvSec;
ts.it_value.tv_nsec = tvNsec;
if (timerfd_settime(timerFd, 0, &ts, NULL) < 0) {
netty_unix_errors_throwChannelExceptionErrorNo(env, "timerfd_settime() failed: ", errno);
return -1;
}
}
return netty_epoll_native_epollWait(env, clazz, efd, address, len, -1);
}

static inline void cpu_relax() {
Expand Down Expand Up @@ -524,7 +499,7 @@ static const JNINativeMethod fixed_method_table[] = {
{ "timerFdSetTime", "(III)V", (void *) netty_epoll_native_timerFdSetTime },
{ "epollCreate", "()I", (void *) netty_epoll_native_epollCreate },
{ "epollWait0", "(IJIIII)I", (void *) netty_epoll_native_epollWait0 }, // This method is deprecated!
{ "epollWaitNoTimeout", "(IJIZ)I", (void *) netty_epoll_native_epollWaitNoTimeout },
{ "epollWait", "(IJII)I", (void *) netty_epoll_native_epollWait },
{ "epollBusyWait0", "(IJI)I", (void *) netty_epoll_native_epollBusyWait0 },
{ "epollCtlAdd0", "(III)I", (void *) netty_epoll_native_epollCtlAdd0 },
{ "epollCtlMod0", "(III)I", (void *) netty_epoll_native_epollCtlMod0 },
Expand Down
Expand Up @@ -36,7 +36,7 @@
import java.util.BitSet;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.Math.min;

Expand All @@ -45,8 +45,6 @@
*/
class EpollEventLoop extends SingleThreadEventLoop {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(EpollEventLoop.class);
private static final AtomicIntegerFieldUpdater<EpollEventLoop> WAKEN_UP_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(EpollEventLoop.class, "wakenUp");

static {
// Ensure JNI is initialized by the time this class is loaded by this time!
Expand Down Expand Up @@ -76,8 +74,8 @@ public int get() throws Exception {
return epollWaitNow();
}
};
@SuppressWarnings("unused") // AtomicIntegerFieldUpdater
private volatile int wakenUp;
private final AtomicInteger wakenUp = new AtomicInteger(1);
private boolean pendingWakeup;
private volatile int ioRatio = 50;

// See http://man7.org/linux/man-pages/man2/timerfd_create.2.html.
Expand Down Expand Up @@ -180,7 +178,7 @@ NativeDatagramPacketArray cleanDatagramPacketArray() {

@Override
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && WAKEN_UP_UPDATER.getAndSet(this, 1) == 0) {
if (!inEventLoop && wakenUp.getAndSet(1) == 0) {
// write to the evfd which will then wake-up epoll_wait(...)
Native.eventFdWrite(eventFd.intValue(), 1L);
}
Expand Down Expand Up @@ -307,13 +305,18 @@ private int epollWait() throws IOException {
}

private int epollWaitNow() throws IOException {
return Native.epollWait(epollFd, events, timerFd, 0, 0);
return Native.epollWait(epollFd, events, true);
}

private int epollBusyWait() throws IOException {
return Native.epollBusyWait(epollFd, events);
}

private int epollWaitTimeboxed() throws IOException {
// Wait with 1 second "safeguard" timeout
return Native.epollWait(epollFd, events, 1000);
}

@Override
protected void run() {
for (;;) {
Expand All @@ -329,11 +332,34 @@ protected void run() {
break;

case SelectStrategy.SELECT:
if (wakenUp == 1) {
wakenUp = 0;
if (pendingWakeup) {
// We are going to be immediately woken so no need to reset wakenUp
// or check for timerfd adjustment.
strategy = epollWaitTimeboxed();
if (strategy != 0) {
break;
}
// We timed out so assume that we missed the write event due to an
// abnormally failed syscall (the write itself or a prior epoll_wait)
logger.warn("Missed eventfd write (not seen after > 1 second)");
pendingWakeup = false;
if (hasTasks()) {
break;
}
// fall-through
}
if (!hasTasks()) {
strategy = epollWait();

wakenUp.set(0);
try {
if (!hasTasks()) {
strategy = epollWait();
}
} finally {
// Try get() first to avoid much more expensive CAS in the case we
// were woken via the wakeup() method (submitted task)
if (wakenUp.get() == 1 || wakenUp.getAndSet(1) == 1) {
pendingWakeup = true;
}
}
// fallthrough
default:
Expand Down Expand Up @@ -417,7 +443,9 @@ private void closeAll() {
private void processReady(EpollEventArray events, int ready) {
for (int i = 0; i < ready; i ++) {
final int fd = events.fd(i);
if (fd == eventFd.intValue() || fd == timerFd.intValue()) {
if (fd == eventFd.intValue()) {
pendingWakeup = false;
} else if (fd == timerFd.intValue()) {
// Just ignore as we use ET mode for the eventfd and timerfd.
//
// See also https://stackoverflow.com/a/12492308/1074097
Expand Down Expand Up @@ -479,10 +507,23 @@ private void processReady(EpollEventArray events, int ready) {
@Override
protected void cleanup() {
try {
try {
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
// Ensure any in-flight wakeup writes have been performed prior to closing eventFd.
while (pendingWakeup) {
try {
int count = epollWaitTimeboxed();
if (count == 0) {
// We timed-out so assume that the write we're expecting isn't coming
break;
}
for (int i = 0; i < count; i++) {
if (events.fd(i) == eventFd.intValue()) {
pendingWakeup = false;
break;
}
}
} catch (IOException ignore) {
// ignore
}
}
try {
eventFd.close();
Expand All @@ -494,6 +535,12 @@ protected void cleanup() {
} catch (IOException e) {
logger.warn("Failed to close the timer fd.", e);
}

try {
epollFd.close();
} catch (IOException e) {
logger.warn("Failed to close the epoll fd.", e);
}
} finally {
// release native memory
if (iovArray != null) {
Expand Down
Expand Up @@ -107,7 +107,14 @@ public static int epollWait(FileDescriptor epollFd, EpollEventArray events, File
}

static int epollWait(FileDescriptor epollFd, EpollEventArray events, boolean immediatePoll) throws IOException {
int ready = epollWaitNoTimeout(epollFd.intValue(), events.memoryAddress(), events.length(), immediatePoll);
return epollWait(epollFd, events, immediatePoll ? 0 : -1);
}

/**
* This uses epoll's own timeout and does not reset/re-arm any timerfd
*/
static int epollWait(FileDescriptor epollFd, EpollEventArray events, int timeoutMillis) throws IOException {
int ready = epollWait(epollFd.intValue(), events.memoryAddress(), events.length(), timeoutMillis);
if (ready < 0) {
throw newIOException("epoll_wait", ready);
}
Expand All @@ -128,7 +135,7 @@ public static int epollBusyWait(FileDescriptor epollFd, EpollEventArray events)
}

private static native int epollWait0(int efd, long address, int len, int timerFd, int timeoutSec, int timeoutNs);
private static native int epollWaitNoTimeout(int efd, long address, int len, boolean immediatePoll);
private static native int epollWait(int efd, long address, int len, int timeout);
private static native int epollBusyWait0(int efd, long address, int len);

public static void epollCtlAdd(int efd, final int fd, final int flags) throws IOException {
Expand Down