From 6e3b3fbf09dfa43f70c1c082f25c10b66b7d277f Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 6 Aug 2021 15:18:48 +0200 Subject: [PATCH 01/12] Add `connectx(2)` to BsdSocket --- .../src/main/c/netty_kqueue_bsdsocket.c | 72 ++++++++++++++- .../io/netty/channel/kqueue/BsdSocket.java | 88 ++++++++++++++++++- 2 files changed, 158 insertions(+), 2 deletions(-) diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c index 5f021d02605..8d8bf61eef2 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c @@ -83,6 +83,57 @@ static jlong netty_kqueue_bsdsocket_sendFile(JNIEnv* env, jclass clazz, jint soc return res < 0 ? -err : 0; } +static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, + jint socketFd, + jint socketInterface, + jboolean sourceIPv6, jbyteArray sourceAddress, jint sourceScopeId, jint sourcePort, + jboolean destinationIPv6, jbyteArray destinationAddress, jint destinationScopeId, jint destinationPort, + jint flags, + jlong iovAddress, jint iovCount, jint iovDataLength) { +#ifdef __APPLE__ // connectx(2) is only defined on Darwin. + sa_endpoints_t endpoints; + endpoints.sae_srcif = (unsigned int) socketInterface; + endpoints.sae_srcaddr = NULL; + endpoints.sae_srcaddrlen = 0; + endpoints.sae_dstaddr = NULL; + endpoints.sae_dstaddrlen = 0; + + struct sockaddr_storage srcaddr; + socklen_t srcaddrlen; + struct sockaddr_storage dstaddr; + socklen_t dstaddrlen; + + if (NULL != sourceAddress) { + if (-1 == netty_unix_socket_initSockaddr(env, + sourceIPv6, sourceAddress, sourceScopeId, sourcePort, &srcaddr, &srcaddrlen)) { + return -1; + } + endpoints.sae_srcaddr = (const struct sockaddr*) &srcaddr; + endpoints.sae_srcaddrlen = srcaddrlen; + } + if (NULL != destinationAddress) { + if (-1 == netty_unix_socket_initSockaddr(env, + destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) { + return -1; + } + endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr; + endpoints.sae_dstaddrlen = dstaddrlen; + } + + int socket = (int) socketFd; + const struct iovec* iov = (const struct iovec*) iovAddress; + unsigned int iovcnt = (unsigned int) iovCount; + size_t len = (size_t) iovDataLength; + int result = connectx(socket, &endpoints, SAE_ASSOCID_ANY, flags, iov, iovcnt, &len, NULL); + if (result == -1) { + return -errno; + } + return (jint) len; +#else + return -ENOSYS; +#endif +} + static void netty_kqueue_bsdsocket_setAcceptFilter(JNIEnv* env, jclass clazz, jint fd, jstring afName, jstring afArg) { #ifdef SO_ACCEPTFILTER struct accept_filter_arg af; @@ -187,6 +238,22 @@ static jobject netty_kqueue_bsdsocket_getPeerCredentials(JNIEnv *env, jclass cla return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, pid, credentials.cr_uid, gids); } + +static jint netty_kqueue_bsdsocket_connectResumeOnReadWrite(JNIEnv *env) { +#ifdef CONNECT_RESUME_ON_READ_WRITE + return CONNECT_RESUME_ON_READ_WRITE; +#else + return 0; +#endif +} + +static jint netty_kqueue_bsdsocket_connectDataIdempotent(JNIEnv *env) { +#ifdef CONNECT_DATA_IDEMPOTENT + return CONNECT_DATA_IDEMPOTENT; +#else + return 0; +#endif +} // JNI Registered Methods End // JNI Method Registration Table Begin @@ -196,7 +263,10 @@ static const JNINativeMethod fixed_method_table[] = { { "setSndLowAt", "(II)V", (void *) netty_kqueue_bsdsocket_setSndLowAt }, { "getAcceptFilter", "(I)[Ljava/lang/String;", (void *) netty_kqueue_bsdsocket_getAcceptFilter }, { "getTcpNoPush", "(I)I", (void *) netty_kqueue_bsdsocket_getTcpNoPush }, - { "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt } + { "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt }, + { "connectx", "(IIZ[BIIZ[BIIIJII)I", (void *) netty_kqueue_bsdsocket_connectx }, + { "connectResumeOnReadWrite", "()I", (void *) netty_kqueue_bsdsocket_connectResumeOnReadWrite }, + { "connectDataIdempotent", "()I", (void *) netty_kqueue_bsdsocket_connectDataIdempotent } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index 40658827354..b5dbb4dbc43 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -16,13 +16,18 @@ package io.netty.channel.kqueue; import io.netty.channel.DefaultFileRegion; +import io.netty.channel.unix.IovArray; import io.netty.channel.unix.PeerCredentials; import io.netty.channel.unix.Socket; import java.io.IOException; +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED; import static io.netty.channel.unix.Errors.ioResult; +import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; /** * A socket which provides access BSD native methods. @@ -51,7 +56,7 @@ void setSndLowAt(int lowAt) throws IOException { setSndLowAt(intValue(), lowAt); } - boolean isTcpNoPush() throws IOException { + boolean isTcpNoPush() throws IOException { return getTcpNoPush(intValue()) != 0; } @@ -80,6 +85,70 @@ long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) return ioResult("sendfile", (int) res); } + int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, boolean tcpFastOpen) + throws IOException { + int flags = tcpFastOpen ? connectResumeOnReadWrite() | connectDataIdempotent() : 0; + return connectx(source, destination, data, flags); + } + + int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) throws IOException { + int sourceInterface = 0; + + InetAddress sourceInetAddress = source.getAddress(); + boolean sourceIPv6 = sourceInetAddress instanceof Inet6Address; + byte[] sourceAddress; + int sourceScopeId; + if (sourceIPv6) { + sourceAddress = sourceInetAddress.getAddress(); + sourceScopeId = ((Inet6Address) sourceInetAddress).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + sourceScopeId = 0; + sourceAddress = ipv4MappedIpv6Address(sourceInetAddress.getAddress()); + } + int sourcePort = source.getPort(); + + InetAddress destinationInetAddress = destination.getAddress(); + boolean destinationIPv6 = destinationInetAddress instanceof Inet6Address; + byte[] destinationAddress; + int destinationScopeId; + if (destinationIPv6) { + destinationAddress = destinationInetAddress.getAddress(); + destinationScopeId = ((Inet6Address) destinationInetAddress).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + destinationScopeId = 0; + destinationAddress = ipv4MappedIpv6Address(destinationInetAddress.getAddress()); + } + int destinationPort = source.getPort(); + + long iovAddress; + int iovCount; + int iovDataLength; + if (data == null || data.count() == 0) { + iovAddress = 0; + iovCount = 0; + iovDataLength = 0; + } else { + iovAddress = data.memoryAddress(0); + iovCount = data.count(); + long size = data.size(); + if (size > Integer.MAX_VALUE) { + throw new IOException("IovArray.size() too big: " + size + " bytes."); + } + iovDataLength = (int) size; + } + + int result = connectx(intValue(), + sourceInterface, sourceIPv6, sourceAddress, sourceScopeId, sourcePort, + destinationIPv6, destinationAddress, destinationScopeId, destinationPort, + flags, iovAddress, iovCount, iovDataLength); + if (result < 0) { + return ioResult("connectx", -result); + } + return result; + } + public static BsdSocket newSocketStream() { return new BsdSocket(newSocketStream0()); } @@ -99,6 +168,21 @@ public static BsdSocket newSocketDomainDgram() { private static native long sendFile(int socketFd, DefaultFileRegion src, long baseOffset, long offset, long length) throws IOException; + /** + * @return If successful, zero or positive number of bytes transfered, otherwise negative errno. + */ + private static native int connectx( + int socketFd, + // sa_endpoints_t *endpoints: + int sourceInterface, + boolean sourceIPv6, byte[] sourceAddress, int sourceScopeId, int sourcePort, + boolean destinationIPv6, byte[] destinationAddress, int destinationScopeId, int destinationPort, + // sae_associd_t associd is reserved + int flags, + long iovAddress, int iovCount, int iovDataLength + // sae_connid_t *connid is reserved + ); + private static native String[] getAcceptFilter(int fd) throws IOException; private static native int getTcpNoPush(int fd) throws IOException; private static native int getSndLowAt(int fd) throws IOException; @@ -107,4 +191,6 @@ private static native long sendFile(int socketFd, DefaultFileRegion src, long ba private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException; private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException; private static native void setSndLowAt(int fd, int lowAt) throws IOException; + private static native int connectResumeOnReadWrite(); + private static native int connectDataIdempotent(); } From dda2e0381af61774560f352bbd24b3918f28fb5d Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Fri, 6 Aug 2021 16:10:16 +0200 Subject: [PATCH 02/12] Draft of adding client-side support for TCP FastOpen to the MacOS KQueue transport --- .../channel/kqueue/KQueueSocketChannel.java | 31 +++++++++++++++++++ .../kqueue/KQueueSocketChannelConfig.java | 21 +++++++++++++ 2 files changed, 52 insertions(+) diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java index f001478d4fb..060c605e439 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java @@ -15,13 +15,17 @@ */ package io.netty.channel.kqueue; +import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; +import io.netty.channel.ChannelOutboundBuffer; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; +import io.netty.channel.unix.IovArray; import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.internal.UnstableApi; import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.util.concurrent.Executor; @UnstableApi @@ -63,6 +67,33 @@ public ServerSocketChannel parent() { return (ServerSocketChannel) super.parent(); } + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (config.isTcpFastOpenConnect()) { + ChannelOutboundBuffer outbound = unsafe().outboundBuffer(); + outbound.addFlush(); + Object curr; + if ((curr = outbound.current()) instanceof ByteBuf) { + ByteBuf initialData = (ByteBuf) curr; + if (initialData.isReadable()) { + IovArray iov = new IovArray(config.getAllocator().directBuffer()); + try { + iov.add(initialData, initialData.readerIndex(), initialData.readableBytes()); + int bytesSent = socket.connectx( + (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true); + if (bytesSent > 0) { + outbound.removeBytes(bytesSent); + return true; + } + } finally { + iov.release(); + } + } + } + } + return super.doConnect(remoteAddress, localAddress); + } + @Override protected AbstractKQueueUnsafe newUnsafe() { return new KQueueSocketChannelUnsafe(); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java index 590abe84166..b74338e99a3 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannelConfig.java @@ -42,6 +42,7 @@ @UnstableApi public final class KQueueSocketChannelConfig extends KQueueChannelConfig implements SocketChannelConfig { private volatile boolean allowHalfClosure; + private volatile boolean tcpFastopen; KQueueSocketChannelConfig(KQueueSocketChannel channel) { super(channel); @@ -92,6 +93,9 @@ public T getOption(ChannelOption option) { if (option == TCP_NOPUSH) { return (T) Boolean.valueOf(isTcpNoPush()); } + if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { + return (T) Boolean.valueOf(isTcpFastOpenConnect()); + } return super.getOption(option); } @@ -119,6 +123,8 @@ public boolean setOption(ChannelOption option, T value) { setSndLowAt((Integer) value); } else if (option == TCP_NOPUSH) { setTcpNoPush((Boolean) value); + } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) { + setTcpFastOpenConnect((Boolean) value); } else { return super.setOption(option, value); } @@ -297,6 +303,21 @@ public boolean isAllowHalfClosure() { return allowHalfClosure; } + /** + * Enables client TCP fast open, if available. + */ + public KQueueSocketChannelConfig setTcpFastOpenConnect(boolean fastOpenConnect) { + tcpFastopen = fastOpenConnect; + return this; + } + + /** + * Returns {@code true} if TCP fast open is enabled, {@code false} otherwise. + */ + public boolean isTcpFastOpenConnect() { + return tcpFastopen; + } + @Override public KQueueSocketChannelConfig setRcvAllocTransportProvidesGuess(boolean transportProvidesGuess) { super.setRcvAllocTransportProvidesGuess(transportProvidesGuess); From 3a092eacf69706acdd691007f9897e5a2af82d9a Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 9 Aug 2021 11:46:43 +0200 Subject: [PATCH 03/12] Make one of the BsdSocket.connectx private --- .../src/main/java/io/netty/channel/kqueue/BsdSocket.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index b5dbb4dbc43..afa48df10d5 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -91,7 +91,8 @@ int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray d return connectx(source, destination, data, flags); } - int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) throws IOException { + private int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) + throws IOException { int sourceInterface = 0; InetAddress sourceInetAddress = source.getAddress(); From 1f111b4c49e5b2c158345c04e47a4698a41a8e2f Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 9 Aug 2021 14:35:37 +0200 Subject: [PATCH 04/12] Cache the CONNECT_RESUME_ON_READ_WRITE and CONNECT_DATA_IDEMPOTENT values as class constants in Native --- .../src/main/c/netty_kqueue_bsdsocket.c | 20 +--------------- .../src/main/c/netty_kqueue_native.c | 24 ++++++++++++++++++- .../io/netty/channel/kqueue/BsdSocket.java | 5 ++-- .../KQueueStaticallyReferencedJniMethods.java | 4 ++++ .../java/io/netty/channel/kqueue/Native.java | 7 ++++++ 5 files changed, 37 insertions(+), 23 deletions(-) diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c index 8d8bf61eef2..e1a6c14fc51 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c @@ -238,22 +238,6 @@ static jobject netty_kqueue_bsdsocket_getPeerCredentials(JNIEnv *env, jclass cla return (*env)->NewObject(env, peerCredentialsClass, peerCredentialsMethodId, pid, credentials.cr_uid, gids); } - -static jint netty_kqueue_bsdsocket_connectResumeOnReadWrite(JNIEnv *env) { -#ifdef CONNECT_RESUME_ON_READ_WRITE - return CONNECT_RESUME_ON_READ_WRITE; -#else - return 0; -#endif -} - -static jint netty_kqueue_bsdsocket_connectDataIdempotent(JNIEnv *env) { -#ifdef CONNECT_DATA_IDEMPOTENT - return CONNECT_DATA_IDEMPOTENT; -#else - return 0; -#endif -} // JNI Registered Methods End // JNI Method Registration Table Begin @@ -264,9 +248,7 @@ static const JNINativeMethod fixed_method_table[] = { { "getAcceptFilter", "(I)[Ljava/lang/String;", (void *) netty_kqueue_bsdsocket_getAcceptFilter }, { "getTcpNoPush", "(I)I", (void *) netty_kqueue_bsdsocket_getTcpNoPush }, { "getSndLowAt", "(I)I", (void *) netty_kqueue_bsdsocket_getSndLowAt }, - { "connectx", "(IIZ[BIIZ[BIIIJII)I", (void *) netty_kqueue_bsdsocket_connectx }, - { "connectResumeOnReadWrite", "()I", (void *) netty_kqueue_bsdsocket_connectResumeOnReadWrite }, - { "connectDataIdempotent", "()I", (void *) netty_kqueue_bsdsocket_connectDataIdempotent } + { "connectx", "(IIZ[BIIZ[BIIIJII)I", (void *) netty_kqueue_bsdsocket_connectx } }; static const jint fixed_method_table_size = sizeof(fixed_method_table) / sizeof(fixed_method_table[0]); diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_native.c b/transport-native-kqueue/src/main/c/netty_kqueue_native.c index 3ac27a1e519..ceda160a2eb 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_native.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_native.c @@ -60,6 +60,12 @@ #ifndef NOTE_DISCONNECTED #define NOTE_DISCONNECTED 0x00001000 #endif /* NOTE_DISCONNECTED */ +#ifndef CONNECT_RESUME_ON_READ_WRITE +#define CONNECT_RESUME_ON_READ_WRITE 0x1 +#endif /* CONNECT_RESUME_ON_READ_WRITE */ +#ifndef CONNECT_DATA_IDEMPOTENT +#define CONNECT_DATA_IDEMPOTENT 0x2 +#endif /* CONNECT_DATA_IDEMPOTENT */ #else #ifndef EVFILT_SOCK #define EVFILT_SOCK 0 // Disabled @@ -73,6 +79,12 @@ #ifndef NOTE_DISCONNECTED #define NOTE_DISCONNECTED 0 #endif /* NOTE_DISCONNECTED */ +#ifndef CONNECT_RESUME_ON_READ_WRITE +#define CONNECT_RESUME_ON_READ_WRITE 0 +#endif /* CONNECT_RESUME_ON_READ_WRITE */ +#ifndef CONNECT_DATA_IDEMPOTENT +#define CONNECT_DATA_IDEMPOTENT 0 +#endif /* CONNECT_DATA_IDEMPOTENT */ #endif /* __APPLE__ */ static clockid_t waitClockId = 0; // initialized by netty_unix_util_initialize_wait_clock @@ -247,6 +259,14 @@ static jshort netty_kqueue_native_noteDisconnected(JNIEnv* env, jclass clazz) { return NOTE_DISCONNECTED; } +static jint netty_kqueue_bsdsocket_connectResumeOnReadWrite(JNIEnv *env) { + return CONNECT_RESUME_ON_READ_WRITE; +} + +static jint netty_kqueue_bsdsocket_connectDataIdempotent(JNIEnv *env) { + return CONNECT_DATA_IDEMPOTENT; +} + // JNI Method Registration Table Begin static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "evfiltRead", "()S", (void *) netty_kqueue_native_evfiltRead }, @@ -262,7 +282,9 @@ static const JNINativeMethod statically_referenced_fixed_method_table[] = { { "evError", "()S", (void *) netty_kqueue_native_evError }, { "noteReadClosed", "()S", (void *) netty_kqueue_native_noteReadClosed }, { "noteConnReset", "()S", (void *) netty_kqueue_native_noteConnReset }, - { "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected } + { "noteDisconnected", "()S", (void *) netty_kqueue_native_noteDisconnected }, + { "connectResumeOnReadWrite", "()I", (void *) netty_kqueue_bsdsocket_connectResumeOnReadWrite }, + { "connectDataIdempotent", "()I", (void *) netty_kqueue_bsdsocket_connectDataIdempotent } }; static const jint statically_referenced_fixed_method_table_size = sizeof(statically_referenced_fixed_method_table) / sizeof(statically_referenced_fixed_method_table[0]); static const JNINativeMethod fixed_method_table[] = { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index afa48df10d5..3b5c2dfd648 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -26,6 +26,7 @@ import java.net.InetSocketAddress; import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED; +import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN; import static io.netty.channel.unix.Errors.ioResult; import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; @@ -87,7 +88,7 @@ long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, boolean tcpFastOpen) throws IOException { - int flags = tcpFastOpen ? connectResumeOnReadWrite() | connectDataIdempotent() : 0; + int flags = tcpFastOpen ? CONNECT_TCP_FASTOPEN : 0; return connectx(source, destination, data, flags); } @@ -192,6 +193,4 @@ private static native int connectx( private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException; private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException; private static native void setSndLowAt(int fd, int lowAt) throws IOException; - private static native int connectResumeOnReadWrite(); - private static native int connectDataIdempotent(); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java index 2ed7f53d43f..e6926d65faf 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueStaticallyReferencedJniMethods.java @@ -46,4 +46,8 @@ private KQueueStaticallyReferencedJniMethods() { } static native short evfiltWrite(); static native short evfiltUser(); static native short evfiltSock(); + + // Flags for connectx(2) + static native int connectResumeOnReadWrite(); + static native int connectDataIdempotent(); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java index ff3986f8261..3c087b06c26 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java @@ -29,6 +29,8 @@ import java.io.IOException; import java.nio.channels.FileChannel; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectDataIdempotent; +import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.connectResumeOnReadWrite; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evAdd; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evClear; import static io.netty.channel.kqueue.KQueueStaticallyReferencedJniMethods.evDelete; @@ -104,6 +106,11 @@ public void run() { static final short EVFILT_USER = evfiltUser(); static final short EVFILT_SOCK = evfiltSock(); + // Flags for connectx(2) + static final int CONNECT_RESUME_ON_READ_WRITE = connectResumeOnReadWrite(); + static final int CONNECT_DATA_IDEMPOTENT = connectDataIdempotent(); + static final int CONNECT_TCP_FASTOPEN = CONNECT_RESUME_ON_READ_WRITE | CONNECT_DATA_IDEMPOTENT; + static FileDescriptor newKQueue() { return new FileDescriptor(kqueueCreate()); } From 1faac56ab3e5fc6904f8087cf665c9094ccb0a50 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 9 Aug 2021 14:41:48 +0200 Subject: [PATCH 05/12] Explain the "unspecified source interface" value used with connectx --- .../main/java/io/netty/channel/kqueue/BsdSocket.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index 3b5c2dfd648..25531455c0a 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -40,6 +40,12 @@ final class BsdSocket extends Socket { private static final int APPLE_SND_LOW_AT_MAX = 1 << 17; private static final int FREEBSD_SND_LOW_AT_MAX = 1 << 15; static final int BSD_SND_LOW_AT_MAX = Math.min(APPLE_SND_LOW_AT_MAX, FREEBSD_SND_LOW_AT_MAX); + /** + * The `endpoints` structure passed to `connectx(2)` has an optional "source interface" field, + * which is the index of the network interface to use. + * According to `if_nametoindex(3)`, the value 0 is used when no interface is specified. + */ + private static final int UNSPECIFIED_SOURCE_INTERFACE = 0; BsdSocket(int fd) { super(fd); @@ -94,8 +100,6 @@ int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray d private int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) throws IOException { - int sourceInterface = 0; - InetAddress sourceInetAddress = source.getAddress(); boolean sourceIPv6 = sourceInetAddress instanceof Inet6Address; byte[] sourceAddress; @@ -142,7 +146,7 @@ private int connectx(InetSocketAddress source, InetSocketAddress destination, Io } int result = connectx(intValue(), - sourceInterface, sourceIPv6, sourceAddress, sourceScopeId, sourcePort, + UNSPECIFIED_SOURCE_INTERFACE, sourceIPv6, sourceAddress, sourceScopeId, sourcePort, destinationIPv6, destinationAddress, destinationScopeId, destinationPort, flags, iovAddress, iovCount, iovDataLength); if (result < 0) { From 8c4d471944f76f7b5a7abd050d4a454f10cb8ee8 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 9 Aug 2021 15:00:08 +0200 Subject: [PATCH 06/12] Improve BsdSocket.connectx null handling --- .../src/main/c/netty_kqueue_bsdsocket.c | 14 ++++----- .../io/netty/channel/kqueue/BsdSocket.java | 30 +++++++++++++------ 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c index e1a6c14fc51..de53a5c9614 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c @@ -111,14 +111,14 @@ static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, endpoints.sae_srcaddr = (const struct sockaddr*) &srcaddr; endpoints.sae_srcaddrlen = srcaddrlen; } - if (NULL != destinationAddress) { - if (-1 == netty_unix_socket_initSockaddr(env, - destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) { - return -1; - } - endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr; - endpoints.sae_dstaddrlen = dstaddrlen; + + assert destinationAddress != NULL; // Java side will ensure destination is never null. + if (-1 == netty_unix_socket_initSockaddr(env, + destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) { + return -1; } + endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr; + endpoints.sae_dstaddrlen = dstaddrlen; int socket = (int) socketFd; const struct iovec* iov = (const struct iovec*) iovAddress; diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index 25531455c0a..20267d9b09c 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -29,6 +29,7 @@ import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN; import static io.netty.channel.unix.Errors.ioResult; import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; +import static java.util.Objects.requireNonNull; /** * A socket which provides access BSD native methods. @@ -100,19 +101,30 @@ int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray d private int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) throws IOException { - InetAddress sourceInetAddress = source.getAddress(); - boolean sourceIPv6 = sourceInetAddress instanceof Inet6Address; + requireNonNull(destination, "Destination InetSocketAddress cannot be null."); + + boolean sourceIPv6; byte[] sourceAddress; int sourceScopeId; - if (sourceIPv6) { - sourceAddress = sourceInetAddress.getAddress(); - sourceScopeId = ((Inet6Address) sourceInetAddress).getScopeId(); - } else { - // convert to ipv4 mapped ipv6 address; + int sourcePort; + if (source == null) { + sourceIPv6 = false; + sourceAddress = null; sourceScopeId = 0; - sourceAddress = ipv4MappedIpv6Address(sourceInetAddress.getAddress()); + sourcePort = 0; + } else { + InetAddress sourceInetAddress = source.getAddress(); + sourceIPv6 = sourceInetAddress instanceof Inet6Address; + if (sourceIPv6) { + sourceAddress = sourceInetAddress.getAddress(); + sourceScopeId = ((Inet6Address) sourceInetAddress).getScopeId(); + } else { + // convert to ipv4 mapped ipv6 address; + sourceScopeId = 0; + sourceAddress = ipv4MappedIpv6Address(sourceInetAddress.getAddress()); + } + sourcePort = source.getPort(); } - int sourcePort = source.getPort(); InetAddress destinationInetAddress = destination.getAddress(); boolean destinationIPv6 = destinationInetAddress instanceof Inet6Address; From a84f5fc07472f7909f81ff46ca9bf2e20958fabf Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Mon, 9 Aug 2021 15:19:03 +0200 Subject: [PATCH 07/12] Throw exceptions from BsdSocket.connectx if address conversion fails --- .../src/main/c/netty_kqueue_bsdsocket.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c index de53a5c9614..41998e798af 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c @@ -106,7 +106,9 @@ static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, if (NULL != sourceAddress) { if (-1 == netty_unix_socket_initSockaddr(env, sourceIPv6, sourceAddress, sourceScopeId, sourcePort, &srcaddr, &srcaddrlen)) { - return -1; + netty_unix_errors_throwIOException(env, + "Source address specified, but could not be converted to sockaddr."); + return -EINVAL; } endpoints.sae_srcaddr = (const struct sockaddr*) &srcaddr; endpoints.sae_srcaddrlen = srcaddrlen; @@ -115,7 +117,8 @@ static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, assert destinationAddress != NULL; // Java side will ensure destination is never null. if (-1 == netty_unix_socket_initSockaddr(env, destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) { - return -1; + netty_unix_errors_throwIOException(env, "Destination address could not be converted to sockaddr."); + return -EINVAL; } endpoints.sae_dstaddr = (const struct sockaddr*) &dstaddr; endpoints.sae_dstaddrlen = dstaddrlen; From f25377969a1d1002349d595aad76be00df509c27 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 10 Aug 2021 11:06:10 +0200 Subject: [PATCH 08/12] Fix java compatibility error in BsdSocket --- .../src/main/java/io/netty/channel/kqueue/BsdSocket.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index 20267d9b09c..f4285e10bb0 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -29,7 +29,7 @@ import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN; import static io.netty.channel.unix.Errors.ioResult; import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; -import static java.util.Objects.requireNonNull; +import static io.netty.util.internal.ObjectUtil.checkNotNull; /** * A socket which provides access BSD native methods. @@ -101,7 +101,7 @@ int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray d private int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) throws IOException { - requireNonNull(destination, "Destination InetSocketAddress cannot be null."); + checkNotNull(destination, "Destination InetSocketAddress cannot be null."); boolean sourceIPv6; byte[] sourceAddress; From aeec6fa5d111aa44814228ac14c023ff531425f5 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 10 Aug 2021 13:41:09 +0200 Subject: [PATCH 09/12] Fix a number of issues with the KQueue TFO support --- transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c | 5 +++-- .../src/main/java/io/netty/channel/kqueue/BsdSocket.java | 4 ++-- .../src/main/java/io/netty/channel/kqueue/Native.java | 4 ++-- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c index 41998e798af..81690c46006 100644 --- a/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c +++ b/transport-native-kqueue/src/main/c/netty_kqueue_bsdsocket.c @@ -13,6 +13,7 @@ * License for the specific language governing permissions and limitations * under the License. */ +#include #include #include #include @@ -83,7 +84,7 @@ static jlong netty_kqueue_bsdsocket_sendFile(JNIEnv* env, jclass clazz, jint soc return res < 0 ? -err : 0; } -static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, +static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, jclass clazz, jint socketFd, jint socketInterface, jboolean sourceIPv6, jbyteArray sourceAddress, jint sourceScopeId, jint sourcePort, @@ -114,7 +115,7 @@ static jint netty_kqueue_bsdsocket_connectx(JNIEnv* env, endpoints.sae_srcaddrlen = srcaddrlen; } - assert destinationAddress != NULL; // Java side will ensure destination is never null. + assert(destinationAddress != NULL); // Java side will ensure destination is never null. if (-1 == netty_unix_socket_initSockaddr(env, destinationIPv6, destinationAddress, destinationScopeId, destinationPort, &dstaddr, &dstaddrlen)) { netty_unix_errors_throwIOException(env, "Destination address could not be converted to sockaddr."); diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index f4285e10bb0..e3fcfe57a2c 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -138,7 +138,7 @@ private int connectx(InetSocketAddress source, InetSocketAddress destination, Io destinationScopeId = 0; destinationAddress = ipv4MappedIpv6Address(destinationInetAddress.getAddress()); } - int destinationPort = source.getPort(); + int destinationPort = destination.getPort(); long iovAddress; int iovCount; @@ -162,7 +162,7 @@ private int connectx(InetSocketAddress source, InetSocketAddress destination, Io destinationIPv6, destinationAddress, destinationScopeId, destinationPort, flags, iovAddress, iovCount, iovDataLength); if (result < 0) { - return ioResult("connectx", -result); + return ioResult("connectx", result); } return result; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java index 3c087b06c26..5f4c77fbc2b 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/Native.java @@ -107,8 +107,8 @@ public void run() { static final short EVFILT_SOCK = evfiltSock(); // Flags for connectx(2) - static final int CONNECT_RESUME_ON_READ_WRITE = connectResumeOnReadWrite(); - static final int CONNECT_DATA_IDEMPOTENT = connectDataIdempotent(); + private static final int CONNECT_RESUME_ON_READ_WRITE = connectResumeOnReadWrite(); + private static final int CONNECT_DATA_IDEMPOTENT = connectDataIdempotent(); static final int CONNECT_TCP_FASTOPEN = CONNECT_RESUME_ON_READ_WRITE | CONNECT_DATA_IDEMPOTENT; static FileDescriptor newKQueue() { From 14306fc99ece759f4df1810d30fdc510966cd106 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Tue, 10 Aug 2021 13:55:27 +0200 Subject: [PATCH 10/12] When using connectx for TFO on non-blocking sockets, it will return EINPROGRESS Make sure we handle that as a successful call. --- .../src/main/java/io/netty/channel/kqueue/BsdSocket.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index e3fcfe57a2c..5e8332d352d 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -27,6 +27,7 @@ import static io.netty.channel.kqueue.AcceptFilter.PLATFORM_UNSUPPORTED; import static io.netty.channel.kqueue.Native.CONNECT_TCP_FASTOPEN; +import static io.netty.channel.unix.Errors.ERRNO_EINPROGRESS_NEGATIVE; import static io.netty.channel.unix.Errors.ioResult; import static io.netty.channel.unix.NativeInetAddress.ipv4MappedIpv6Address; import static io.netty.util.internal.ObjectUtil.checkNotNull; @@ -161,6 +162,12 @@ private int connectx(InetSocketAddress source, InetSocketAddress destination, Io UNSPECIFIED_SOURCE_INTERFACE, sourceIPv6, sourceAddress, sourceScopeId, sourcePort, destinationIPv6, destinationAddress, destinationScopeId, destinationPort, flags, iovAddress, iovCount, iovDataLength); + if (result == ERRNO_EINPROGRESS_NEGATIVE) { + // This is normal for non-blocking sockets. + // We'll know the connection has been established when the socket is selectable for writing. + // Tell the channel the data was written, so the outbound buffer can update its position. + return iovDataLength; + } if (result < 0) { return ioResult("connectx", result); } From 4577097df0cdec9e85e189a2fc948b23eb4c2cac Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Wed, 11 Aug 2021 12:03:30 +0200 Subject: [PATCH 11/12] Add TCP FastOpen to the KQueue test permutations Also fix a bug where using TFO with KQueue would prematurely consider the socket connected. Instead, the socket should assume to be connect-in-progress when using TFO since all our sockets are non-blocking. --- .../transport/socket/SocketConnectTest.java | 7 +-- .../channel/epoll/EpollSocketConnectTest.java | 6 -- .../epoll/EpollSocketTestPermutation.java | 6 +- .../channel/kqueue/AbstractKQueueChannel.java | 8 +-- .../channel/kqueue/KQueueSocketChannel.java | 11 ++-- ...QueueSocketChannelNotYetConnectedTest.java | 2 +- .../kqueue/KQueueSocketTestPermutation.java | 60 +++++++++++-------- .../KqueueWriteBeforeRegisteredTest.java | 2 +- .../java/io/netty/channel/unix/Socket.java | 4 +- 9 files changed, 52 insertions(+), 54 deletions(-) diff --git a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java index 9b306b53e1a..06ba85fb7bf 100644 --- a/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java +++ b/testsuite/src/main/java/io/netty/testsuite/transport/socket/SocketConnectTest.java @@ -28,11 +28,9 @@ import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; -import io.netty.util.internal.StringUtil; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.Timeout; -import org.opentest4j.TestAbortedException; import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; @@ -189,8 +187,9 @@ private static void connectAndVerifyDataTransfer(Bootstrap cb, Channel sc) } protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { - throw new TestAbortedException( - "Support for testing TCP_FASTOPEN not enabled for " + StringUtil.simpleClassName(this)); + // TFO is an almost-pure optimisation and should not change any observable behaviour in our tests. + sb.option(ChannelOption.TCP_FASTOPEN, 5); + cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); } private static void assertLocalAddress(InetSocketAddress address) { diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java index 459de4ef775..31f3a2644e2 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketConnectTest.java @@ -29,10 +29,4 @@ public class EpollSocketConnectTest extends SocketConnectTest { protected List> newFactories() { return EpollSocketTestPermutation.INSTANCE.socketWithoutFastOpen(); } - - @Override - protected void enableTcpFastOpen(ServerBootstrap sb, Bootstrap cb) { - sb.option(ChannelOption.TCP_FASTOPEN, 5); - cb.option(ChannelOption.TCP_FASTOPEN_CONNECT, true); - } } diff --git a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java index 38804575ef8..ca4960f0c03 100644 --- a/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java +++ b/transport-native-epoll/src/test/java/io/netty/channel/epoll/EpollSocketTestPermutation.java @@ -68,7 +68,6 @@ public List> serverSocket() { List> toReturn = new ArrayList>(); @@ -207,10 +206,7 @@ public String toString() { } public List> domainSocket() { - - List> list = - combo(serverDomainSocket(), clientDomainSocket()); - return list; + return combo(serverDomainSocket(), clientDomainSocket()); } public List> serverDomainSocket() { diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java index 96004482743..80b27858a3b 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/AbstractKQueueChannel.java @@ -390,7 +390,7 @@ final void readReadyBefore() { final void readReadyFinally(ChannelConfig config) { maybeMoreDataToRead = allocHandle.maybeMoreDataToRead(); - if (allocHandle.isReadEOF() || (readPending && maybeMoreDataToRead)) { + if (allocHandle.isReadEOF() || readPending && maybeMoreDataToRead) { // trigger a read again as there may be something left to read and because of ET we // will not get notified again until we read everything from the socket // @@ -699,7 +699,7 @@ protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddr socket.bind(localAddress); } - boolean connected = doConnect0(remoteAddress); + boolean connected = doConnect0(remoteAddress, localAddress); if (connected) { remote = remoteSocketAddr == null? remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress()); @@ -711,10 +711,10 @@ protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddr return connected; } - private boolean doConnect0(SocketAddress remote) throws Exception { + protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { boolean success = false; try { - boolean connected = socket.connect(remote); + boolean connected = socket.connect(remoteAddress); if (!connected) { writeFilter(true); } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java index 060c605e439..878cae53472 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java @@ -68,7 +68,7 @@ public ServerSocketChannel parent() { } @Override - protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { if (config.isTcpFastOpenConnect()) { ChannelOutboundBuffer outbound = unsafe().outboundBuffer(); outbound.addFlush(); @@ -81,17 +81,16 @@ protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddr iov.add(initialData, initialData.readerIndex(), initialData.readableBytes()); int bytesSent = socket.connectx( (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true); - if (bytesSent > 0) { - outbound.removeBytes(bytesSent); - return true; - } + writeFilter(true); + outbound.removeBytes(bytesSent); + return false; // 'false' because we assume connecting to be in-progress. } finally { iov.release(); } } } } - return super.doConnect(remoteAddress, localAddress); + return super.doConnect0(remoteAddress, localAddress); } @Override diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java index b486024b6cf..a9d93eef99a 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketChannelNotYetConnectedTest.java @@ -24,6 +24,6 @@ public class KQueueSocketChannelNotYetConnectedTest extends SocketChannelNotYetConnectedTest { @Override protected List> newFactories() { - return KQueueSocketTestPermutation.INSTANCE.clientSocket(); + return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java index 93595b87a65..eff955dbca9 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KQueueSocketTestPermutation.java @@ -19,6 +19,7 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFactory; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.InternetProtocolFamily; import io.netty.channel.socket.nio.NioDatagramChannel; @@ -30,8 +31,6 @@ import io.netty.testsuite.transport.TestsuitePermutation.BootstrapFactory; import io.netty.testsuite.transport.socket.SocketTestPermutation; import io.netty.util.concurrent.DefaultThreadFactory; -import io.netty.util.internal.logging.InternalLogger; -import io.netty.util.internal.logging.InternalLoggerFactory; import java.util.ArrayList; import java.util.Arrays; @@ -47,8 +46,6 @@ class KQueueSocketTestPermutation extends SocketTestPermutation { static final EventLoopGroup KQUEUE_WORKER_GROUP = new KQueueEventLoopGroup(WORKERS, new DefaultThreadFactory("testsuite-KQueue-worker", true)); - private static final InternalLogger logger = InternalLoggerFactory.getInstance(KQueueSocketTestPermutation.class); - @Override public List> socket() { @@ -60,7 +57,6 @@ public List> serverSocket() { List> toReturn = new ArrayList>(); @@ -83,30 +79,47 @@ public ServerBootstrap newInstance() { return toReturn; } - @SuppressWarnings("unchecked") @Override public List> clientSocket() { - return Arrays.asList( - new BootstrapFactory() { - @Override - public Bootstrap newInstance() { - return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class); - } - }, - new BootstrapFactory() { - @Override - public Bootstrap newInstance() { - return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); - } - } - ); + List> toReturn = new ArrayList>(); + + toReturn.add(new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class); + } + }); + + toReturn.add(new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(nioWorkerGroup).channel(NioSocketChannel.class); + } + }); + + return toReturn; + } + + @Override + public List> clientSocketWithFastOpen() { + List> factories = clientSocket(); + + int insertIndex = factories.size() - 1; // Keep NIO fixture last. + factories.add(insertIndex, new BootstrapFactory() { + @Override + public Bootstrap newInstance() { + return new Bootstrap().group(KQUEUE_WORKER_GROUP).channel(KQueueSocketChannel.class) + .option(ChannelOption.TCP_FASTOPEN_CONNECT, true); + } + }); + + return factories; } @Override public List> datagram( final InternetProtocolFamily family) { // Make the list of Bootstrap factories. - @SuppressWarnings("unchecked") List> bfs = Arrays.asList( new BootstrapFactory() { @Override @@ -135,10 +148,7 @@ public Bootstrap newInstance() { } public List> domainSocket() { - - List> list = - combo(serverDomainSocket(), clientDomainSocket()); - return list; + return combo(serverDomainSocket(), clientDomainSocket()); } public List> serverDomainSocket() { diff --git a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java index e623753ea88..1e7f14ced4d 100644 --- a/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java +++ b/transport-native-kqueue/src/test/java/io/netty/channel/kqueue/KqueueWriteBeforeRegisteredTest.java @@ -25,6 +25,6 @@ public class KqueueWriteBeforeRegisteredTest extends WriteBeforeRegisteredTest { @Override protected List> newFactories() { - return KQueueSocketTestPermutation.INSTANCE.clientSocket(); + return KQueueSocketTestPermutation.INSTANCE.clientSocketWithFastOpen(); } } diff --git a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java index 0452845803d..a2cf22e163d 100644 --- a/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java +++ b/transport-native-unix-common/src/main/java/io/netty/channel/unix/Socket.java @@ -52,7 +52,7 @@ public class Socket extends FileDescriptor { public Socket(int fd) { super(fd); - this.ipv6 = isIPv6(fd); + ipv6 = isIPv6(fd); } /** @@ -72,7 +72,7 @@ public final void shutdown(boolean read, boolean write) throws IOException { // shutdown anything. This is because if the underlying FD is reused and we still have an object which // represents the previous incarnation of the FD we need to be sure we don't inadvertently shutdown the // "new" FD without explicitly having a change. - final int oldState = this.state; + final int oldState = state; if (isClosed(oldState)) { throw new ClosedChannelException(); } From 075acbb8c36d9f8644870da1fd9e64a065ac10e6 Mon Sep 17 00:00:00 2001 From: Chris Vest Date: Thu, 12 Aug 2021 10:06:02 +0200 Subject: [PATCH 12/12] Accurately determine if a connection is in-progress or not, when using connectx/TCP FastOpen on MacOS --- .../io/netty/channel/kqueue/BsdSocket.java | 29 ++++++++++++++----- .../channel/kqueue/KQueueSocketChannel.java | 7 +++-- 2 files changed, 27 insertions(+), 9 deletions(-) diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java index 5e8332d352d..75a2e92ea1e 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/BsdSocket.java @@ -94,15 +94,25 @@ long sendFile(DefaultFileRegion src, long baseOffset, long offset, long length) return ioResult("sendfile", (int) res); } + /** + * Establish a connection to the given destination address, and send the given data to it. + * + * Note: This method relies on the {@code connectx(2)} system call, which is MacOS specific. + * + * @param source the source address we are connecting from. + * @param destination the destination address we are connecting to. + * @param data the data to copy to the kernel-side socket buffer. + * @param tcpFastOpen if {@code true}, set the flags needed to enable TCP FastOpen connecting. + * @return The number of bytes copied to the kernel-side socket buffer, or the number of bytes sent to the + * destination. This number is negative if connecting is left in an in-progress state, + * or positive if the connection was immediately established. + * @throws IOException if an IO error occurs, if the {@code data} is too big to send in one go, + * or if the system call is not supported on your platform. + */ int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, boolean tcpFastOpen) throws IOException { - int flags = tcpFastOpen ? CONNECT_TCP_FASTOPEN : 0; - return connectx(source, destination, data, flags); - } - - private int connectx(InetSocketAddress source, InetSocketAddress destination, IovArray data, int flags) - throws IOException { checkNotNull(destination, "Destination InetSocketAddress cannot be null."); + int flags = tcpFastOpen ? CONNECT_TCP_FASTOPEN : 0; boolean sourceIPv6; byte[] sourceAddress; @@ -166,7 +176,7 @@ private int connectx(InetSocketAddress source, InetSocketAddress destination, Io // This is normal for non-blocking sockets. // We'll know the connection has been established when the socket is selectable for writing. // Tell the channel the data was written, so the outbound buffer can update its position. - return iovDataLength; + return -iovDataLength; } if (result < 0) { return ioResult("connectx", result); @@ -209,11 +219,16 @@ private static native int connectx( ); private static native String[] getAcceptFilter(int fd) throws IOException; + private static native int getTcpNoPush(int fd) throws IOException; + private static native int getSndLowAt(int fd) throws IOException; + private static native PeerCredentials getPeerCredentials(int fd) throws IOException; private static native void setAcceptFilter(int fd, String filterName, String filterArgs) throws IOException; + private static native void setTcpNoPush(int fd, int tcpNoPush) throws IOException; + private static native void setSndLowAt(int fd, int lowAt) throws IOException; } diff --git a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java index 878cae53472..e024a6eaf51 100644 --- a/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java +++ b/transport-native-kqueue/src/main/java/io/netty/channel/kqueue/KQueueSocketChannel.java @@ -75,6 +75,7 @@ protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAdd Object curr; if ((curr = outbound.current()) instanceof ByteBuf) { ByteBuf initialData = (ByteBuf) curr; + // Don't bother with TCP FastOpen if we don't have any initial data to send anyway. if (initialData.isReadable()) { IovArray iov = new IovArray(config.getAllocator().directBuffer()); try { @@ -82,8 +83,10 @@ protected boolean doConnect0(SocketAddress remoteAddress, SocketAddress localAdd int bytesSent = socket.connectx( (InetSocketAddress) localAddress, (InetSocketAddress) remoteAddress, iov, true); writeFilter(true); - outbound.removeBytes(bytesSent); - return false; // 'false' because we assume connecting to be in-progress. + outbound.removeBytes(Math.abs(bytesSent)); + // The `connectx` method returns a negative number if connection is in-progress. + // So we should return `true` to indicate that connection was established, if it's positive. + return bytesSent > 0; } finally { iov.release(); }