From 449a9849d5fe75482c98661347b354b0b71fb762 Mon Sep 17 00:00:00 2001 From: rsinukov Date: Thu, 13 Oct 2022 19:20:09 +0200 Subject: [PATCH] KTOR-3504 KTOR-2430 Fix WS timeout Timeout should close incoming and outgoing channels, after sending close frame https://youtrack.jetbrains.com/issue/KTOR-2430 https://youtrack.jetbrains.com/issue/KTOR-3504 --- .../tests/websocket/DefaultWebSocketTest.kt | 29 +++++++++++++++++++ .../ktor/websocket/DefaultWebSocketSession.kt | 13 +++++++-- .../common/src/io/ktor/websocket/PingPong.kt | 4 +-- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/ktor-server/ktor-server-plugins/ktor-server-websockets/jvmAndNix/test/io/ktor/tests/websocket/DefaultWebSocketTest.kt b/ktor-server/ktor-server-plugins/ktor-server-websockets/jvmAndNix/test/io/ktor/tests/websocket/DefaultWebSocketTest.kt index c506d0b335..f99a4308b1 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-websockets/jvmAndNix/test/io/ktor/tests/websocket/DefaultWebSocketTest.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-websockets/jvmAndNix/test/io/ktor/tests/websocket/DefaultWebSocketTest.kt @@ -85,6 +85,33 @@ class DefaultWebSocketTest : BaseTest() { ensureCompletion() } + @Test + @OptIn(InternalAPI::class) + fun testPingPongTimeout(): Unit = runTest { + cleanup() + + parent = Job() + client2server = ByteChannel() + server2client = ByteChannel() + + server = DefaultWebSocketSession( + RawWebSocket(client2server, server2client, coroutineContext = parent), + 500L, + 500L + ) + server.start() + + client = RawWebSocket(server2client, client2server, coroutineContext = parent) + assertTrue(client.incoming.receive() is Frame.Ping) + delay(1000) + assertTrue(client.incoming.receive() is Frame.Close) + + assertTrue("server incoming should be closed") { server.incoming.isClosedForReceive } + assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend } + assertTrue("server should be closed") { server.closeReason.isCompleted } + client.close() + } + @Test fun testCancellation(): Unit = runTest { server.cancel() @@ -122,5 +149,7 @@ class DefaultWebSocketTest : BaseTest() { assertTrue("client outgoing should be closed") { client.outgoing.isClosedForSend } assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend } + + assertTrue("server closeReason should be completed") { server.closeReason.isCompleted } } } diff --git a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt index ee1c5c1ce5..ef73eb2917 100644 --- a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt +++ b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/DefaultWebSocketSession.kt @@ -10,6 +10,7 @@ import io.ktor.utils.io.core.* import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import io.ktor.utils.io.errors.* import kotlin.coroutines.* /** @@ -210,7 +211,6 @@ internal class DefaultWebSocketSessionImpl( } } - @OptIn(ExperimentalCoroutinesApi::class) private fun runOutgoingProcessor(): Job = launch( OutgoingProcessorCoroutineName + Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED @@ -247,7 +247,7 @@ internal class DefaultWebSocketSessionImpl( } @OptIn(InternalAPI::class) - private suspend fun sendCloseSequence(reason: CloseReason?) { + private suspend fun sendCloseSequence(reason: CloseReason?, exception: Throwable? = null) { if (!tryClose()) return context.complete() @@ -260,6 +260,11 @@ internal class DefaultWebSocketSessionImpl( } } finally { closeReasonRef.complete(reasonToSend) + + if (exception != null) { + outgoingToBeProcessed.close(exception) + filtered.close(exception) + } } } @@ -270,7 +275,9 @@ internal class DefaultWebSocketSessionImpl( val newPinger: SendChannel? = when { closed.value -> null - interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) + interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) { + sendCloseSequence(it, IOException("Ping timeout")) + } else -> null } diff --git a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt index 91361ba088..2376c1769a 100644 --- a/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt +++ b/ktor-shared/ktor-websockets/common/src/io/ktor/websocket/PingPong.kt @@ -45,6 +45,7 @@ internal fun CoroutineScope.pinger( outgoing: SendChannel, periodMillis: Long, timeoutMillis: Long, + onTimeout: suspend (CloseReason) -> Unit ): SendChannel { val actorJob = Job() @@ -82,8 +83,7 @@ internal fun CoroutineScope.pinger( // we were unable to send the ping or hadn't got a valid pong message in time, // so we are triggering close sequence (if already started then the following close frame could be ignored) - val closeFrame = Frame.Close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout")) - outgoing.send(closeFrame) + onTimeout(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout")) break } }