Skip to content

Commit

Permalink
Introduce IoEvent to prepare support for io_uring (#14024)
Browse files Browse the repository at this point in the history
Motiviation:

To support things like io_uring we need to support different things like
IoOps as a result for notifications. Like for example you would issue a
connect via the IoRegistration when using io_uring but receive some
specific event on the completion queue once it was done.

Modification:

- Add new IoEvent interface
- Change IoHandle.handle(...) signature to use IoEvent as a param
- Rewrite existing implementations to use new signature.

Result:

More flexible API which can support also things like io_uring
  • Loading branch information
normanmaurer committed May 3, 2024
1 parent c390810 commit 7ed972d
Show file tree
Hide file tree
Showing 37 changed files with 535 additions and 245 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1478,7 +1478,7 @@
<item>
<ignore>true</ignore>
<code>java.method.finalMethodAddedToNonFinalClass</code>
<new>method io.netty.util.concurrent.Future&lt;io.netty.channel.IoRegistration&gt; io.netty.channel.SingleThreadIoEventLoop::register(io.netty.channel.IoHandle, io.netty.channel.IoOps) @ io.netty.channel.epoll.EpollEventLoop</new>
<new>method io.netty.util.concurrent.Future&lt;io.netty.channel.IoRegistration&gt; io.netty.channel.SingleThreadIoEventLoop::register(io.netty.channel.IoHandle) @ io.netty.channel.epoll.EpollEventLoop</new>
</item>

<item>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
import io.netty.channel.ChannelPromise;
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.EventLoop;
import io.netty.channel.IoEvent;
import io.netty.channel.IoEventLoop;
import io.netty.channel.IoOps;
import io.netty.channel.IoRegistration;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
Expand Down Expand Up @@ -75,7 +75,8 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
private EpollIoRegistration registration;
boolean inputClosedSeenErrorOnRead;
boolean epollInReadyRunnablePending;
volatile EpollIoOps initialOps;
private EpollIoOps ops;
private EpollIoOps inital;
protected volatile boolean active;

AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
Expand All @@ -88,7 +89,7 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
this.local = fd.localAddress();
this.remote = fd.remoteAddress();
}
this.initialOps = initialOps;
this.ops = initialOps;
}

AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
Expand All @@ -99,7 +100,15 @@ abstract class AbstractEpollChannel extends AbstractChannel implements UnixChann
// See https://github.com/netty/netty/issues/2359
this.remote = remote;
this.local = fd.localAddress();
this.initialOps = initialOps;
this.ops = initialOps;
}

void add(EpollIoOps add) {
ops = ops.with(add);
}

void remove(EpollIoOps remove) {
ops = ops.without(remove);
}

static boolean isSoErrorZero(Socket fd) {
Expand All @@ -111,18 +120,31 @@ static boolean isSoErrorZero(Socket fd) {
}

protected void setFlag(int flag) throws IOException {
ops = ops.with(EpollIoOps.valueOf(flag));
if (isRegistered()) {
EpollIoRegistration registration = registration();
registration.updateInterestOps(registration.interestOps().with(EpollIoOps.valueOf(flag)));
try {
registration.submit(ops);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IllegalStateException(e);
}
} else {
initialOps = initialOps.with(EpollIoOps.valueOf(flag));
ops = ops.with(EpollIoOps.valueOf(flag));
}
}

void clearFlag(int flag) throws IOException {
EpollIoRegistration registration = registration();
registration.updateInterestOps(
registration.interestOps().without(EpollIoOps.valueOf(flag)));
ops = ops.without(EpollIoOps.valueOf(flag));
try {
registration.submit(ops);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

protected final EpollIoRegistration registration() {
Expand All @@ -131,7 +153,7 @@ protected final EpollIoRegistration registration() {
}

boolean isFlagSet(int flag) {
return (registration().interestOps().value & flag) != 0;
return (ops.value & flag) != 0;
}

@Override
Expand Down Expand Up @@ -217,6 +239,7 @@ public boolean isOpen() {
protected void doDeregister() throws Exception {
EpollIoRegistration registration = this.registration;
if (registration != null) {
ops = inital;
registration.cancel();
}
}
Expand Down Expand Up @@ -278,7 +301,7 @@ public void run() {
} else {
// The EventLoop is not registered atm so just update the flags so the correct value
// will be used once the channel is registered
initialOps = initialOps.without(EpollIoOps.EPOLLIN);
ops = ops.without(EpollIoOps.EPOLLIN);
}
}

Expand All @@ -288,9 +311,11 @@ protected void doRegister(ChannelPromise promise) {
// make sure the epollInReadyRunnablePending variable is reset so we will be able to execute the Runnable on the
// new EventLoop.
epollInReadyRunnablePending = false;
((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe(), initialOps).addListener(f -> {
((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
if (f.isSuccess()) {
registration = (EpollIoRegistration) f.getNow();
registration.submit(ops);
inital = ops;
promise.setSuccess();
} else {
promise.setFailure(f.cause());
Expand Down Expand Up @@ -451,8 +476,9 @@ public void close() {
}

@Override
public void handle(IoRegistration registration, IoOps readyOps) {
EpollIoOps epollOps = (EpollIoOps) readyOps;
public void handle(IoRegistration registration, IoEvent event) {
EpollIoEvent epollEvent = (EpollIoEvent) event;
EpollIoOps epollOps = epollEvent.ops();

// Don't change the ordering of processing EPOLLOUT | EPOLLRDHUP / EPOLLIN if you're not 100%
// sure about it!
Expand Down Expand Up @@ -639,9 +665,10 @@ protected final void clearEpollIn0() {
assert eventLoop().inEventLoop();
try {
readPending = false;
ops = ops.without(EpollIoOps.EPOLLIN);
EpollIoRegistration registration = registration();
registration.updateInterestOps(registration.interestOps().without(EpollIoOps.EPOLLIN));
} catch (IOException e) {
registration.submit(ops);
} catch (Exception e) {
// When this happens there is something completely wrong with either the filedescriptor or epoll,
// so fire the exception through the pipeline and close the Channel.
pipeline().fireExceptionCaught(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,13 @@ public EpollChannelConfig setEpollMode(EpollMode mode) {
ObjectUtil.checkNotNull(mode, "mode");

AbstractEpollChannel epollChannel = (AbstractEpollChannel) channel;
EpollIoOps initial = epollChannel.initialOps;
checkChannelNotRegistered();
switch (mode) {
case EDGE_TRIGGERED:
epollChannel.initialOps = initial.with(EpollIoOps.EPOLLET);
epollChannel.add(EpollIoOps.EPOLLET);
break;
case LEVEL_TRIGGERED:
epollChannel.initialOps = initial.without(EpollIoOps.EPOLLET);
epollChannel.remove(EpollIoOps.EPOLLET);
break;
default:
throw new Error();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.channel.epoll;

import io.netty.channel.IoEvent;

/**
* {@link IoEvent} that must be handled by the {@link EpollIoHandle}.
*/
public interface EpollIoEvent extends IoEvent {

/**
* Returns the {@link EpollIoOps} which did trigger the {@link EpollIoEvent}.
*
* @return ops.
*/
EpollIoOps ops();
}
Original file line number Diff line number Diff line change
Expand Up @@ -260,34 +260,26 @@ private final class DefaultEpollIoRegistration extends AtomicBoolean implements
private final IoEventLoop eventLoop;
final EpollIoHandle handle;

private volatile EpollIoOps currentOps;

DefaultEpollIoRegistration(IoEventLoop eventLoop, EpollIoHandle handle, EpollIoOps initialOps) {
DefaultEpollIoRegistration(IoEventLoop eventLoop, EpollIoHandle handle) {
this.eventLoop = eventLoop;
this.handle = handle;
this.currentOps = initialOps;
}

@Override
public void updateInterestOps(EpollIoOps ops) throws IOException {
currentOps = ops;
public void submit(IoOps ops) throws Exception {
EpollIoOps epollIoOps = cast(ops);
try {
if (!isValid()) {
return;
}
Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), ops.value);
Native.epollCtlMod(epollFd.intValue(), handle.fd().intValue(), epollIoOps.value);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw new IOException(e);
}
}

@Override
public EpollIoOps interestOps() {
return currentOps;
}

@Override
public EpollIoHandler ioHandler() {
return EpollIoHandler.this;
Expand Down Expand Up @@ -342,19 +334,17 @@ void close() {
}

void handle(long ev) {
handle.handle(this, EpollIoOps.valueOf((int) ev));
handle.handle(this, EpollIoOps.eventOf((int) ev));
}
}

@Override
public EpollIoRegistration register(IoEventLoop eventLoop, IoHandle handle,
IoOps initialOps)
public EpollIoRegistration register(IoEventLoop eventLoop, IoHandle handle)
throws Exception {
final EpollIoHandle epollHandle = cast(handle);
EpollIoOps ops = cast(initialOps);
DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(eventLoop, epollHandle, ops);
DefaultEpollIoRegistration registration = new DefaultEpollIoRegistration(eventLoop, epollHandle);
int fd = epollHandle.fd().intValue();
Native.epollCtlAdd(epollFd.intValue(), fd, registration.interestOps().value);
Native.epollCtlAdd(epollFd.intValue(), fd, EpollIoOps.EPOLLERR.value);
DefaultEpollIoRegistration old = registrations.put(fd, registration);

// We either expect to have no registration in the map with the same FD or that the FD of the old registration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public final class EpollIoOps implements IoOps {
}

/**
* Interested in IO events that should be handled by accepting new connections
* Interested in IO events which tell that the underlying channel is writable again or a connection
* attempt can be continued.
*/
public static final EpollIoOps EPOLLOUT = new EpollIoOps(Native.EPOLLOUT);

Expand All @@ -39,7 +40,7 @@ public final class EpollIoOps implements IoOps {
public static final EpollIoOps EPOLLIN = new EpollIoOps(Native.EPOLLIN);

/**
* Interested in IO events which tell that the underlying channel is writable again.
* Error condition happened on the associated file descriptor.
*/
public static final EpollIoOps EPOLLERR = new EpollIoOps(Native.EPOLLERR);

Expand All @@ -51,21 +52,20 @@ public final class EpollIoOps implements IoOps {
public static final EpollIoOps EPOLLET = new EpollIoOps(Native.EPOLLET);

// Just use an array to store often used values.
private static final EpollIoOps[] OPS;
private static final EpollIoEvent[] EVENTS;

static {
EpollIoOps all = new EpollIoOps(
EPOLLOUT.value | EPOLLIN.value | EPOLLERR.value | EPOLLRDHUP.value);
OPS = new EpollIoOps[all.value + 1];
addToArray(OPS, EPOLLOUT);
addToArray(OPS, EPOLLIN);
addToArray(OPS, EPOLLERR);
addToArray(OPS, EPOLLRDHUP);
addToArray(OPS, all);
EpollIoOps all = new EpollIoOps(EPOLLOUT.value | EPOLLIN.value | EPOLLERR.value | EPOLLRDHUP.value);
EVENTS = new EpollIoEvent[all.value + 1];
addToArray(EVENTS, EPOLLOUT);
addToArray(EVENTS, EPOLLIN);
addToArray(EVENTS, EPOLLERR);
addToArray(EVENTS, EPOLLRDHUP);
addToArray(EVENTS, all);
}

private static void addToArray(EpollIoOps[] array, EpollIoOps ops) {
array[ops.value] = ops;
private static void addToArray(EpollIoEvent[] array, EpollIoOps ops) {
array[ops.value] = new DefaultEpollIoEvent(ops);
}

final int value;
Expand Down Expand Up @@ -142,15 +142,60 @@ public int hashCode() {
* @return the {@link EpollIoOps}.
*/
public static EpollIoOps valueOf(int value) {
final EpollIoOps ops;
if (value > 0 && value < OPS.length) {
ops = OPS[value];
if (ops != null) {
return ops;
return eventOf(value).ops();
}

@Override
public String toString() {
return "EpollIoOps{" +
"value=" + value +
'}';
}

static EpollIoEvent eventOf(int value) {
if (value > 0 && value < EVENTS.length) {
EpollIoEvent event = EVENTS[value];
if (event != null) {
return event;
}
} else if (value == EPOLLET.value) {
return EPOLLET;
}
return new EpollIoOps(value);
return new DefaultEpollIoEvent(new EpollIoOps(value));
}

private static final class DefaultEpollIoEvent implements EpollIoEvent {
private final EpollIoOps ops;

DefaultEpollIoEvent(EpollIoOps ops) {
this.ops = ops;
}

@Override
public EpollIoOps ops() {
return ops;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
EpollIoEvent event = (EpollIoEvent) o;
return event.ops().equals(ops());
}

@Override
public int hashCode() {
return ops().hashCode();
}

@Override
public String toString() {
return "DefaultEpollIoEvent{" +
"ops=" + ops +
'}';
}
}
}

0 comments on commit 7ed972d

Please sign in to comment.