From 22cc34ff6a00e6e972bb55aa44d6eff5990220e2 Mon Sep 17 00:00:00 2001 From: rsinukov Date: Thu, 13 Oct 2022 19:20:09 +0200 Subject: [PATCH] KTOR-3504 KTOR-2430 Fix WS timeout Timeout should close incoming and outgoing channels, after sending close frame https://youtrack.jetbrains.com/issue/KTOR-2430 https://youtrack.jetbrains.com/issue/KTOR-3504 --- .../io/ktor/websocket/DefaultWebSocketSession.kt | 13 ++++++++++--- .../common/src/io/ktor/websocket/PingPong.kt | 4 ++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt index ee1c5c1ce56..ef73eb29174 100644 --- a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt +++ b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt @@ -10,6 +10,7 @@ import io.ktor.utils.io.core.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import io.ktor.utils.io.errors.* import kotlin.coroutines.* /** @@ -210,7 +211,6 @@ internal class DefaultWebSocketSessionImpl( } } - @OptIn(ExperimentalCoroutinesApi::class) private fun runOutgoingProcessor(): Job = launch( OutgoingProcessorCoroutineName + Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED @@ -247,7 +247,7 @@ internal class DefaultWebSocketSessionImpl( } @OptIn(InternalAPI::class) - private suspend fun sendCloseSequence(reason: CloseReason?) { + private suspend fun sendCloseSequence(reason: CloseReason?, exception: Throwable? = null) { if (!tryClose()) return context.complete() @@ -260,6 +260,11 @@ internal class DefaultWebSocketSessionImpl( } } finally { closeReasonRef.complete(reasonToSend) + + if (exception != null) { + outgoingToBeProcessed.close(exception) + filtered.close(exception) + } } } @@ -270,7 +275,9 @@ internal class DefaultWebSocketSessionImpl( val newPinger: SendChannel? = when { closed.value -> null - interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) + interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) { + sendCloseSequence(it, IOException("Ping timeout")) + } else -> null } diff --git a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt index 91361ba0885..2376c1769a3 100644 --- a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt +++ b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt @@ -45,6 +45,7 @@ internal fun CoroutineScope.pinger( outgoing: SendChannel, periodMillis: Long, timeoutMillis: Long, + onTimeout: suspend (CloseReason) -> Unit ): SendChannel { val actorJob = Job() @@ -82,8 +83,7 @@ internal fun CoroutineScope.pinger( // we were unable to send the ping or hadn't got a valid pong message in time, // so we are triggering close sequence (if already started then the following close frame could be ignored) - val closeFrame = Frame.Close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout")) - outgoing.send(closeFrame) + onTimeout(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout")) break } }