Skip to content

Commit

Permalink
KTOR-3504 KTOR-2430 Fix WS timeout
Browse files Browse the repository at this point in the history
Timeout should close incoming and outgoing channels, after sending close frame

https://youtrack.jetbrains.com/issue/KTOR-2430
https://youtrack.jetbrains.com/issue/KTOR-3504
  • Loading branch information
rsinukov committed Oct 13, 2022
1 parent 5ec968c commit 22cc34f
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 5 deletions.
Expand Up @@ -10,6 +10,7 @@ import io.ktor.utils.io.core.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import io.ktor.utils.io.errors.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -210,7 +211,6 @@ internal class DefaultWebSocketSessionImpl(
}
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun runOutgoingProcessor(): Job = launch(
OutgoingProcessorCoroutineName + Dispatchers.Unconfined,
start = CoroutineStart.UNDISPATCHED
Expand Down Expand Up @@ -247,7 +247,7 @@ internal class DefaultWebSocketSessionImpl(
}

@OptIn(InternalAPI::class)
private suspend fun sendCloseSequence(reason: CloseReason?) {
private suspend fun sendCloseSequence(reason: CloseReason?, exception: Throwable? = null) {
if (!tryClose()) return
context.complete()

Expand All @@ -260,6 +260,11 @@ internal class DefaultWebSocketSessionImpl(
}
} finally {
closeReasonRef.complete(reasonToSend)

if (exception != null) {
outgoingToBeProcessed.close(exception)
filtered.close(exception)
}
}
}

Expand All @@ -270,7 +275,9 @@ internal class DefaultWebSocketSessionImpl(

val newPinger: SendChannel<Frame.Pong>? = when {
closed.value -> null
interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis)
interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) {
sendCloseSequence(it, IOException("Ping timeout"))
}
else -> null
}

Expand Down
Expand Up @@ -45,6 +45,7 @@ internal fun CoroutineScope.pinger(
outgoing: SendChannel<Frame>,
periodMillis: Long,
timeoutMillis: Long,
onTimeout: suspend (CloseReason) -> Unit
): SendChannel<Frame.Pong> {
val actorJob = Job()

Expand Down Expand Up @@ -82,8 +83,7 @@ internal fun CoroutineScope.pinger(
// we were unable to send the ping or hadn't got a valid pong message in time,
// so we are triggering close sequence (if already started then the following close frame could be ignored)

val closeFrame = Frame.Close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
outgoing.send(closeFrame)
onTimeout(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
break
}
}
Expand Down

0 comments on commit 22cc34f

Please sign in to comment.