Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KTOR-3504 KTOR-2430 Fix WS timeout #3200

Merged
merged 1 commit into from Oct 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -85,6 +85,33 @@ class DefaultWebSocketTest : BaseTest() {
ensureCompletion()
}

@Test
@OptIn(InternalAPI::class)
fun testPingPongTimeout(): Unit = runTest {
cleanup()

parent = Job()
client2server = ByteChannel()
server2client = ByteChannel()

server = DefaultWebSocketSession(
RawWebSocket(client2server, server2client, coroutineContext = parent),
500L,
500L
)
server.start()

client = RawWebSocket(server2client, client2server, coroutineContext = parent)
assertTrue(client.incoming.receive() is Frame.Ping)
delay(1000)
assertTrue(client.incoming.receive() is Frame.Close)

assertTrue("server incoming should be closed") { server.incoming.isClosedForReceive }
assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend }
assertTrue("server should be closed") { server.closeReason.isCompleted }
client.close()
}

@Test
fun testCancellation(): Unit = runTest {
server.cancel()
Expand Down Expand Up @@ -122,5 +149,7 @@ class DefaultWebSocketTest : BaseTest() {

assertTrue("client outgoing should be closed") { client.outgoing.isClosedForSend }
assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend }

assertTrue("server closeReason should be completed") { server.closeReason.isCompleted }
}
}
Expand Up @@ -10,6 +10,7 @@ import io.ktor.utils.io.core.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import io.ktor.utils.io.errors.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -210,7 +211,6 @@ internal class DefaultWebSocketSessionImpl(
}
}

@OptIn(ExperimentalCoroutinesApi::class)
private fun runOutgoingProcessor(): Job = launch(
OutgoingProcessorCoroutineName + Dispatchers.Unconfined,
start = CoroutineStart.UNDISPATCHED
Expand Down Expand Up @@ -247,7 +247,7 @@ internal class DefaultWebSocketSessionImpl(
}

@OptIn(InternalAPI::class)
private suspend fun sendCloseSequence(reason: CloseReason?) {
private suspend fun sendCloseSequence(reason: CloseReason?, exception: Throwable? = null) {
if (!tryClose()) return
context.complete()

Expand All @@ -260,6 +260,11 @@ internal class DefaultWebSocketSessionImpl(
}
} finally {
closeReasonRef.complete(reasonToSend)

if (exception != null) {
outgoingToBeProcessed.close(exception)
filtered.close(exception)
}
}
}

Expand All @@ -270,7 +275,9 @@ internal class DefaultWebSocketSessionImpl(

val newPinger: SendChannel<Frame.Pong>? = when {
closed.value -> null
interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis)
interval > 0L -> pinger(raw.outgoing, interval, timeoutMillis) {
sendCloseSequence(it, IOException("Ping timeout"))
}
else -> null
}

Expand Down
Expand Up @@ -45,6 +45,7 @@ internal fun CoroutineScope.pinger(
outgoing: SendChannel<Frame>,
periodMillis: Long,
timeoutMillis: Long,
onTimeout: suspend (CloseReason) -> Unit
): SendChannel<Frame.Pong> {
val actorJob = Job()

Expand Down Expand Up @@ -82,8 +83,7 @@ internal fun CoroutineScope.pinger(
// we were unable to send the ping or hadn't got a valid pong message in time,
// so we are triggering close sequence (if already started then the following close frame could be ignored)

val closeFrame = Frame.Close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
outgoing.send(closeFrame)
onTimeout(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Ping timeout"))
break
}
}
Expand Down