Skip to content

Commit

Permalink
KTOR-4766 - Add basic support to use UnixSockets with CIO
Browse files Browse the repository at this point in the history
  • Loading branch information
hfhbd committed Jan 5, 2023
1 parent 2886330 commit 4aa20ec
Show file tree
Hide file tree
Showing 10 changed files with 212 additions and 34 deletions.
21 changes: 21 additions & 0 deletions ktor-client/ktor-client-cio/api/ktor-client-cio.api
Expand Up @@ -4,6 +4,18 @@ public final class io/ktor/client/engine/cio/CIO : io/ktor/client/engine/HttpCli
public fun toString ()Ljava/lang/String;
}

public final class io/ktor/client/engine/cio/CIOEngine : io/ktor/client/engine/HttpClientEngineBase, io/ktor/client/engine/cio/CIOHttpClientEngine {
public fun <init> (Lio/ktor/client/engine/cio/CIOEngineConfig;)V
public fun close ()V
public fun createAddress (Ljava/lang/String;I)Lio/ktor/network/sockets/SocketAddress;
public fun execute (Lio/ktor/client/request/HttpRequestData;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public synthetic fun getConfig ()Lio/ktor/client/engine/HttpClientEngineConfig;
public fun getConfig ()Lio/ktor/client/engine/cio/CIOEngineConfig;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun getDispatcher ()Lkotlinx/coroutines/CoroutineDispatcher;
public fun getSupportedCapabilities ()Ljava/util/Set;
}

public final class io/ktor/client/engine/cio/CIOEngineConfig : io/ktor/client/engine/HttpClientEngineConfig {
public fun <init> ()V
public final fun getEndpoint ()Lio/ktor/client/engine/cio/EndpointConfig;
Expand All @@ -25,6 +37,15 @@ public final class io/ktor/client/engine/cio/CIOEngineContainer : io/ktor/client
public fun toString ()Ljava/lang/String;
}

public abstract interface class io/ktor/client/engine/cio/CIOHttpClientEngine : io/ktor/client/engine/HttpClientEngine {
public abstract fun createAddress (Ljava/lang/String;I)Lio/ktor/network/sockets/SocketAddress;
}

public final class io/ktor/client/engine/cio/CIOHttpClientEngine$DefaultImpls {
public static fun getSupportedCapabilities (Lio/ktor/client/engine/cio/CIOHttpClientEngine;)Ljava/util/Set;
public static fun install (Lio/ktor/client/engine/cio/CIOHttpClientEngine;Lio/ktor/client/HttpClient;)V
}

public final class io/ktor/client/engine/cio/ClientClosedException : java/lang/IllegalStateException {
public fun <init> ()V
public fun <init> (Ljava/lang/Throwable;)V
Expand Down
Expand Up @@ -31,6 +31,7 @@ public object CIO : HttpClientEngineFactory<CIOEngineConfig> {
addToLoader()
}

@OptIn(InternalAPI::class)
override fun create(block: CIOEngineConfig.() -> Unit): HttpClientEngine =
CIOEngine(CIOEngineConfig().apply(block))

Expand Down
Expand Up @@ -11,23 +11,30 @@ import io.ktor.client.request.*
import io.ktor.client.utils.*
import io.ktor.http.*
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.*
import io.ktor.util.collections.*
import io.ktor.util.network.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

@OptIn(InternalAPI::class, DelicateCoroutinesApi::class)
internal class CIOEngine(
public interface CIOHttpClientEngine : HttpClientEngine {
public fun createAddress(host: String, port: Int): SocketAddress
}

@InternalAPI
@OptIn(DelicateCoroutinesApi::class)
public class CIOEngine(
override val config: CIOEngineConfig
) : HttpClientEngineBase("ktor-cio") {
) : HttpClientEngineBase("ktor-cio"), CIOHttpClientEngine {

override val dispatcher: CoroutineDispatcher by lazy {
Dispatchers.clientDispatcher(config.threadsCount, "ktor-cio-dispatcher")
}

override val supportedCapabilities = setOf(HttpTimeout, WebSocketCapability, WebSocketExtensionsCapability)
override val supportedCapabilities: Set<HttpClientEngineCapability<*>> =
setOf(HttpTimeout, WebSocketCapability, WebSocketExtensionsCapability)

private val endpoints = ConcurrentMap<String, Endpoint>()

Expand Down Expand Up @@ -104,6 +111,8 @@ internal class CIOEngine(
(requestsJob[Job] as CompletableJob).complete()
}

public override fun createAddress(host: String, port: Int): SocketAddress = InetSocketAddress(host, port)

private fun selectEndpoint(url: Url, proxy: ProxyConfig?): Endpoint {
val host: String
val port: Int
Expand All @@ -123,8 +132,8 @@ internal class CIOEngine(
return endpoints.computeIfAbsent(endpointId) {
val secure = (protocol.isSecure())
Endpoint(
host,
port,
lazy { createAddress(host, port) },
CoroutineName("Endpoint timeout($host:$port)"),
proxy,
secure,
config,
Expand Down
Expand Up @@ -15,10 +15,10 @@ internal class ConnectionFactory(
private val addressConnectionsLimit: Int
) {
private val limit = Semaphore(connectionsLimit)
private val addressLimit = ConcurrentMap<InetSocketAddress, Semaphore>()
private val addressLimit = ConcurrentMap<SocketAddress, Semaphore>()

suspend fun connect(
address: InetSocketAddress,
address: SocketAddress,
configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): Socket {
limit.acquire()
Expand All @@ -35,7 +35,7 @@ internal class ConnectionFactory(
}
}

fun release(address: InetSocketAddress) {
fun release(address: SocketAddress) {
addressLimit[address]!!.release()
limit.release()
}
Expand Down
Expand Up @@ -21,8 +21,8 @@ import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

internal class Endpoint(
private val host: String,
private val port: Int,
private val address: Lazy<SocketAddress>,
timeoutName: CoroutineName,
private val proxy: ProxyConfig?,
private val secure: Boolean,
private val config: CIOEngineConfig,
Expand All @@ -35,7 +35,7 @@ internal class Endpoint(
private val deliveryPoint: Channel<RequestTask> = Channel()
private val maxEndpointIdleTime: Long = 2 * config.endpoint.connectTimeout

private val timeout = launch(coroutineContext + CoroutineName("Endpoint timeout($host:$port)")) {
private val timeout = launch(coroutineContext + timeoutName) {
try {
while (true) {
val remaining = (lastActivity.value + maxEndpointIdleTime).timestamp - GMTDate().timestamp
Expand Down Expand Up @@ -150,7 +150,7 @@ internal class Endpoint(

try {
repeat(connectAttempts) {
val address = InetSocketAddress(host, port)
val address = address.value

val connect: suspend CoroutineScope.() -> Socket = {
connectionFactory.connect(address) {
Expand Down Expand Up @@ -178,12 +178,12 @@ internal class Endpoint(
startTunnel(requestData, connection.output, connection.input)
}
val realAddress = when (proxy) {
null -> address
null -> address as? InetSocketAddress
else -> InetSocketAddress(requestData.url.host, requestData.url.port)
}
val tlsSocket = connection.tls(coroutineContext) {
takeFrom(config.https)
serverName = serverName ?: realAddress.hostname
serverName = serverName ?: realAddress?.hostname
}
return tlsSocket.connection()
} catch (cause: Throwable) {
Expand Down Expand Up @@ -233,7 +233,7 @@ internal class Endpoint(
}

private fun releaseConnection() {
val address = InetSocketAddress(host, port)
val address = address.value
connectionFactory.release(address)
connections.decrementAndGet()
}
Expand Down
12 changes: 11 additions & 1 deletion ktor-server/ktor-server-cio/api/ktor-server-cio.api
Expand Up @@ -4,9 +4,10 @@ public final class io/ktor/server/cio/CIO : io/ktor/server/engine/ApplicationEng
public synthetic fun create (Lio/ktor/server/engine/ApplicationEngineEnvironment;Lkotlin/jvm/functions/Function1;)Lio/ktor/server/engine/ApplicationEngine;
}

public final class io/ktor/server/cio/CIOApplicationEngine : io/ktor/server/engine/BaseApplicationEngine {
public final class io/ktor/server/cio/CIOApplicationEngine : io/ktor/server/engine/BaseApplicationEngine, io/ktor/server/cio/CIOApplicationEngineInterface {
public fun <init> (Lio/ktor/server/engine/ApplicationEngineEnvironment;Lkotlin/jvm/functions/Function1;)V
public fun start (Z)Lio/ktor/server/engine/ApplicationEngine;
public fun startHttpServer (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/engine/EngineConnectorConfig;JLkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
public fun stop (JJ)V
}

Expand All @@ -16,6 +17,14 @@ public final class io/ktor/server/cio/CIOApplicationEngine$Configuration : io/kt
public final fun setConnectionIdleTimeoutSeconds (I)V
}

public abstract interface class io/ktor/server/cio/CIOApplicationEngineInterface : io/ktor/server/engine/ApplicationEngine {
public abstract fun startHttpServer (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/engine/EngineConnectorConfig;JLkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
}

public final class io/ktor/server/cio/CIOApplicationEngineInterface$DefaultImpls {
public static fun getApplication (Lio/ktor/server/cio/CIOApplicationEngineInterface;)Lio/ktor/server/application/Application;
}

public final class io/ktor/server/cio/EngineMain {
public static final field INSTANCE Lio/ktor/server/cio/EngineMain;
public static final fun main ([Ljava/lang/String;)V
Expand Down Expand Up @@ -57,6 +66,7 @@ public final class io/ktor/server/cio/PipelineKt {

public final class io/ktor/server/cio/backend/HttpServerKt {
public static final fun httpServer (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/cio/HttpServerSettings;Lkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
public static final fun httpServer (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function0;Lio/ktor/server/cio/internal/WeakTimeoutQueue;Lkotlinx/coroutines/CoroutineName;Lkotlinx/coroutines/CoroutineName;Lio/ktor/network/selector/SelectorManager;Lkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
}

public final class io/ktor/server/cio/backend/ServerIncomingConnection {
Expand Down
Expand Up @@ -10,17 +10,27 @@ import io.ktor.server.application.*
import io.ktor.server.cio.backend.*
import io.ktor.server.cio.internal.*
import io.ktor.server.engine.*
import io.ktor.util.*
import io.ktor.util.pipeline.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*

public interface CIOApplicationEngineInterface : ApplicationEngine {
@InternalAPI
public fun CoroutineScope.startHttpServer(
connectorConfig: EngineConnectorConfig,
connectionIdleTimeoutSeconds: Long,
handleRequest: suspend ServerRequestScope.(Request) -> Unit
): HttpServer
}

/**
* Engine that based on CIO backend
*/
public class CIOApplicationEngine(
environment: ApplicationEngineEnvironment,
configure: Configuration.() -> Unit
) : BaseApplicationEngine(environment) {
) : BaseApplicationEngine(environment), CIOApplicationEngineInterface {

/**
* CIO-based server configuration
Expand Down Expand Up @@ -97,11 +107,16 @@ public class CIOApplicationEngine(
}
}

private fun CoroutineScope.startConnector(host: String, port: Int): HttpServer {
@InternalAPI
override fun CoroutineScope.startHttpServer(
connectorConfig: EngineConnectorConfig,
connectionIdleTimeoutSeconds: Long,
handleRequest: suspend ServerRequestScope.(Request) -> Unit
): HttpServer {
val settings = HttpServerSettings(
host = host,
port = port,
connectionIdleTimeoutSeconds = configuration.connectionIdleTimeoutSeconds.toLong()
host = connectorConfig.host,
port = connectorConfig.port,
connectionIdleTimeoutSeconds = connectionIdleTimeoutSeconds
)

return httpServer(settings) { request ->
Expand Down Expand Up @@ -133,6 +148,7 @@ public class CIOApplicationEngine(
}
}

@OptIn(InternalAPI::class)
private fun initServerJob(): Job {
val environment = environment
val userDispatcher = userDispatcher
Expand Down Expand Up @@ -160,7 +176,12 @@ public class CIOApplicationEngine(
}

val connectorsAndServers = environment.connectors.map { connectorSpec ->
connectorSpec to startConnector(connectorSpec.host, connectorSpec.port)
connectorSpec to startHttpServer(
connectorConfig = connectorSpec,
connectionIdleTimeoutSeconds = configuration.connectionIdleTimeoutSeconds.toLong()
) {
handleRequest(it)
}
}
connectors.addAll(connectorsAndServers.map { it.second })

Expand Down
Expand Up @@ -7,7 +7,7 @@ package io.ktor.server.cio.backend
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.server.cio.*
import io.ktor.server.cio.internal.WeakTimeoutQueue
import io.ktor.server.cio.internal.*
import io.ktor.server.engine.*
import io.ktor.server.engine.internal.*
import io.ktor.util.*
Expand All @@ -22,30 +22,52 @@ import kotlinx.coroutines.*
public fun CoroutineScope.httpServer(
settings: HttpServerSettings,
handler: HttpRequestHandler
): HttpServer {
val selector = SelectorManager(coroutineContext)
return httpServer(
createServer = {
aSocket(selector).tcp().bind(
hostname = settings.host,
port = settings.port
)
},
serverJobName = CoroutineName("server-root-${settings.port}"),
acceptJobName = CoroutineName("accept-${settings.port}"),
timeout = WeakTimeoutQueue(settings.connectionIdleTimeoutSeconds * 1000L),
selector = selector,
handler = handler
)
}

/**
* Start an http server with [settings] invoking [handler] for every request
*/
@InternalAPI
public fun CoroutineScope.httpServer(
createServer: () -> ServerSocket,
timeout: WeakTimeoutQueue,
serverJobName: CoroutineName,
acceptJobName: CoroutineName,
selector: SelectorManager,
handler: HttpRequestHandler
): HttpServer {
val socket = CompletableDeferred<ServerSocket>()

val serverLatch: CompletableJob = Job()

@OptIn(ExperimentalCoroutinesApi::class)
val serverJob = launch(
context = CoroutineName("server-root-${settings.port}"),
context = serverJobName,
start = CoroutineStart.UNDISPATCHED
) {
serverLatch.join()
}

val selector = SelectorManager(coroutineContext)
val timeout = WeakTimeoutQueue(
settings.connectionIdleTimeoutSeconds * 1000L
)

val logger = KtorSimpleLogger(
HttpServer::class.simpleName ?: HttpServer::class.qualifiedName ?: HttpServer::class.toString()
)

val acceptJob = launch(serverJob + CoroutineName("accept-${settings.port}")) {
aSocket(selector).tcp().bind(settings.host, settings.port).use { server ->
val acceptJob = launch(serverJob + acceptJobName) {
createServer().use { server ->
socket.complete(server)

val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
Expand Down

0 comments on commit 4aa20ec

Please sign in to comment.