Skip to content

Commit

Permalink
KTOR-2036 Fix CIO connection limit
Browse files Browse the repository at this point in the history
  • Loading branch information
e5l committed Aug 30, 2022
1 parent af882b3 commit 4156899
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 10 deletions.
Expand Up @@ -33,7 +33,11 @@ internal class CIOEngine(

private val selectorManager: SelectorManager by lazy { SelectorManager(dispatcher) }

private val connectionFactory = ConnectionFactory(selectorManager, config.maxConnectionsCount)
private val connectionFactory = ConnectionFactory(
selectorManager,
config.maxConnectionsCount,
config.endpoint.maxConnectionsPerRoute
)

private val requestsJob: CoroutineContext

Expand All @@ -42,6 +46,7 @@ internal class CIOEngine(
private val proxy: ProxyConfig? = when (val type = config.proxy?.type) {
ProxyType.SOCKS,
null -> null

ProxyType.HTTP -> config.proxy
else -> throw IllegalStateException("CIO engine does not currently support $type proxies.")
}
Expand Down
Expand Up @@ -6,29 +6,35 @@ package io.ktor.client.engine.cio

import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.util.collections.*
import kotlinx.coroutines.sync.*

internal class ConnectionFactory(
private val selector: SelectorManager,
maxConnectionsCount: Int
connectionsLimit: Int,
private val addressConnectionsLimit: Int
) {
private val semaphore = Semaphore(maxConnectionsCount)
private val limit = Semaphore(connectionsLimit)
private val addressLimit = ConcurrentMap<InetSocketAddress, Semaphore>()

suspend fun connect(
address: InetSocketAddress,
configuration: SocketOptions.TCPClientSocketOptions.() -> Unit = {}
): Socket {
semaphore.acquire()
limit.acquire()
addressLimit.computeIfAbsent(address) { Semaphore(addressConnectionsLimit) }.acquire()

return try {
aSocket(selector).tcpNoDelay().tcp().connect(address, configuration)
} catch (cause: Throwable) {
// a failure or cancellation
semaphore.release()
limit.release()
throw cause
}
}

fun release() {
semaphore.release()
fun release(address: InetSocketAddress) {
addressLimit[address]!!.release()
limit.release()
}
}
Expand Up @@ -188,7 +188,7 @@ internal class Endpoint(
} catch (_: Throwable) {
}

connectionFactory.release()
connectionFactory.release(address)
throw cause
}
}
Expand Down Expand Up @@ -229,7 +229,8 @@ internal class Endpoint(
}

private fun releaseConnection() {
connectionFactory.release()
val address = InetSocketAddress(host, port)
connectionFactory.release(address)
connections.decrementAndGet()
}

Expand Down
Expand Up @@ -31,8 +31,8 @@ public suspend fun OutgoingContent.toByteArray(): ByteArray = when (this) {
else -> ByteArray(0)
}

@OptIn(DelicateCoroutinesApi::class)
@Suppress("KDocMissingDocumentation")
@OptIn(DelicateCoroutinesApi::class)
public suspend fun OutgoingContent.toByteReadPacket(): ByteReadPacket = when (this) {
is OutgoingContent.ByteArrayContent -> ByteReadPacket(bytes())
is OutgoingContent.ReadChannelContent -> readFrom().readRemaining()
Expand Down

0 comments on commit 4156899

Please sign in to comment.