Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce IoHandleEventLoopGroup / IoHandleEventLoop and its implemen… #13991

Merged
merged 23 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.channel.local.LocalAddress;
import io.netty.channel.local.LocalChannel;
import io.netty.channel.local.LocalEventLoopGroup;
import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
Expand Down Expand Up @@ -770,7 +771,7 @@ public void testCloseOnHandshakeFailure() throws Exception {
.trustManager(new SelfSignedCertificate().cert())
.build();

EventLoopGroup group = new DefaultEventLoopGroup(1);
EventLoopGroup group = new LocalEventLoopGroup(1);
Channel sc = null;
Channel cc = null;
try {
Expand Down
290 changes: 290 additions & 0 deletions pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.IoEventLoop;
import io.netty.channel.IoOpt;
import io.netty.channel.IoRegistration;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
Expand Down Expand Up @@ -66,21 +69,16 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private ChannelPromise connectPromise;
private Future<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;

private volatile SocketAddress local;
private volatile SocketAddress remote;

protected int flags = Native.EPOLLET;
private EpollIoRegistration registration;
boolean inputClosedSeenErrorOnRead;
boolean epollInReadyRunnablePending;

volatile EpollIoOpt initialOpts;
protected volatile boolean active;

AbstractEpollChannel(LinuxSocket fd) {
this(null, fd, false);
}

AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active) {
AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOpt initialOpts) {
super(parent);
this.socket = checkNotNull(fd, "fd");
this.active = active;
Expand All @@ -90,16 +88,18 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
this.local = fd.localAddress();
this.remote = fd.remoteAddress();
}
this.initialOpts = initialOpts;
}

AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOpt initialOpts) {
super(parent);
this.socket = checkNotNull(fd, "fd");
this.active = true;
// Directly cache the remote and local addresses
// See https://github.com/netty/netty/issues/2359
this.remote = remote;
this.local = fd.localAddress();
this.initialOpts = initialOpts;
}

static boolean isSoErrorZero(Socket fd) {
Expand All @@ -111,21 +111,27 @@ static boolean isSoErrorZero(Socket fd) {
}

protected void setFlag(int flag) throws IOException {
if (!isFlagSet(flag)) {
flags |= flag;
modifyEvents();
if (isRegistered()) {
EpollIoRegistration registration = registration();
registration.updateInterestOpt(registration.interestOpt().with(EpollIoOpt.valueOf(flag)));
} else {
initialOpts = initialOpts.with(EpollIoOpt.valueOf(flag));
}
}

void clearFlag(int flag) throws IOException {
if (isFlagSet(flag)) {
flags &= ~flag;
modifyEvents();
}
EpollIoRegistration registration = registration();
registration.updateInterestOpt(
registration.interestOpt().without(EpollIoOpt.valueOf(flag)));
}

protected final EpollIoRegistration registration() {
assert registration != null;
return registration;
}

boolean isFlagSet(int flag) {
return (flags & flag) != 0;
return (registration().interestOpt().value & flag) != 0;
}

@Override
Expand Down Expand Up @@ -202,23 +208,26 @@ protected void doDisconnect() throws Exception {
doClose();
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}

@Override
public boolean isOpen() {
return socket.isOpen();
}

@Override
protected void doDeregister() throws Exception {
((EpollEventLoop) eventLoop()).remove(this);
EpollIoRegistration registration = this.registration;
if (registration != null) {
registration.cancel();
}
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
}

@Override
protected final void doBeginRead() throws Exception {
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
unsafe.readPending = true;
Expand Down Expand Up @@ -269,23 +278,24 @@ public void run() {
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
flags &= ~Native.EPOLLIN;
}
}

private void modifyEvents() throws IOException {
if (isOpen() && isRegistered()) {
((EpollEventLoop) eventLoop()).modify(this);
initialOpts = initialOpts.without(EpollIoOpt.EPOLLIN);
}
}

@Override
protected void doRegister() throws Exception {
protected void doRegister(ChannelPromise promise) {
// Just in case the previous EventLoop was shutdown abruptly, or an event is still pending on the old EventLoop
// make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
epollInReadyRunnablePending = false;
((EpollEventLoop) eventLoop()).add(this);
((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe(), initialOpts).addListener(f -> {
if (f.isSuccess()) {
registration = (EpollIoRegistration) f.getNow();
promise.setSuccess();
} else {
promise.setFailure(f.cause());
}
});
}

@Override
Expand Down Expand Up @@ -394,7 +404,7 @@ final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boo
}

if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
IovArray array = registration().ioHandler().cleanIovArray();
array.add(data, data.readerIndex(), data.readableBytes());
int cnt = array.count();
assert cnt != 0;
Expand All @@ -414,7 +424,7 @@ final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boo
remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
}

protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
boolean readPending;
boolean maybeMoreDataToRead;
private EpollRecvByteAllocatorHandle allocHandle;
Expand All @@ -426,6 +436,60 @@ public void run() {
}
};

Channel channel() {
return AbstractEpollChannel.this;
}

@Override
public FileDescriptor fd() {
return AbstractEpollChannel.this.fd();
}

@Override
public void close() {
close(voidPromise());
}

@Override
public void handle(IoRegistration registration, IoOpt readyOpt) {
EpollIoOpt epollOpt = (EpollIoOpt) readyOpt;

// Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
// sure about it!
// Re-ordering can easily introduce bugs and bad side-effects, as we found out painfully in the
// past.

// First check for EPOLLOUT as we may need to fail the connect ChannelPromise before try
// to read from the file descriptor.
// See https://github.com/netty/netty/issues/3785
//
// It is possible for an EPOLLOUT or EPOLLERR to be generated when a connection is refused.
// In either case epollOutReady() will do the correct thing (finish connecting, or fail
// the connection).
// See https://github.com/netty/netty/issues/3848
if (epollOpt.contains(EpollIoOpt.EPOLLERR) || epollOpt.contains(EpollIoOpt.EPOLLOUT)) {
// Force flush of data as the epoll is writable again
epollOutReady();
}

// Check EPOLLIN before EPOLLRDHUP to ensure all data is read before shutting down the input.
// See https://github.com/netty/netty/issues/4317.
//
// If EPOLLIN or EPOLLERR was received and the channel is still open call epollInReady(). This will
// try to read from the underlying file descriptor and so notify the user about the error.
if (epollOpt.contains(EpollIoOpt.EPOLLERR) || epollOpt.contains(EpollIoOpt.EPOLLIN)) {
// The Channel is still open and there is something to read. Do it now.
epollInReady();
}

// Check if EPOLLRDHUP was set, this will notify us for connection-reset in which case
// we may close the channel directly or try to read more data depending on the state of the
// Channel and als depending on the AbstractEpollChannel subtype.
if (epollOpt.contains(EpollIoOpt.EPOLLRDHUP)) {
epollRdHupReady();
}
}

/**
* Called once EPOLLIN event is ready to be processed
*/
Expand Down Expand Up @@ -575,7 +639,8 @@ protected final void clearEpollIn0() {
assert eventLoop().inEventLoop();
try {
readPending = false;
clearFlag(Native.EPOLLIN);
EpollIoRegistration registration = registration();
registration.updateInterestOpt(registration.interestOpt().without(EpollIoOpt.EPOLLIN));
} catch (IOException e) {
// When this happens there is something completely wrong with either the filedescriptor or epoll,
// so fire the exception through the pipeline and close the Channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.ServerChannel;

import java.net.InetSocketAddress;
Expand All @@ -39,19 +38,14 @@ protected AbstractEpollServerChannel(LinuxSocket fd) {
}

protected AbstractEpollServerChannel(LinuxSocket fd, boolean active) {
super(null, fd, active);
super(null, fd, active, EpollIoOpt.valueOf(0));
}

@Override
public ChannelMetadata metadata() {
return METADATA;
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
}

@Override
protected InetSocketAddress remoteAddress0() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,18 @@ protected AbstractEpollStreamChannel(int fd) {
}

AbstractEpollStreamChannel(Channel parent, LinuxSocket fd) {
super(parent, fd, true);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
super(parent, fd, true, EpollIoOpt.EPOLLRDHUP);
}

protected AbstractEpollStreamChannel(Channel parent, LinuxSocket fd, SocketAddress remote) {
super(parent, fd, remote);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
super(parent, fd, remote, EpollIoOpt.EPOLLRDHUP);
}

protected AbstractEpollStreamChannel(LinuxSocket fd, boolean active) {
super(null, fd, active);
// Add EPOLLRDHUP so we are notified once the remote peer close the connection.
flags |= Native.EPOLLRDHUP;
super(null, fd, active, EpollIoOpt.EPOLLRDHUP);
}

@Override
Expand Down Expand Up @@ -504,7 +501,7 @@ protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
*/
private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
IovArray array = registration().ioHandler().cleanIovArray();
array.maxBytes(maxBytesPerGatheringWrite);
in.forEachFlushedMessage(array);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,21 +197,19 @@ public EpollMode getEpollMode() {
public EpollChannelConfig setEpollMode(EpollMode mode) {
ObjectUtil.checkNotNull(mode, "mode");

try {
switch (mode) {
AbstractEpollChannel epollChannel = (AbstractEpollChannel) channel;
EpollIoOpt initial = epollChannel.initialOpts;
checkChannelNotRegistered();
switch (mode) {
case EDGE_TRIGGERED:
checkChannelNotRegistered();
((AbstractEpollChannel) channel).setFlag(Native.EPOLLET);
epollChannel.initialOpts = initial.with(EpollIoOpt.EPOLLET);
break;
case LEVEL_TRIGGERED:
checkChannelNotRegistered();
((AbstractEpollChannel) channel).clearFlag(Native.EPOLLET);
epollChannel.initialOpts = initial.without(EpollIoOpt.EPOLLET);
normanmaurer marked this conversation as resolved.
Show resolved Hide resolved
break;
default:
throw new Error();
}
} catch (IOException e) {
throw new ChannelException(e);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public EpollDatagramChannel(int fd) {
}

private EpollDatagramChannel(LinuxSocket fd, boolean active) {
super(null, fd, active);
super(null, fd, active, EpollIoOpt.valueOf(0));
config = new EpollDatagramChannelConfig(this);
}

Expand Down Expand Up @@ -774,6 +774,6 @@ private boolean scatteringRead(EpollRecvByteAllocatorHandle allocHandle, NativeD
}

private NativeDatagramPacketArray cleanDatagramPacketArray() {
return ((EpollEventLoop) eventLoop()).cleanDatagramPacketArray();
return registration().ioHandler().cleanDatagramPacketArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public EpollDomainDatagramChannel(int fd) {
}

private EpollDomainDatagramChannel(LinuxSocket socket, boolean active) {
super(null, socket, active);
super(null, socket, active, EpollIoOpt.valueOf(0));
config = new EpollDomainDatagramChannelConfig(this);
}

Expand Down Expand Up @@ -183,7 +183,7 @@ private boolean doWriteMessage(Object msg) throws Exception {
remoteAddress.path().getBytes(CharsetUtil.UTF_8));
}
} else if (data.nioBufferCount() > 1) {
IovArray array = ((EpollEventLoop) eventLoop()).cleanIovArray();
IovArray array = registration().ioHandler().cleanIovArray();
array.add(data, data.readerIndex(), data.readableBytes());
int cnt = array.count();
assert cnt != 0;
Expand Down