From 6453001b4eb5cbd2dfcf291fd27ce31b5633494e Mon Sep 17 00:00:00 2001 From: "leonid.stashevsky" Date: Wed, 24 Aug 2022 11:33:20 +0200 Subject: [PATCH 1/3] KTOR-2036 Fix CIO connection limit --- .../src/io/ktor/client/engine/cio/CIOEngine.kt | 7 ++++++- .../client/engine/cio/ConnectionFactory.kt | 18 ++++++++++++------ .../src/io/ktor/client/engine/cio/Endpoint.kt | 5 +++-- .../io/ktor/client/engine/mock/MockUtils.kt | 2 +- 4 files changed, 22 insertions(+), 10 deletions(-) diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt index a2aa49ea07..0f4ba1c93d 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/CIOEngine.kt @@ -33,7 +33,11 @@ internal class CIOEngine( private val selectorManager: SelectorManager by lazy { SelectorManager(dispatcher) } - private val connectionFactory = ConnectionFactory(selectorManager, config.maxConnectionsCount) + private val connectionFactory = ConnectionFactory( + selectorManager, + config.maxConnectionsCount, + config.endpoint.maxConnectionsPerRoute + ) private val requestsJob: CoroutineContext @@ -42,6 +46,7 @@ internal class CIOEngine( private val proxy: ProxyConfig? = when (val type = config.proxy?.type) { ProxyType.SOCKS, null -> null + ProxyType.HTTP -> config.proxy else -> throw IllegalStateException("CIO engine does not currently support $type proxies.") } diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt index 3b39274534..8b114df285 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt @@ -6,29 +6,35 @@ package io.ktor.client.engine.cio import io.ktor.network.selector.* import io.ktor.network.sockets.* +import io.ktor.util.collections.* import kotlinx.coroutines.sync.* internal class ConnectionFactory( private val selector: SelectorManager, - maxConnectionsCount: Int + connectionsLimit: Int, + private val addressConnectionsLimit: Int ) { - private val semaphore = Semaphore(maxConnectionsCount) + private val limit = Semaphore(connectionsLimit) + private val addressLimit = ConcurrentMap() suspend fun connect( address: InetSocketAddress, configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {} ): Socket { - semaphore.acquire() + limit.acquire() + addressLimit.computeIfAbsent(address) { Semaphore(addressConnectionsLimit) }.acquire() + return try { aSocket(selector).tcpNoDelay().tcp().connect(address, configuration) } catch (cause: Throwable) { // a failure or cancellation - semaphore.release() + limit.release() throw cause } } - fun release() { - semaphore.release() + fun release(address: InetSocketAddress) { + addressLimit[address]!!.release() + limit.release() } } diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt index c6e95c800f..22cbe2401e 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/Endpoint.kt @@ -188,7 +188,7 @@ internal class Endpoint( } catch (_: Throwable) { } - connectionFactory.release() + connectionFactory.release(address) throw cause } } @@ -229,7 +229,8 @@ internal class Endpoint( } private fun releaseConnection() { - connectionFactory.release() + val address = InetSocketAddress(host, port) + connectionFactory.release(address) connections.decrementAndGet() } diff --git a/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt b/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt index c96501da33..c7efda3aea 100644 --- a/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt +++ b/ktor-client/ktor-client-mock/common/src/io/ktor/client/engine/mock/MockUtils.kt @@ -31,8 +31,8 @@ public suspend fun OutgoingContent.toByteArray(): ByteArray = when (this) { else -> ByteArray(0) } -@OptIn(DelicateCoroutinesApi::class) @Suppress("KDocMissingDocumentation") +@OptIn(DelicateCoroutinesApi::class) public suspend fun OutgoingContent.toByteReadPacket(): ByteReadPacket = when (this) { is OutgoingContent.ByteArrayContent -> ByteReadPacket(bytes()) is OutgoingContent.ReadChannelContent -> readFrom().readRemaining() From 41b863d2076860ad675691f7001975993a7b86e1 Mon Sep 17 00:00:00 2001 From: "leonid.stashevsky" Date: Wed, 31 Aug 2022 10:22:50 +0200 Subject: [PATCH 2/3] fixup! KTOR-2036 Fix CIO connection limit --- .../test/io/ktor/client/engine/cio/CIORequestTest.kt | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt b/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt index 6d1b962a79..0bdca5a931 100644 --- a/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt +++ b/ktor-client/ktor-client-cio/jvm/test/io/ktor/client/engine/cio/CIORequestTest.kt @@ -5,6 +5,7 @@ package io.ktor.client.engine.cio import io.ktor.client.call.* +import io.ktor.client.network.sockets.* import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.client.statement.* @@ -147,13 +148,19 @@ class CIORequestTest : TestWithKtor() { } test { client -> + var fail: Throwable? = null for (i in 0..1000) { try { client.get("http://something.wrong").body() - } catch (cause: UnresolvedAddressException) { - // ignore + } catch (cause: Throwable) { + fail = cause } } + + assertNotNull(fail) + if (fail !is ConnectTimeoutException && fail !is UnresolvedAddressException) { + fail("Expected ConnectTimeoutException or UnresolvedAddressException, got $fail", fail) + } } } } From 0252f21b6db56d82d65732a032d4fbff24d9a17b Mon Sep 17 00:00:00 2001 From: "leonid.stashevsky" Date: Thu, 1 Sep 2022 10:55:40 +0200 Subject: [PATCH 3/3] fixup! fixup! KTOR-2036 Fix CIO connection limit --- .../src/io/ktor/client/engine/cio/ConnectionFactory.kt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt index 8b114df285..dd2c3139dc 100644 --- a/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt +++ b/ktor-client/ktor-client-cio/jvmAndNix/src/io/ktor/client/engine/cio/ConnectionFactory.kt @@ -22,12 +22,14 @@ internal class ConnectionFactory( configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {} ): Socket { limit.acquire() - addressLimit.computeIfAbsent(address) { Semaphore(addressConnectionsLimit) }.acquire() + val addressSemaphore = addressLimit.computeIfAbsent(address) { Semaphore(addressConnectionsLimit) } + addressSemaphore.acquire() return try { aSocket(selector).tcpNoDelay().tcp().connect(address, configuration) } catch (cause: Throwable) { // a failure or cancellation + addressSemaphore.release() limit.release() throw cause }