Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): commonize raw socket interface #3221

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -10,6 +10,7 @@ import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.client.utils.*
import io.ktor.http.*
import io.ktor.network.dispatcher.*
import io.ktor.network.selector.*
import io.ktor.util.*
import io.ktor.util.collections.*
Expand All @@ -31,10 +32,10 @@ internal class CIOEngine(

private val endpoints = ConcurrentMap<String, Endpoint>()

private val selectorManager: SelectorManager by lazy { SelectorManager(dispatcher) }
private val socketDispatcher: SocketDispatcher by lazy { SocketDispatcher(dispatcher) }

private val connectionFactory = ConnectionFactory(
selectorManager,
socketDispatcher,
config.maxConnectionsCount,
config.endpoint.maxConnectionsPerRoute
)
Expand All @@ -61,7 +62,7 @@ internal class CIOEngine(
coroutineContext = parentContext + requestField

val requestJob = requestField[Job]!!
val selector = selectorManager
val selector = socketDispatcher

@OptIn(ExperimentalCoroutinesApi::class)
GlobalScope.launch(parentContext, start = CoroutineStart.ATOMIC) {
Expand Down
Expand Up @@ -4,13 +4,13 @@

package io.ktor.client.engine.cio

import io.ktor.network.selector.*
import io.ktor.network.dispatcher.*
import io.ktor.network.sockets.*
import io.ktor.util.collections.*
import kotlinx.coroutines.sync.*

internal class ConnectionFactory(
private val selector: SelectorManager,
private val dispatcher: SocketDispatcher,
connectionsLimit: Int,
private val addressConnectionsLimit: Int
) {
Expand All @@ -26,7 +26,10 @@ internal class ConnectionFactory(
addressSemaphore.acquire()

return try {
aSocket(selector).tcpNoDelay().tcp().connect(address, configuration)
dispatcher.tcp().connect(address) {
noDelay = true
also(configuration)
}
} catch (cause: Throwable) {
// a failure or cancellation
addressSemaphore.release()
Expand Down
90 changes: 41 additions & 49 deletions ktor-network/api/ktor-network.api
@@ -1,3 +1,23 @@
public final class io/ktor/network/dispatcher/JvmSocketDispatcher : io/ktor/network/dispatcher/SocketDispatcher {
public fun <init> ()V
public fun <init> (Lkotlin/coroutines/CoroutineContext;)V
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun close ()V
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun tcp ()Lio/ktor/network/sockets/TcpSocketBuilder;
public fun udp ()Lio/ktor/network/sockets/UdpSocketBuilder;
}

public final class io/ktor/network/dispatcher/JvmSocketDispatcherKt {
public static final fun SocketDispatcher ()Lio/ktor/network/dispatcher/SocketDispatcher;
public static final fun SocketDispatcher (Lkotlin/coroutines/CoroutineContext;)Lio/ktor/network/dispatcher/SocketDispatcher;
}

public abstract interface class io/ktor/network/dispatcher/SocketDispatcher : java/io/Closeable, kotlinx/coroutines/CoroutineScope {
public abstract fun tcp ()Lio/ktor/network/sockets/TcpSocketBuilder;
public abstract fun udp ()Lio/ktor/network/sockets/UdpSocketBuilder;
}

public final class io/ktor/network/selector/ActorSelectorManager : io/ktor/network/selector/SelectorManagerSupport, java/io/Closeable, kotlinx/coroutines/CoroutineScope {
public fun <init> (Lkotlin/coroutines/CoroutineContext;)V
public fun close ()V
Expand Down Expand Up @@ -49,18 +69,12 @@ public abstract interface class io/ktor/network/selector/Selectable : java/io/Cl
}

public abstract interface class io/ktor/network/selector/SelectorManager : java/io/Closeable, kotlinx/coroutines/CoroutineScope {
public static final field Companion Lio/ktor/network/selector/SelectorManager$Companion;
public abstract fun getProvider ()Ljava/nio/channels/spi/SelectorProvider;
public abstract fun notifyClosed (Lio/ktor/network/selector/Selectable;)V
public abstract fun select (Lio/ktor/network/selector/Selectable;Lio/ktor/network/selector/SelectInterest;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/network/selector/SelectorManager$Companion {
}

public final class io/ktor/network/selector/SelectorManagerKt {
public static final fun SelectorManager (Lkotlin/coroutines/CoroutineContext;)Lio/ktor/network/selector/SelectorManager;
public static synthetic fun SelectorManager$default (Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/ktor/network/selector/SelectorManager;
public static final fun buildOrClose (Lio/ktor/network/selector/SelectorManager;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -126,11 +140,6 @@ public final class io/ktor/network/sockets/BoundDatagramSocket$DefaultImpls {
public static fun send (Lio/ktor/network/sockets/BoundDatagramSocket;Lio/ktor/network/sockets/Datagram;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/network/sockets/BuildersKt {
public static final fun aSocket (Lio/ktor/network/selector/SelectorManager;)Lio/ktor/network/sockets/SocketBuilder;
public static final fun tcpNoDelay (Lio/ktor/network/sockets/Configurable;)Lio/ktor/network/sockets/Configurable;
}

public abstract interface class io/ktor/network/sockets/Configurable {
public abstract fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/Configurable;
public abstract fun getOptions ()Lio/ktor/network/sockets/SocketOptions;
Expand All @@ -141,6 +150,10 @@ public final class io/ktor/network/sockets/Configurable$DefaultImpls {
public static fun configure (Lio/ktor/network/sockets/Configurable;Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/Configurable;
}

public final class io/ktor/network/sockets/ConfigurableKt {
public static final fun tcpNoDelay (Lio/ktor/network/sockets/Configurable;)Lio/ktor/network/sockets/Configurable;
}

public abstract interface class io/ktor/network/sockets/ConnectedDatagramSocket : io/ktor/network/sockets/ABoundSocket, io/ktor/network/sockets/AConnectedSocket, io/ktor/network/sockets/ASocket, io/ktor/network/sockets/DatagramReadWriteChannel, io/ktor/network/sockets/ReadWriteSocket {
}

Expand Down Expand Up @@ -231,15 +244,6 @@ public final class io/ktor/network/sockets/Socket$DefaultImpls {
public abstract class io/ktor/network/sockets/SocketAddress {
}

public final class io/ktor/network/sockets/SocketBuilder : io/ktor/network/sockets/Configurable {
public synthetic fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/Configurable;
public fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/SocketBuilder;
public fun getOptions ()Lio/ktor/network/sockets/SocketOptions;
public fun setOptions (Lio/ktor/network/sockets/SocketOptions;)V
public final fun tcp ()Lio/ktor/network/sockets/TcpSocketBuilder;
public final fun udp ()Lio/ktor/network/sockets/UDPSocketBuilder;
}

public abstract class io/ktor/network/sockets/SocketOptions {
public static final field Companion Lio/ktor/network/sockets/SocketOptions$Companion;
public synthetic fun <init> (Ljava/util/Map;Lkotlin/jvm/internal/DefaultConstructorMarker;)V
Expand Down Expand Up @@ -300,25 +304,21 @@ public final class io/ktor/network/sockets/SocketsKt {
public static synthetic fun openWriteChannel$default (Lio/ktor/network/sockets/AWritable;ZILjava/lang/Object;)Lio/ktor/utils/io/ByteWriteChannel;
}

public final class io/ktor/network/sockets/TCPSocketBuilderJvmKt {
public static final fun connect (Lio/ktor/network/sockets/TcpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun connect$default (Lio/ktor/network/sockets/TcpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public abstract interface class io/ktor/network/sockets/TcpSocketBuilder {
public abstract fun bind (Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun connect (Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/network/sockets/TcpSocketBuilder : io/ktor/network/sockets/Configurable {
public fun <init> (Lio/ktor/network/selector/SelectorManager;Lio/ktor/network/sockets/SocketOptions;)V
public final fun bind (Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/ServerSocket;
public final fun bind (Ljava/lang/String;ILkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/ServerSocket;
public static synthetic fun bind$default (Lio/ktor/network/sockets/TcpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/ktor/network/sockets/ServerSocket;
public static synthetic fun bind$default (Lio/ktor/network/sockets/TcpSocketBuilder;Ljava/lang/String;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/ktor/network/sockets/ServerSocket;
public synthetic fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/Configurable;
public fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/TcpSocketBuilder;
public final fun connect (Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun connect (Ljava/lang/String;ILkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final class io/ktor/network/sockets/TcpSocketBuilder$DefaultImpls {
public static synthetic fun bind$default (Lio/ktor/network/sockets/TcpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun connect$default (Lio/ktor/network/sockets/TcpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class io/ktor/network/sockets/TcpSocketBuilderKt {
public static final fun bind (Lio/ktor/network/sockets/TcpSocketBuilder;Ljava/lang/String;ILkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun bind$default (Lio/ktor/network/sockets/TcpSocketBuilder;Ljava/lang/String;ILkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun connect (Lio/ktor/network/sockets/TcpSocketBuilder;Ljava/lang/String;ILkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun connect$default (Lio/ktor/network/sockets/TcpSocketBuilder;Ljava/lang/String;ILkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public fun getOptions ()Lio/ktor/network/sockets/SocketOptions;
public fun setOptions (Lio/ktor/network/sockets/SocketOptions;)V
}

public final class io/ktor/network/sockets/TypeOfService {
Expand Down Expand Up @@ -346,22 +346,14 @@ public final class io/ktor/network/sockets/TypeOfService$Companion {
public final fun getUNDEFINED-zieKYfw ()B
}

public final class io/ktor/network/sockets/UDPSocketBuilder : io/ktor/network/sockets/Configurable {
public static final field Companion Lio/ktor/network/sockets/UDPSocketBuilder$Companion;
public fun <init> (Lio/ktor/network/selector/SelectorManager;Lio/ktor/network/sockets/SocketOptions$UDPSocketOptions;)V
public final fun bind (Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/BoundDatagramSocket;
public static synthetic fun bind$default (Lio/ktor/network/sockets/UDPSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/ktor/network/sockets/BoundDatagramSocket;
public synthetic fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/Configurable;
public fun configure (Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/UDPSocketBuilder;
public final fun connect (Lio/ktor/network/sockets/SocketAddress;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;)Lio/ktor/network/sockets/ConnectedDatagramSocket;
public static synthetic fun connect$default (Lio/ktor/network/sockets/UDPSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lio/ktor/network/sockets/ConnectedDatagramSocket;
public fun getOptions ()Lio/ktor/network/sockets/SocketOptions$UDPSocketOptions;
public synthetic fun getOptions ()Lio/ktor/network/sockets/SocketOptions;
public fun setOptions (Lio/ktor/network/sockets/SocketOptions$UDPSocketOptions;)V
public synthetic fun setOptions (Lio/ktor/network/sockets/SocketOptions;)V
public abstract interface class io/ktor/network/sockets/UdpSocketBuilder {
public abstract fun bind (Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun connect (Lio/ktor/network/sockets/SocketAddress;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class io/ktor/network/sockets/UDPSocketBuilder$Companion {
public final class io/ktor/network/sockets/UdpSocketBuilder$DefaultImpls {
public static synthetic fun bind$default (Lio/ktor/network/sockets/UdpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun connect$default (Lio/ktor/network/sockets/UdpSocketBuilder;Lio/ktor/network/sockets/SocketAddress;Lio/ktor/network/sockets/SocketAddress;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class io/ktor/network/sockets/UnixSocketAddress : io/ktor/network/sockets/SocketAddress {
Expand Down
3 changes: 2 additions & 1 deletion ktor-network/build.gradle.kts
Expand Up @@ -6,8 +6,9 @@ kotlin {
}

sourceSets {
jvmAndNixMain {
commonMain {
dependencies {
api(libs.kotlinx.coroutines.core)
api(project(":ktor-utils"))
}
}
Expand Down
@@ -0,0 +1,20 @@
/*
* Copyright 2014-2022 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.dispatcher

import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

@Suppress("FunctionName")
public expect fun SocketDispatcher(): SocketDispatcher

@Suppress("FunctionName")
public expect fun SocketDispatcher(parent: CoroutineContext): SocketDispatcher
public interface SocketDispatcher : CoroutineScope, Closeable {
public fun tcp(): TcpSocketBuilder
public fun udp(): UdpSocketBuilder
}
Expand Up @@ -2,10 +2,5 @@ package io.ktor.network.selector

import kotlinx.coroutines.*

/**
* A selectable entity with selectable NIO [channel], [interestedOps] subscriptions.
*/
public expect interface Selectable

@Suppress("KDocMissingDocumentation")
public class ClosedChannelCancellationException : CancellationException("Closed channel.")
Expand Up @@ -5,31 +5,6 @@ package io.ktor.network.sockets

import io.ktor.network.selector.*

/**
* Start building a socket
*/
public fun aSocket(selector: SelectorManager): SocketBuilder = SocketBuilder(selector, SocketOptions.create())

/**
* Socket builder
*/
@Suppress("PublicApiImplicitType", "unused")
public class SocketBuilder internal constructor(
private val selector: SelectorManager,
override var options: SocketOptions
) : Configurable<SocketBuilder, SocketOptions> {

/**
* Build TCP socket.
*/
public fun tcp(): TcpSocketBuilder = TcpSocketBuilder(selector, options.peer())

/**
* Build UDP socket.
*/
public fun udp(): UDPSocketBuilder = UDPSocketBuilder(selector, options.peer().udp())
}

/**
* Set TCP_NODELAY socket option to disable the Nagle algorithm.
*/
Expand Down
@@ -1,6 +1,6 @@
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
* Copyright 2014-2022 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

Expand Down
@@ -0,0 +1,40 @@
package io.ktor.network.sockets

/**
* TCP socket builder
*/
public interface TcpSocketBuilder {
/**
* Connect to [remoteAddress].
*/
public suspend fun connect(
remoteAddress: SocketAddress,
configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): Socket

/**
* Bind server socket to listen to [localAddress].
*/
public suspend fun bind(
localAddress: SocketAddress? = null,
configure: SocketOptions.AcceptorOptions.() -> Unit = {}
): ServerSocket
}

/**
* Connect to [hostname] and [port].
*/
public suspend inline fun TcpSocketBuilder.connect(
hostname: String,
port: Int,
noinline configure: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): Socket = connect(InetSocketAddress(hostname, port), configure)

/**
* Bind server socket at [port] to listen to [hostname].
*/
public suspend inline fun TcpSocketBuilder.bind(
hostname: String = "0.0.0.0",
port: Int = 0,
noinline configure: SocketOptions.AcceptorOptions.() -> Unit = {}
): ServerSocket = bind(InetSocketAddress(hostname, port), configure)
@@ -0,0 +1,27 @@
/*
* Copyright 2014-2022 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.sockets

/**
* UDP socket builder
*/
public interface UdpSocketBuilder {
/**
* Bind server socket to listen to [localAddress].
*/
public suspend fun bind(
localAddress: SocketAddress? = null,
configure: SocketOptions.UDPSocketOptions.() -> Unit = {}
): BoundDatagramSocket

/**
* Create a datagram socket to listen datagrams at [localAddress] and set to [remoteAddress].
*/
public suspend fun connect(
remoteAddress: SocketAddress,
localAddress: SocketAddress? = null,
configure: SocketOptions.UDPSocketOptions.() -> Unit = {}
): ConnectedDatagramSocket
}
13 changes: 13 additions & 0 deletions ktor-network/js/src/io/ktor/network/dispatcher/SocketDispatcher.kt
@@ -0,0 +1,13 @@
/*
* Copyright 2014-2022 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.network.dispatcher

import kotlin.coroutines.*

@Suppress("FunctionName")
public actual fun SocketDispatcher(): SocketDispatcher = TODO()

@Suppress("FunctionName")
public actual fun SocketDispatcher(parent: CoroutineContext): SocketDispatcher = TODO()
12 changes: 12 additions & 0 deletions ktor-network/js/src/io/ktor/network/sockets/SocketAddress.kt
@@ -0,0 +1,12 @@
package io.ktor.network.sockets

public actual sealed class SocketAddress

public actual data class InetSocketAddress actual constructor(
public actual val hostname: String,
public actual val port: Int
) : SocketAddress()

public actual data class UnixSocketAddress actual constructor(
public actual val path: String
) : SocketAddress()