From 8e5b5400e610c4072f72d75a6fa911add1493a8e Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Mon, 31 Aug 2020 21:23:45 +0200 Subject: [PATCH] Correctly build up entry for POLL_REMOVE so we find the right operation Motivation: We did not correctly compute all fields when POLL_REMOVE entry was calculate. Which could lead to not finding the right operation. Modifications: - Correctly fill all fields - Fix unit tests Result: Remove IO_POLL operations work again as expected --- .../netty/channel/uring/IOUringEventLoop.java | 9 ++- .../channel/uring/IOUringSubmissionQueue.java | 29 ++++---- .../io/netty/channel/uring/NativeTest.java | 72 +++++++++---------- 3 files changed, 52 insertions(+), 58 deletions(-) diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java index 211aa783d59..028636b9445 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringEventLoop.java @@ -54,10 +54,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements // other value T when EL is waiting with wakeup scheduled at time T private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE); private final FileDescriptor eventfd; + private final IovecArrayPool iovecArrayPool; private long prevDeadlineNanos = NONE; private boolean pendingWakeup; - private IovecArrayPool iovecArrayPool; IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) { super(parent, executor, addTaskWakesUp); @@ -203,9 +203,8 @@ public boolean handle(int fd, int res, long flags, int op, int pollMask) { break; case IOUring.IO_POLL: - //Todo error handle the res if (res == ECANCELED) { - logger.trace("POLL_LINK canceled"); + logger.trace("IO_POLL cancelled"); break; } if (eventfd.intValue() == fd) { @@ -230,9 +229,9 @@ public boolean handle(int fd, int res, long flags, int op, int pollMask) { case IOUring.OP_POLL_REMOVE: if (res == ENOENT) { - System.out.println(("POLL_REMOVE OPERATION not permitted")); + logger.trace("POLL_REMOVE not successful"); } else if (res == 0) { - System.out.println(("POLL_REMOVE OPERATION successful")); + logger.trace("POLL_REMOVE successful"); } break; diff --git a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java index 0cd350dae02..ef28d475efe 100644 --- a/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java +++ b/transport-native-io_uring/src/main/java/io/netty/channel/uring/IOUringSubmissionQueue.java @@ -122,22 +122,25 @@ private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress //user_data should be same as POLL_LINK fd if (op == IOUring.OP_POLL_REMOVE) { PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1); - long uData = convertToUserData(op, fd, pollMask); + long uData = convertToUserData((byte) IOUring.IO_POLL, fd, pollMask); PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData); + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0)); + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); + } else { + long uData = convertToUserData(op, fd, pollMask); + PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); + //c union set Rw-Flags or accept_flags + if (op != IOUring.OP_ACCEPT) { + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); + } else { + //accept_flags set NON_BLOCKING + PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC); + } } - long uData = convertToUserData(op, fd, pollMask); - PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData); - PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0); - //c union set Rw-Flags or accept_flags - if (op != IOUring.OP_ACCEPT) { - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0); - } else { - //accept_flags set NON_BLOCKING - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC); - } + // pad field array -> all fields should be zero long offsetIndex = 0; @@ -146,10 +149,6 @@ private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress offsetIndex += 8; } - if (pollMask != 0) { - PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask); - } - logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD)); logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD)); logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD)); diff --git a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java index c5ad55676e2..1fa8f9cb058 100644 --- a/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java +++ b/transport-native-io_uring/src/test/java/io/netty/channel/uring/NativeTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import java.nio.charset.Charset; +import java.util.concurrent.atomic.AtomicReference; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.UnpooledByteBufAllocator; @@ -187,17 +188,13 @@ public void eventfdNoSignal() throws Exception { @Override public void run() { assertTrue(completionQueue.ioUringWaitCqe()); - try { - assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { - @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { - assertEquals(1, res); - return true; - } - })); - } catch (Exception e) { - e.printStackTrace(); - } + assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + assertEquals(1, res); + return true; + } + })); } }; waitingCqe.start(); @@ -232,47 +229,46 @@ public void ioUringPollRemoveTest() throws Exception { FileDescriptor eventFd = Native.newEventFd(); submissionQueue.addPollIn(eventFd.intValue()); submissionQueue.submit(); - - Thread.sleep(10); - submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN); submissionQueue.submit(); + final AtomicReference errorRef = new AtomicReference(); Thread waitingCqe = new Thread() { + private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback = + new IOUringCompletionQueue.IOUringCompletionQueueCallback() { + @Override + public boolean handle(int fd, int res, long flags, int op, int mask) { + if (op == IOUring.IO_POLL) { + assertEquals(IOUringEventLoop.ECANCELED, res); + } else if (op == IOUring.OP_POLL_REMOVE) { + assertEquals(0, res); + } else { + fail("op " + op); + } + return false; + } + }; + @Override public void run() { - assertTrue(completionQueue.ioUringWaitCqe()); try { - assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { - @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { - assertEquals(IOUringEventLoop.ECANCELED, res); - assertEquals(IOUring.IO_POLL, op); - return true; - } - })); - } catch (Exception e) { - e.printStackTrace(); - } - try { - assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() { - @Override - public boolean handle(int fd, int res, long flags, int op, int mask) { - assertEquals(0, res); - assertEquals(IOUring.OP_POLL_REMOVE, op); - return true; - } - })); - } catch (Exception e) { - e.printStackTrace(); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(verifyCallback)); + assertTrue(completionQueue.ioUringWaitCqe()); + assertEquals(1, completionQueue.process(verifyCallback)); + } catch (AssertionError error) { + errorRef.set(error); } } }; waitingCqe.start(); - waitingCqe.join(); try { eventFd.close(); + AssertionError error = errorRef.get(); + if (error != null) { + throw error; + } } finally { ringBuffer.close(); }