Skip to content

Commit

Permalink
Remove edge-triggered support for epoll and just always use level-tri… (
Browse files Browse the repository at this point in the history
#14031)

…ggered

Motivation:

We supported edge-triggered and level-triggered modes for our epoll
transport. This made things more complex while not really providing much
value.

Modifications:

- Remove edge-triggered support and just use level-triggered all the
time

Result:

Less complexity. Fixes #14007
  • Loading branch information
normanmaurer committed May 3, 2024
1 parent d3e830c commit dacd5c0
Show file tree
Hide file tree
Showing 25 changed files with 52 additions and 462 deletions.
Expand Up @@ -77,6 +77,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
boolean epollInReadyRunnablePending;
private EpollIoOps ops;
private EpollIoOps inital;

protected volatile boolean active;

AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
Expand All @@ -103,14 +104,6 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
this.ops = initialOps;
}

void add(EpollIoOps add) {
ops = ops.with(add);
}

void remove(EpollIoOps remove) {
ops = ops.without(remove);
}

static boolean isSoErrorZero(Socket fd) {
try {
return fd.getSoError() == 0;
Expand Down Expand Up @@ -259,12 +252,6 @@ protected void doBeginRead() throws Exception {
// executeEpollInReadyRunnable could read nothing, and if the user doesn't explicitly call read they will
// never get data after this.
setFlag(Native.EPOLLIN);

// If EPOLL ET mode is enabled and auto read was toggled off on the last read loop then we may not be notified
// again if we didn't consume all the data. So we force a read operation here if there maybe more data.
if (unsafe.maybeMoreDataToRead) {
unsafe.executeEpollInReadyRunnable(config());
}
}

final boolean shouldBreakEpollInReady(ChannelConfig config) {
Expand Down Expand Up @@ -451,7 +438,6 @@ final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boo

protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
boolean readPending;
boolean maybeMoreDataToRead;
private EpollRecvByteAllocatorHandle allocHandle;
private final Runnable epollInReadyRunnable = new Runnable() {
@Override
Expand Down Expand Up @@ -521,39 +507,14 @@ public void handle(IoRegistration registration, IoEvent event) {
*/
abstract void epollInReady();

final void epollInBefore() {
maybeMoreDataToRead = false;
}

final void epollInFinally(ChannelConfig config) {
maybeMoreDataToRead = allocHandle.maybeMoreDataToRead();

if (allocHandle.isReceivedRdHup() || (readPending && maybeMoreDataToRead)) {
// trigger a read again as there may be something left to read and because of epoll ET we
// will not get notified again until we read everything from the socket
//
// It is possible the last fireChannelRead call could cause the user to call read() again, or if
// autoRead is true the call to channelReadComplete would also call read, but maybeMoreDataToRead is set
// to false before every read operation to prevent re-entry into epollInReady() we will not read from
// the underlying OS again unless the user happens to call read again.
executeEpollInReadyRunnable(config);
} else if (!readPending && !config.isAutoRead()) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
clearEpollIn();
}
}

final void executeEpollInReadyRunnable(ChannelConfig config) {
if (epollInReadyRunnablePending || !isActive() || shouldBreakEpollInReady(config)) {
return;
}
epollInReadyRunnablePending = true;
eventLoop().execute(epollInReadyRunnable);
final boolean shouldStopReading(ChannelConfig config) {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
return !readPending && !config.isAutoRead();
}

/**
Expand Down
Expand Up @@ -89,12 +89,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
allocHandle.attemptedBytesRead(1);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -124,7 +121,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Expand Up @@ -745,12 +745,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

ByteBuf byteBuf = null;
boolean close = false;
Expand Down Expand Up @@ -824,7 +821,9 @@ void epollInReady() {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (sQueue == null) {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
} else {
if (!config.isAutoRead()) {
clearEpollIn();
Expand Down
Expand Up @@ -25,7 +25,8 @@
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.unix.IntegerUnixChannelOption;
import io.netty.channel.unix.RawUnixChannelOption;
import io.netty.util.internal.ObjectUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -34,6 +35,9 @@
import static io.netty.channel.unix.Limits.SSIZE_MAX;

public class EpollChannelConfig extends DefaultChannelConfig {

private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(EpollChannelConfig.class);

private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;

protected EpollChannelConfig(Channel channel) {
Expand Down Expand Up @@ -180,10 +184,12 @@ public EpollChannelConfig setMessageSizeEstimator(MessageSizeEstimator estimator
* {@link EpollMode#EDGE_TRIGGERED}. If you want to use {@link #isAutoRead()} {@code false} or
* {@link #getMaxMessagesPerRead()} and have an accurate behaviour you should use
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public EpollMode getEpollMode() {
return ((AbstractEpollChannel) channel).isFlagSet(Native.EPOLLET)
? EpollMode.EDGE_TRIGGERED : EpollMode.LEVEL_TRIGGERED;
return EpollMode.LEVEL_TRIGGERED;
}

/**
Expand All @@ -193,22 +199,12 @@ public EpollMode getEpollMode() {
* {@link EpollMode#LEVEL_TRIGGERED}.
*
* <strong>Be aware this config setting can only be adjusted before the channel was registered.</strong>
*
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public EpollChannelConfig setEpollMode(EpollMode mode) {
ObjectUtil.checkNotNull(mode, "mode");

AbstractEpollChannel epollChannel = (AbstractEpollChannel) channel;
checkChannelNotRegistered();
switch (mode) {
case EDGE_TRIGGERED:
epollChannel.add(EpollIoOps.EPOLLET);
break;
case LEVEL_TRIGGERED:
epollChannel.remove(EpollIoOps.EPOLLET);
break;
default:
throw new Error();
}
LOGGER.debug("Changing the EpollMode is not supported anymore, this is just a no-op");
return this;
}

Expand Down
Expand Up @@ -48,6 +48,10 @@ public final class EpollChannelOption<T> extends UnixChannelOption<T> {
public static final ChannelOption<Boolean> TCP_QUICKACK = valueOf(EpollChannelOption.class, "TCP_QUICKACK");
public static final ChannelOption<Integer> SO_BUSY_POLL = valueOf(EpollChannelOption.class, "SO_BUSY_POLL");

/**
* @deprecated Netty always uses level-triggered mode and so this method is just a no-op.
*/
@Deprecated
public static final ChannelOption<EpollMode> EPOLL_MODE =
ChannelOption.valueOf(EpollChannelOption.class, "EPOLL_MODE");

Expand Down
Expand Up @@ -510,12 +510,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -568,7 +565,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Expand Up @@ -297,12 +297,9 @@ void epollInReady() {
return;
}
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
allocHandle.reset(config);
epollInBefore();

Throwable exception = null;
try {
Expand Down Expand Up @@ -375,7 +372,9 @@ void epollInReady() {
pipeline.fireExceptionCaught(exception);
}
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Expand Up @@ -154,11 +154,9 @@ private void epollInReadFd() {
}
final ChannelConfig config = config();
final EpollRecvByteAllocatorHandle allocHandle = recvBufAllocHandle();
allocHandle.edgeTriggered(isFlagSet(Native.EPOLLET));

final ChannelPipeline pipeline = pipeline();
allocHandle.reset(config);
epollInBefore();

try {
readLoop: do {
Expand Down Expand Up @@ -187,7 +185,9 @@ private void epollInReadFd() {
pipeline.fireChannelReadComplete();
pipeline.fireExceptionCaught(t);
} finally {
epollInFinally(config);
if (shouldStopReading(config)) {
clearEpollIn();
}
}
}
}
Expand Down
Expand Up @@ -17,7 +17,10 @@

/**
* The <a href="https://linux.die.net//man/7/epoll">epoll</a> mode to use.
*
* @deprecated Netty always uses level-triggered mode.
*/
@Deprecated
public enum EpollMode {

/**
Expand Down
Expand Up @@ -31,7 +31,6 @@ public boolean get() {
return maybeMoreDataToRead();
}
};
private boolean isEdgeTriggered;
private boolean receivedRdHup;

EpollRecvByteAllocatorHandle(ExtendedHandle handle) {
Expand All @@ -47,25 +46,7 @@ final boolean isReceivedRdHup() {
}

boolean maybeMoreDataToRead() {
/**
* EPOLL ET requires that we read until we get an EAGAIN
* (see Q9 in <a href="https://man7.org/linux/man-pages/man7/epoll.7.html">epoll man</a>). However in order to
* respect auto read we supporting reading to stop if auto read is off. It is expected that the
* {@link #EpollSocketChannel} implementations will track if we are in edgeTriggered mode and all data was not
* read, and will force a EPOLLIN ready event.
*
* It is assumed RDHUP is handled externally by checking {@link #isReceivedRdHup()}.
*/
return (isEdgeTriggered && lastBytesRead() > 0) ||
(!isEdgeTriggered && lastBytesRead() == attemptedBytesRead());
}

final void edgeTriggered(boolean edgeTriggered) {
isEdgeTriggered = edgeTriggered;
}

final boolean isEdgeTriggered() {
return isEdgeTriggered;
return lastBytesRead() == attemptedBytesRead();
}

@Override
Expand All @@ -77,7 +58,7 @@ public final ByteBuf allocate(ByteBufAllocator alloc) {

@Override
public final boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return ((ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
return isReceivedRdHup() || ((ExtendedHandle) delegate()).continueReading(maybeMoreDataSupplier);
}

@Override
Expand Down
Expand Up @@ -16,7 +16,6 @@
package io.netty.channel.epoll;

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.InternetProtocolFamily;
import io.netty.channel.socket.ServerSocketChannel;

Expand Down

This file was deleted.

0 comments on commit dacd5c0

Please sign in to comment.