Skip to content

Commit

Permalink
Merge pull request netty#12 from normanmaurer/poll_remove_fix
Browse files Browse the repository at this point in the history
Correctly build up entry for POLL_REMOVE so we find the right operation
  • Loading branch information
1Jo1 committed Aug 31, 2020
2 parents e1a582d + 8e5b540 commit 57884e2
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 58 deletions.
Expand Up @@ -54,10 +54,10 @@ final class IOUringEventLoop extends SingleThreadEventLoop implements
// other value T when EL is waiting with wakeup scheduled at time T
private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
private final FileDescriptor eventfd;
private final IovecArrayPool iovecArrayPool;

private long prevDeadlineNanos = NONE;
private boolean pendingWakeup;
private IovecArrayPool iovecArrayPool;

IOUringEventLoop(final EventLoopGroup parent, final Executor executor, final boolean addTaskWakesUp) {
super(parent, executor, addTaskWakesUp);
Expand Down Expand Up @@ -203,9 +203,8 @@ public boolean handle(int fd, int res, long flags, int op, int pollMask) {
break;

case IOUring.IO_POLL:
//Todo error handle the res
if (res == ECANCELED) {
logger.trace("POLL_LINK canceled");
logger.trace("IO_POLL cancelled");
break;
}
if (eventfd.intValue() == fd) {
Expand All @@ -230,9 +229,9 @@ public boolean handle(int fd, int res, long flags, int op, int pollMask) {

case IOUring.OP_POLL_REMOVE:
if (res == ENOENT) {
System.out.println(("POLL_REMOVE OPERATION not permitted"));
logger.trace("POLL_REMOVE not successful");
} else if (res == 0) {
System.out.println(("POLL_REMOVE OPERATION successful"));
logger.trace("POLL_REMOVE successful");
}
break;

Expand Down
Expand Up @@ -122,22 +122,25 @@ private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress
//user_data should be same as POLL_LINK fd
if (op == IOUring.OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long uData = convertToUserData(op, fd, pollMask);
long uData = convertToUserData((byte) IOUring.IO_POLL, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, convertToUserData(op, fd, 0));
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);
//c union set Rw-Flags or accept_flags
if (op != IOUring.OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
}
}

long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_USER_DATA_FIELD, uData);

PlatformDependent.putByte(sqe + SQE_FLAGS_FIELD, (byte) 0);

//c union set Rw-Flags or accept_flags
if (op != IOUring.OP_ACCEPT) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, 0);
} else {
//accept_flags set NON_BLOCKING
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, SOCK_NONBLOCK | SOCK_CLOEXEC);
}


// pad field array -> all fields should be zero
long offsetIndex = 0;
Expand All @@ -146,10 +149,6 @@ private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress
offsetIndex += 8;
}

if (pollMask != 0) {
PlatformDependent.putInt(sqe + SQE_RW_FLAGS_FIELD, pollMask);
}

logger.trace("UserDataField: {}", PlatformDependent.getLong(sqe + SQE_USER_DATA_FIELD));
logger.trace("BufferAddress: {}", PlatformDependent.getLong(sqe + SQE_ADDRESS_FIELD));
logger.trace("Length: {}", PlatformDependent.getInt(sqe + SQE_LEN_FIELD));
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.junit.Test;

import java.nio.charset.Charset;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
Expand Down Expand Up @@ -187,17 +188,13 @@ public void eventfdNoSignal() throws Exception {
@Override
public void run() {
assertTrue(completionQueue.ioUringWaitCqe());
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(1, res);
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
}
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(1, res);
return true;
}
}));
}
};
waitingCqe.start();
Expand Down Expand Up @@ -232,47 +229,46 @@ public void ioUringPollRemoveTest() throws Exception {
FileDescriptor eventFd = Native.newEventFd();
submissionQueue.addPollIn(eventFd.intValue());
submissionQueue.submit();

Thread.sleep(10);

submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
submissionQueue.submit();

final AtomicReference<AssertionError> errorRef = new AtomicReference<AssertionError>();
Thread waitingCqe = new Thread() {
private final IOUringCompletionQueue.IOUringCompletionQueueCallback verifyCallback =
new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
if (op == IOUring.IO_POLL) {
assertEquals(IOUringEventLoop.ECANCELED, res);
} else if (op == IOUring.OP_POLL_REMOVE) {
assertEquals(0, res);
} else {
fail("op " + op);
}
return false;
}
};

@Override
public void run() {
assertTrue(completionQueue.ioUringWaitCqe());
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(IOUringEventLoop.ECANCELED, res);
assertEquals(IOUring.IO_POLL, op);
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
}
try {
assertEquals(1, completionQueue.process(new IOUringCompletionQueue.IOUringCompletionQueueCallback() {
@Override
public boolean handle(int fd, int res, long flags, int op, int mask) {
assertEquals(0, res);
assertEquals(IOUring.OP_POLL_REMOVE, op);
return true;
}
}));
} catch (Exception e) {
e.printStackTrace();
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(verifyCallback));
assertTrue(completionQueue.ioUringWaitCqe());
assertEquals(1, completionQueue.process(verifyCallback));
} catch (AssertionError error) {
errorRef.set(error);
}
}
};
waitingCqe.start();

waitingCqe.join();
try {
eventFd.close();
AssertionError error = errorRef.get();
if (error != null) {
throw error;
}
} finally {
ringBuffer.close();
}
Expand Down

0 comments on commit 57884e2

Please sign in to comment.