Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-4766 - Add basic support to use UnixSockets with CIO #3342

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Expand Up @@ -45,8 +45,8 @@ buildscript {
extra["native_targets_enabled"] = rootProject.properties["disable_native_targets"] == null

repositories {
mavenLocal()
mavenCentral()
mavenLocal()
google()
gradlePluginPortal()
maven("https://maven.pkg.jetbrains.space/kotlin/p/kotlin/dev")
Expand Down
20 changes: 20 additions & 0 deletions ktor-client/ktor-client-cio/api/ktor-client-cio.api
Expand Up @@ -4,6 +4,17 @@ public final class io/ktor/client/engine/cio/CIO : io/ktor/client/engine/HttpCli
public fun toString ()Ljava/lang/String;
}

public final class io/ktor/client/engine/cio/CIOEngine : io/ktor/client/engine/HttpClientEngineBase, io/ktor/client/engine/cio/CIOHttpClientEngine {
public fun <init> (Lio/ktor/client/engine/cio/CIOEngineConfig;)V
public fun close ()V
public fun createAddress (Ljava/lang/String;I)Lio/ktor/network/sockets/SocketAddress;
public fun execute (Lio/ktor/client/request/HttpRequestData;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public synthetic fun getConfig ()Lio/ktor/client/engine/HttpClientEngineConfig;
public fun getConfig ()Lio/ktor/client/engine/cio/CIOEngineConfig;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun getSupportedCapabilities ()Ljava/util/Set;
}

public final class io/ktor/client/engine/cio/CIOEngineConfig : io/ktor/client/engine/HttpClientEngineConfig {
public fun <init> ()V
public final fun getEndpoint ()Lio/ktor/client/engine/cio/EndpointConfig;
Expand All @@ -25,6 +36,15 @@ public final class io/ktor/client/engine/cio/CIOEngineContainer : io/ktor/client
public fun toString ()Ljava/lang/String;
}

public abstract interface class io/ktor/client/engine/cio/CIOHttpClientEngine : io/ktor/client/engine/HttpClientEngine {
public abstract fun createAddress (Ljava/lang/String;I)Lio/ktor/network/sockets/SocketAddress;
}

public final class io/ktor/client/engine/cio/CIOHttpClientEngine$DefaultImpls {
public static fun getSupportedCapabilities (Lio/ktor/client/engine/cio/CIOHttpClientEngine;)Ljava/util/Set;
public static fun install (Lio/ktor/client/engine/cio/CIOHttpClientEngine;Lio/ktor/client/HttpClient;)V
}

public final class io/ktor/client/engine/cio/EndpointConfig {
public fun <init> ()V
public final fun getAllowHalfClose ()Z
Expand Down
Expand Up @@ -6,6 +6,7 @@
package io.ktor.client.engine.cio

import io.ktor.client.engine.*
import io.ktor.utils.io.*

/**
* An asynchronous coroutine-based engine that can be used on JVM, Android, and Kotlin/Native.
Expand All @@ -30,6 +31,7 @@ public object CIO : HttpClientEngineFactory<CIOEngineConfig> {
addToLoader()
}

@OptIn(InternalAPI::class)
override fun create(block: CIOEngineConfig.() -> Unit): HttpClientEngine =
CIOEngine(CIOEngineConfig().apply(block))

Expand Down
Expand Up @@ -11,6 +11,7 @@ import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.http.*
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.*
import io.ktor.util.collections.*
import io.ktor.util.network.*
Expand All @@ -19,12 +20,19 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

@OptIn(InternalAPI::class, DelicateCoroutinesApi::class)
internal class CIOEngine(
public interface CIOHttpClientEngine : HttpClientEngine {
public fun createAddress(host: String, port: Int): SocketAddress
}

@InternalAPI
@OptIn(DelicateCoroutinesApi::class)
public class CIOEngine(
override val config: CIOEngineConfig
) : HttpClientEngineBase("ktor-cio") {
) : HttpClientEngineBase("ktor-cio"), CIOHttpClientEngine {


override val supportedCapabilities =

override val supportedCapabilities: Set<HttpClientEngineCapability<*>> =
setOf(HttpTimeoutCapability, WebSocketCapability, WebSocketExtensionsCapability, SSECapability)

private val endpoints = ConcurrentMap<String, Endpoint>()
Expand Down Expand Up @@ -102,6 +110,8 @@ internal class CIOEngine(
(requestsJob[Job] as CompletableJob).complete()
}

public override fun createAddress(host: String, port: Int): SocketAddress = InetSocketAddress(host, port)

private fun selectEndpoint(url: Url, proxy: ProxyConfig?): Endpoint {
val host: String
val port: Int
Expand All @@ -121,8 +131,8 @@ internal class CIOEngine(
return endpoints.computeIfAbsent(endpointId) {
val secure = (protocol.isSecure())
Endpoint(
host,
port,
lazy { createAddress(host, port) },
CoroutineName("Endpoint timeout($host:$port)"),
proxy,
secure,
config,
Expand Down
Expand Up @@ -15,10 +15,10 @@ internal class ConnectionFactory(
private val addressConnectionsLimit: Int
) {
private val limit = Semaphore(connectionsLimit)
private val addressLimit = ConcurrentMap<InetSocketAddress, Semaphore>()
private val addressLimit = ConcurrentMap<SocketAddress, Semaphore>()

suspend fun connect(
address: InetSocketAddress,
address: SocketAddress,
configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): Socket {
limit.acquire()
Expand All @@ -39,7 +39,7 @@ internal class ConnectionFactory(
}
}

fun release(address: InetSocketAddress) {
fun release(address: SocketAddress) {
addressLimit[address]!!.release()
limit.release()
}
Expand Down
Expand Up @@ -21,8 +21,8 @@ import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

internal class Endpoint(
private val host: String,
private val port: Int,
private val address: Lazy<SocketAddress>,
timeoutName: CoroutineName,
private val proxy: ProxyConfig?,
private val secure: Boolean,
private val config: CIOEngineConfig,
Expand All @@ -35,7 +35,7 @@ internal class Endpoint(
private val deliveryPoint: Channel<RequestTask> = Channel()
private val maxEndpointIdleTime: Long = 2 * config.endpoint.connectTimeout

private val timeout = launch(coroutineContext + CoroutineName("Endpoint timeout($host:$port)")) {
private val timeout = launch(coroutineContext + timeoutName) {
try {
while (true) {
val remaining = (lastActivity.value + maxEndpointIdleTime) - getTimeMillis()
Expand Down Expand Up @@ -199,7 +199,7 @@ internal class Endpoint(

try {
repeat(connectAttempts) {
val address = InetSocketAddress(host, port)
val address = address.value

val connect: suspend CoroutineScope.() -> Socket = {
connectionFactory.connect(address) {
Expand Down Expand Up @@ -227,12 +227,12 @@ internal class Endpoint(
startTunnel(requestData, connection.output, connection.input)
}
val realAddress = when (proxy) {
null -> address
null -> address as? InetSocketAddress
else -> InetSocketAddress(requestData.url.host, requestData.url.port)
}
val tlsSocket = connection.tls(coroutineContext) {
takeFrom(config.https)
serverName = serverName ?: realAddress.hostname
serverName = serverName ?: realAddress?.hostname
}
return tlsSocket.connection()
} catch (cause: Throwable) {
Expand Down Expand Up @@ -282,7 +282,7 @@ internal class Endpoint(
}

private fun releaseConnection() {
val address = InetSocketAddress(host, port)
val address = address.value
connectionFactory.release(address)
connections.decrementAndGet()
}
Expand Down
12 changes: 11 additions & 1 deletion ktor-server/ktor-server-cio/api/ktor-server-cio.api
Expand Up @@ -4,9 +4,10 @@ public final class io/ktor/server/cio/CIO : io/ktor/server/engine/ApplicationEng
public synthetic fun create (Lio/ktor/server/engine/ApplicationEngineEnvironment;Lkotlin/jvm/functions/Function1;)Lio/ktor/server/engine/ApplicationEngine;
}

public final class io/ktor/server/cio/CIOApplicationEngine : io/ktor/server/engine/BaseApplicationEngine {
public final class io/ktor/server/cio/CIOApplicationEngine : io/ktor/server/engine/BaseApplicationEngine, io/ktor/server/cio/CIOApplicationEngineInterface {
public fun <init> (Lio/ktor/server/engine/ApplicationEngineEnvironment;Lkotlin/jvm/functions/Function1;)V
public fun start (Z)Lio/ktor/server/engine/ApplicationEngine;
public fun startHttpServer (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/engine/EngineConnectorConfig;JLkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
public fun stop (JJ)V
}

Expand All @@ -18,6 +19,14 @@ public final class io/ktor/server/cio/CIOApplicationEngine$Configuration : io/kt
public final fun setReuseAddress (Z)V
}

public abstract interface class io/ktor/server/cio/CIOApplicationEngineInterface : io/ktor/server/engine/ApplicationEngine {
public abstract fun startHttpServer (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/engine/EngineConnectorConfig;JLkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
}

public final class io/ktor/server/cio/CIOApplicationEngineInterface$DefaultImpls {
public static fun getApplication (Lio/ktor/server/cio/CIOApplicationEngineInterface;)Lio/ktor/server/application/Application;
}

public final class io/ktor/server/cio/EngineMain {
public static final field INSTANCE Lio/ktor/server/cio/EngineMain;
public static final fun main ([Ljava/lang/String;)V
Expand Down Expand Up @@ -51,6 +60,7 @@ public final class io/ktor/server/cio/HttpServerSettings {

public final class io/ktor/server/cio/backend/HttpServerKt {
public static final fun httpServer (Lkotlinx/coroutines/CoroutineScope;Lio/ktor/server/cio/HttpServerSettings;Lkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
public static final fun httpServer (Lkotlinx/coroutines/CoroutineScope;Lkotlin/jvm/functions/Function0;Lio/ktor/server/cio/internal/WeakTimeoutQueue;Lkotlinx/coroutines/CoroutineName;Lkotlinx/coroutines/CoroutineName;Lio/ktor/network/selector/SelectorManager;Lkotlin/jvm/functions/Function3;)Lio/ktor/server/cio/HttpServer;
}

public final class io/ktor/server/cio/backend/ServerIncomingConnection {
Expand Down
@@ -0,0 +1,39 @@
/*
* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/

package io.ktor.tests.server.cio

import io.ktor.client.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.server.engine.*
import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.test.dispatcher.*
import java.nio.file.*
import kotlin.io.path.*
import kotlin.test.*

class CustomSocketCIOJvmTest {

@Test
fun connectServerAndClientOverUnixSocket() = testSuspend {
val socket = Files.createTempFile("network", "socket").absolutePathString()

val client = HttpClient(CIOSocketClient(socket))
val server = embeddedServer(CIOSocket(socket)) {
routing {
get("/hello") {
call.respondText("Get from Socket")
}
post("/hello") {
call.respondText("Post from Socket")
}
}
}.start(wait = false)
assertEquals("Get from Socket", client.get("/hello").bodyAsText())
assertEquals("Post from Socket", client.post("/hello").bodyAsText())
server.stop(1000L, 1000L)
}
}
Expand Up @@ -6,6 +6,7 @@ package io.ktor.server.cio

import io.ktor.events.*
import io.ktor.http.*
import io.ktor.http.cio.*
import io.ktor.server.application.*
import io.ktor.server.cio.backend.*
import io.ktor.server.cio.internal.*
Expand All @@ -17,13 +18,22 @@ import io.ktor.utils.io.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*

public interface CIOApplicationEngineInterface : ApplicationEngine {
@InternalAPI
public fun CoroutineScope.startHttpServer(
connectorConfig: EngineConnectorConfig,
connectionIdleTimeoutSeconds: Long,
handleRequest: suspend ServerRequestScope.(Request) -> Unit
): HttpServer
}

/**
* Engine that based on CIO backend
*/
public class CIOApplicationEngine(
environment: ApplicationEngineEnvironment,
configure: Configuration.() -> Unit
) : BaseApplicationEngine(environment) {
) : BaseApplicationEngine(environment), CIOApplicationEngineInterface {

/**
* CIO-based server configuration
Expand Down Expand Up @@ -109,10 +119,15 @@ public class CIOApplicationEngine(
}
}

private fun CoroutineScope.startConnector(host: String, port: Int): HttpServer {
@InternalAPI
override fun CoroutineScope.startHttpServer(
connectorConfig: EngineConnectorConfig,
connectionIdleTimeoutSeconds: Long,
handleRequest: suspend ServerRequestScope.(Request) -> Unit
): HttpServer {
val settings = HttpServerSettings(
host = host,
port = port,
host = connectorConfig.host,
port = connectorConfig.port,
connectionIdleTimeoutSeconds = configuration.connectionIdleTimeoutSeconds.toLong(),
reuseAddress = configuration.reuseAddress
)
Expand Down Expand Up @@ -155,7 +170,7 @@ public class CIOApplicationEngine(
return transferEncoding != null || (contentLength != null && contentLength > 0)
}

private suspend fun ServerRequestScope.handleRequest(request: io.ktor.http.cio.Request) {
private suspend fun ServerRequestScope.handleRequest(request: Request) {
withContext(userDispatcher) {
val call = CIOApplicationCall(
application,
Expand All @@ -180,6 +195,7 @@ public class CIOApplicationEngine(
}
}

@OptIn(InternalAPI::class)
private fun initServerJob(): Job {
val environment = environment
val userDispatcher = userDispatcher
Expand Down Expand Up @@ -207,7 +223,10 @@ public class CIOApplicationEngine(
}

val connectorsAndServers = environment.connectors.map { connectorSpec ->
connectorSpec to startConnector(connectorSpec.host, connectorSpec.port)
connectorSpec to startHttpServer(
connectorConfig = connectorSpec,
connectionIdleTimeoutSeconds = configuration.connectionIdleTimeoutSeconds.toLong()
) { handleRequest(it) }
}
connectors.addAll(connectorsAndServers.map { it.second })

Expand Down
Expand Up @@ -16,8 +16,8 @@ internal class CIOApplicationRequest(
call: PipelineCall,
remoteAddress: NetworkAddress?,
localAddress: NetworkAddress?,
private val input: ByteReadChannel,
private val request: io.ktor.http.cio.Request
input: ByteReadChannel,
private val request: Request
) : BaseApplicationRequest(call) {
override val cookies: RequestCookies by lazy { RequestCookies(this) }

Expand Down