Skip to content

Commit

Permalink
Merge pull request netty#11 from normanmaurer/auto_read
Browse files Browse the repository at this point in the history
Correctly stop reading when AUTO_READ is set to off and also ensure w…
  • Loading branch information
1Jo1 committed Aug 31, 2020
2 parents 74c0d3d + 28db67c commit e1a582d
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,14 @@ abstract class AbstractIOUringChannel extends AbstractChannel implements UnixCha
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
final LinuxSocket socket;
protected volatile boolean active;
private boolean pollInScheduled = false;

private boolean pollInPending = false;
private boolean pollOutPending = false;
private boolean writeScheduled = false;
private boolean readScheduled = false;
boolean inputClosedSeenErrorOnRead;

static final int SOCK_ADDR_LEN = 128;

//can only submit one write operation at a time
private boolean writeScheduled = false;
/**
* The future of the current connection attempt. If not null, subsequent connection attempts will fail.
*/
Expand Down Expand Up @@ -211,7 +211,15 @@ protected void doClose() throws Exception {
active = false;

IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollRemove(socket.intValue());
if (pollInPending) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_IN);
pollInPending = false;
}
if (pollOutPending) {
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_OUT);
pollOutPending = false;
}
submissionQueue.addPollRemove(socket.intValue(), IOUring.POLLMASK_RDHUP);
submissionQueue.submit();

// Even if we allow half closed sockets we should give up on reading. Otherwise we may allow a read attempt on a
Expand Down Expand Up @@ -258,15 +266,13 @@ public void run() {
@Override
protected void doBeginRead() {
final AbstractUringUnsafe unsafe = (AbstractUringUnsafe) unsafe();
if (!pollInScheduled) {
if (!pollInPending) {
unsafe.schedulePollIn();
}
}

@Override
protected void doWrite(ChannelOutboundBuffer in) {
logger.trace("IOUring doWrite message size: {}", in.size());

if (writeScheduled) {
return;
}
Expand All @@ -280,6 +286,7 @@ protected void doWrite(ChannelOutboundBuffer in) {
}

private void doWriteMultiple(ChannelOutboundBuffer in) {

final IovecArrayPool iovecArray = ((IOUringEventLoop) eventLoop()).getIovecArrayPool();

iovecMemoryAddress = iovecArray.createNewIovecMemoryAddress();
Expand Down Expand Up @@ -313,6 +320,8 @@ protected final void doWriteSingle(ByteBuf buf) {

//POLLOUT
private void addPollOut() {
assert !pollOutPending;
pollOutPending = true;
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollOut(socket.intValue());
submissionQueue.submit();
Expand All @@ -326,7 +335,7 @@ protected final void flush0() {
// Flush immediately only when there's no pending flush.
// If there's a pending flush operation, event loop will call forceFlush() later,
// and thus there's no need to call it now.
if (!writeScheduled) {
if (!pollOutPending) {
super.flush0();
}
}
Expand Down Expand Up @@ -418,18 +427,19 @@ private void fireEventAndClose(Object evt) {
}

void schedulePollIn() {
assert !pollInScheduled;
assert !pollInPending;
if (!isActive() || shouldBreakIoUringInReady(config())) {
return;
}
pollInScheduled = true;
pollInPending = true;
IOUringSubmissionQueue submissionQueue = submissionQueue();
submissionQueue.addPollIn(socket.intValue());
submissionQueue.submit();
}

final void readComplete(int res) {
pollInScheduled = false;
readScheduled = false;

readComplete0(res);
}

Expand All @@ -440,21 +450,39 @@ final void readComplete(int res) {
*/
final void pollRdHup(int res) {
if (isActive()) {
if (!pollInScheduled) {
// If it is still active, we need to call epollInReady as otherwise we may miss to
// read pending data from the underlying file descriptor.
// See https://github.com/netty/netty/issues/3709
pollIn(res);
if (!readScheduled) {
scheduleRead();
}
} else {
// Just to be safe make sure the input marked as closed.
shutdownInput(true);
}
}

abstract void pollIn(int res);
/**
* Called once POLLIN event is ready to be processed
*/
final void pollIn(int res) {
pollInPending = false;

if (!readScheduled) {
scheduleRead();
}
}

protected final void scheduleRead() {
readScheduled = true;
scheduleRead0();
}

protected abstract void scheduleRead0();

/**
* Called once POLLOUT event is ready to be processed
*/
final void pollOut(int res) {
pollOutPending = false;

// pending connect
if (connectPromise != null) {
// Note this method is invoked by the event loop only if the connection attempt was
Expand Down Expand Up @@ -484,12 +512,14 @@ final void pollOut(int res) {
}
}
} else if (!getSocket().isOutputShutdown()) {
doWrite(unsafe().outboundBuffer());
// Try writing again
super.flush0();
}
}

final void writeComplete(int res) {
writeScheduled = false;

ChannelOutboundBuffer channelOutboundBuffer = unsafe().outboundBuffer();
if (iovecMemoryAddress != -1) {
((IOUringEventLoop) eventLoop()).getIovecArrayPool().releaseIovec(iovecMemoryAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,14 @@ public AbstractIOUringChannel getChannel() {
abstract Channel newChildChannel(int fd) throws Exception;

final class UringServerChannelUnsafe extends AbstractIOUringChannel.AbstractUringUnsafe {
private final byte[] acceptedAddress = new byte[26];

private void acceptSocket() {
@Override
protected void scheduleRead0() {
IOUringSubmissionQueue submissionQueue = submissionQueue();
//Todo get network addresses
submissionQueue.addAccept(fd().intValue());
submissionQueue.submit();
}

@Override
void pollIn(int res) {
acceptSocket();
}

// TODO: Respect MAX_MESSAGES_READ
protected void readComplete0(int res) {
final IOUringRecvByteAllocatorHandle allocHandle =
Expand All @@ -89,7 +83,9 @@ protected void readComplete0(int res) {
pipeline.fireExceptionCaught(cause);
pipeline.fireChannelReadComplete();
}
acceptSocket();
if (config().isAutoRead()) {
scheduleRead();
}
} else {
allocHandle.readComplete();
// Check if we did fail because there was nothing to accept atm.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,7 @@ protected Executor prepareToClose() {
private ByteBuf readBuffer;

@Override
void pollIn(int res) {
readFromSocket();
}

private void readFromSocket() {
protected void scheduleRead0() {
final ChannelConfig config = config();

final ByteBufAllocator allocator = config.getAllocator();
Expand All @@ -215,6 +211,7 @@ private void readFromSocket() {

assert readBuffer == null;
readBuffer = byteBuf;

submissionQueue.addRead(socket.intValue(), byteBuf.memoryAddress(),
byteBuf.writerIndex(), byteBuf.capacity());
submissionQueue.submit();
Expand All @@ -232,8 +229,6 @@ protected void readComplete0(int res) {
this.readBuffer = null;
assert byteBuf != null;

boolean writable = true;

try {
if (res < 0) {
// If res is negative we should pass it to ioResult(...) which will either throw
Expand Down Expand Up @@ -261,21 +256,20 @@ protected void readComplete0(int res) {
}

allocHandle.incMessagesRead(1);
writable = byteBuf.isWritable();
boolean writable = byteBuf.isWritable();
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}
if (!close) {
if (!writable) {
if (!writable && config().isAutoRead()) {
// Let's schedule another read.
readFromSocket();
scheduleRead();
} else {
// We did not fill the whole ByteBuf so we should break the "read loop" and try again later.
pipeline.fireChannelReadComplete();
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
}

}

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,6 @@ public boolean handle(int fd, int res, long flags, int op, int pollMask) {
IOUringSubmissionQueue submissionQueue = ringBuffer.getIoUringSubmissionQueue();
switch (op) {
case IOUring.OP_ACCEPT:
//Todo error handle the res
if (res == ECANCELED) {
logger.trace("POLL_LINK canceled");
break;
}
// Fall-through

case IOUring.OP_READ:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ private void setData(long sqe, byte op, int pollMask, int fd, long bufferAddress
//user_data should be same as POLL_LINK fd
if (op == IOUring.OP_POLL_REMOVE) {
PlatformDependent.putInt(sqe + SQE_FD_FIELD, -1);
long pollLinkuData = convertToUserData((byte) IOUring.IO_POLL, fd, IOUring.POLLMASK_IN);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, pollLinkuData);
long uData = convertToUserData(op, fd, pollMask);
PlatformDependent.putLong(sqe + SQE_ADDRESS_FIELD, uData);
}

long uData = convertToUserData(op, fd, pollMask);
Expand Down Expand Up @@ -241,7 +241,7 @@ public boolean addAccept(int fd) {
}

//fill the address which is associated with server poll link user_data
public boolean addPollRemove(int fd) {
public boolean addPollRemove(int fd, int pollMask) {
long sqe = 0;
boolean submitted = false;
while (sqe == 0) {
Expand All @@ -252,7 +252,7 @@ public boolean addPollRemove(int fd) {
submitted = true;
}
}
setData(sqe, (byte) IOUring.OP_POLL_REMOVE, 0, fd, 0, 0, 0);
setData(sqe, (byte) IOUring.OP_POLL_REMOVE, pollMask, fd, 0, 0, 0);

return submitted;
}
Expand Down Expand Up @@ -323,7 +323,6 @@ private int flushSqe() {
public void submit() {
int submitted = flushSqe();
logger.trace("Submitted: {}", submitted);
System.out.println("submitted: " + submitted);
if (submitted > 0) {
int ret = Native.ioUringEnter(ringFd, submitted, 0, 0);
if (ret < 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void ioUringPollRemoveTest() throws Exception {

Thread.sleep(10);

submissionQueue.addPollRemove(eventFd.intValue());
submissionQueue.addPollRemove(eventFd.intValue(), IOUring.POLLMASK_IN);
submissionQueue.submit();

Thread waitingCqe = new Thread() {
Expand Down

0 comments on commit e1a582d

Please sign in to comment.