Skip to content

Commit

Permalink
KTOR-3504 KTOR-2430 Fix WS timeout (#3200)
Browse files Browse the repository at this point in the history
Timeout should close incoming and outgoing channels, after sending close frame

https://youtrack.jetbrains.com/issue/KTOR-2430
https://youtrack.jetbrains.com/issue/KTOR-3504
  • Loading branch information
rsinukov committed Oct 14, 2022
1 parent 5ec968c commit 3b7b62d
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 5 deletions.
Expand Up @@ -85,6 +85,33 @@ class DefaultWebSocketTest : BaseTest() {
ensureCompletion()
}

@Test
@OptIn(InternalAPI::class)
fun testPingPongTimeout(): Unit = runTest {
cleanup()

parent = Job()
client2server = ByteChannel()
server2client = ByteChannel()

server = DefaultWebSocketSession(
RawWebSocket(client2server, server2client, coroutineContext = parent),
500L,
500L
)
server.start()

client = RawWebSocket(server2client, client2server, coroutineContext = parent)
assertTrue(client.incoming.receive() is Frame.Ping)
delay(1000)
assertTrue(client.incoming.receive() is Frame.Close)

assertTrue("server incoming should be closed") { server.incoming.isClosedForReceive }
assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend }
assertTrue("server should be closed") { server.closeReason.isCompleted }
client.close()
}

@Test
fun testCancellation(): Unit = runTest {
server.cancel()
Expand Down Expand Up @@ -122,5 +149,7 @@ class DefaultWebSocketTest : BaseTest() {

assertTrue("client outgoing should be closed") { client.outgoing.isClosedForSend }
assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend }

assertTrue("server closeReason should be completed") { server.closeReason.isCompleted }
}
}
Expand Up @@ -10,6 +10,7 @@ import io.ktor.utils.io.core.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import io.ktor.utils.io.errors.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -210,7 +211,6 @@ internal class DefaultWebSocketSessionImpl(
}
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun runOutgoingProcessor(): Job = launch(
OutgoingProcessorCoroutineName + Dispatchers.Unconfined,
start = CoroutineStart.UNDISPATCHED
Expand Down Expand Up @@ -247,7 +247,7 @@ internal class DefaultWebSocketSessionImpl(
}

@OptIn(InternalAPI::class)
private suspend fun sendCloseSequence(reason: CloseReason?) {
private suspend fun sendCloseSequence(reason: CloseReason?, exception: Throwable? = null) {
if (!tryClose()) return
context.complete()

Expand All @@ -260,6 +260,11 @@ internal class DefaultWebSocketSessionImpl(
}
} finally {
closeReasonRef.complete(reasonToSend)

if (exception != null) {
outgoingToBeProcessed.close(exception)
filtered.close(exception)
}
}
}

Expand All @@ -270,7 +275,9 @@ internal class DefaultWebSocketSessionImpl(

val newPinger: SendChannel<Frame.Pong>? = when {
closed.value -> null
interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis)
interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) {
sendCloseSequence(it, IOException("Ping timeout"))
}
else -> null
}

Expand Down
Expand Up @@ -45,6 +45,7 @@ internal fun CoroutineScope.pinger(
outgoing: SendChannel<Frame>,
periodMillis: Long,
timeoutMillis: Long,
onTimeout: suspend (CloseReason) -> Unit
): SendChannel<Frame.Pong> {
val actorJob = Job()

Expand Down Expand Up @@ -82,8 +83,7 @@ internal fun CoroutineScope.pinger(
// we were unable to send the ping or hadn't got a valid pong message in time,
// so we are triggering close sequence (if already started then the following close frame could be ignored)

val closeFrame = Frame.Close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
outgoing.send(closeFrame)
onTimeout(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
break
}
}
Expand Down

0 comments on commit 3b7b62d

Please sign in to comment.