-
Notifications
You must be signed in to change notification settings - Fork 1k
/
DefaultWebSocketTest.kt
155 lines (120 loc) · 4.75 KB
/
DefaultWebSocketTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
* Copyright 2014-2021 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license.
*/
package io.ktor.tests.websocket
import io.ktor.server.testing.*
import io.ktor.util.*
import io.ktor.utils.io.*
import io.ktor.utils.io.charsets.*
import io.ktor.utils.io.core.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import kotlin.test.*
/**
 * Tests for a [DefaultWebSocketSession] ("server") talking to a raw [WebSocketSession] ("client").
 *
 * The two sides are wired together through a pair of [ByteChannel]s that are created in `prepare`
 * and torn down in `cleanup`; both sessions are created with [parent] as their coroutine context.
 */
@OptIn(ExperimentalCoroutinesApi::class)
class DefaultWebSocketTest : BaseTest() {
// Job handed to both sessions as their coroutine context; completed and joined by ensureCompletion().
private lateinit var parent: CompletableJob
// Byte pipe passed first to the server-side RawWebSocket and second to the client-side one
// (client-to-server direction, as the name suggests).
private lateinit var client2server: ByteChannel
// Byte pipe passed second to the server-side RawWebSocket and first to the client-side one
// (server-to-client direction, as the name suggests).
private lateinit var server2client: ByteChannel
// Session under test: a DefaultWebSocketSession wrapping the server-side RawWebSocket.
private lateinit var server: DefaultWebSocketSession
// Plain RawWebSocket used by the tests to send frames to, and read frames from, the server.
private lateinit var client: WebSocketSession
/**
 * Creates a fresh parent job, a fresh pair of byte channels and a started server session plus a
 * raw client session on top of them. Runs before every test.
 */
@OptIn(InternalAPI::class)
@BeforeTest
fun prepare() {
    parent = Job()
    client2server = ByteChannel()
    server2client = ByteChannel()

    // The server side gets client2server first and server2client second.
    val serverTransport = RawWebSocket(client2server, server2client, coroutineContext = parent)

    // NOTE(review): -1L / 1000L are presumably the ping interval and the timeout in milliseconds;
    // confirm against the DefaultWebSocketSession factory signature.
    server = DefaultWebSocketSession(serverTransport, -1L, 1000L)
    server.start()

    // The client side uses the same two channels in the opposite order.
    client = RawWebSocket(server2client, client2server, coroutineContext = parent)
}
/**
 * Cancels everything `prepare` created: both sessions first, then both byte channels and finally
 * the parent job. Runs after every test and is also called directly by `testPingPongTimeout`
 * before it builds its own fixture.
 */
@AfterTest
fun cleanup() {
// Sessions are cancelled before the channels they are built on.
server.cancel()
client.cancel()
client2server.cancel()
server2client.cancel()
// The parent job is cancelled last.
parent.cancel()
}
/**
 * The client closes with a specific reason; the server must observe that reason and the client
 * must receive a close frame carrying the same reason back.
 */
@Test
fun closeByClient(): Unit = runTest {
    val expectedReason = CloseReason(CloseReason.Codes.NORMAL, "test1")
    client.close(expectedReason)

    val reasonSeenByServer = server.closeReason.await()
    assertEquals(expectedReason, reasonSeenByServer)

    // At this point the server has definitely seen the close frame, so it should answer with a
    // close frame of its own, which must be readable on the client side.
    val replyFrame = client.incoming.receive() as Frame.Close
    val replyReason = replyFrame.readReason()
    assertEquals(expectedReason, replyReason)

    ensureCompletion()
}
/**
 * Sends five pings with distinct payloads and expects five pongs echoing those payloads in the
 * same order, followed by a close frame once the client closes.
 */
@Test
fun pingPong(): Unit = runTest {
    val payloads = (1..5).map { index -> "ping $index" }

    // Send all pings first, then read the replies.
    for (payload in payloads) {
        client.send(Frame.Ping(payload.encodeToByteArray()))
    }

    for (expectedText in payloads) {
        val pong = client.incoming.receive() as Frame.Pong
        val echoedText = String(pong.readBytes(), charset = Charsets.UTF_8)
        assertEquals(expectedText, echoedText)
    }

    client.close()
    val finalFrame = client.incoming.receive()
    assertTrue(finalFrame is Frame.Close)

    ensureCompletion()
}
/**
 * Rebuilds the fixture with 500L / 500L session settings, never answers the server's ping and
 * expects the server to send a close frame and end up fully closed.
 */
@Test
@OptIn(InternalAPI::class)
fun testPingPongTimeout(): Unit = runTest {
    // Discard the fixture created by prepare(); this test needs different session settings.
    cleanup()

    parent = Job()
    client2server = ByteChannel()
    server2client = ByteChannel()

    val serverTransport = RawWebSocket(client2server, server2client, coroutineContext = parent)
    // NOTE(review): 500L / 500L are presumably the ping interval and the timeout in milliseconds;
    // confirm against the DefaultWebSocketSession factory signature.
    server = DefaultWebSocketSession(serverTransport, 500L, 500L)
    server.start()

    client = RawWebSocket(server2client, client2server, coroutineContext = parent)

    val firstFrame = client.incoming.receive()
    assertTrue(firstFrame is Frame.Ping)

    // The ping is deliberately left unanswered while we wait.
    delay(1000)

    val secondFrame = client.incoming.receive()
    assertTrue(secondFrame is Frame.Close)

    assertTrue(server.incoming.isClosedForReceive, "server incoming should be closed")
    assertTrue(server.outgoing.isClosedForSend, "server outgoing should be closed")
    assertTrue(server.closeReason.isCompleted, "server should be closed")

    client.close()
}
/**
 * Cancels the server session and checks that, after the client closes as well, everything
 * still reaches the fully-closed state verified by `ensureCompletion`.
 */
@Test
fun testCancellation(): Unit = runTest {
    server.cancel()

    // Take whatever the client side yields next (a frame or a closed/failed result); the value
    // itself is irrelevant to this test.
    val pending = client.incoming.receiveCatching()
    pending.getOrNull()

    client.close()
    ensureCompletion()
}
/**
 * Completes [parent], waits for it to finish and then verifies that both byte channels and both
 * sessions are fully closed.
 *
 * Before the session-level checks, the remaining content of both incoming channels is consumed;
 * any frame still found in `server.incoming` must not be a control frame.
 *
 * @throws CancellationException if the calling coroutine itself has been cancelled while draining.
 */
private suspend fun ensureCompletion() {
    parent.complete()
    parent.join()

    assertTrue("client -> server channel should be closed") { client2server.isClosedForRead }
    assertTrue("client -> server channel should be closed") { client2server.isClosedForWrite }
    assertTrue("server -> client channel should be closed") { server2client.isClosedForRead }
    assertTrue("server -> client channel should be closed") { server2client.isClosedForWrite }

    try {
        server.incoming.consumeEach {
            assertTrue("It should be no control frames") { !it.frameType.controlFrame }
        }
    } catch (_: CancellationException) {
        // A cancelled channel is an acceptable outcome here, but cancellation of the calling
        // test coroutine (e.g. a test timeout) must not be swallowed, so rethrow in that case.
        currentCoroutineContext().ensureActive()
    }

    try {
        client.incoming.consumeEach {}
    } catch (_: CancellationException) {
        // Same reasoning as above: tolerate a cancelled channel, propagate our own cancellation.
        currentCoroutineContext().ensureActive()
    }

    assertTrue("client incoming should be closed") { client.incoming.isClosedForReceive }
    assertTrue("server incoming should be closed") { server.incoming.isClosedForReceive }
    assertTrue("client outgoing should be closed") { client.outgoing.isClosedForSend }
    assertTrue("server outgoing should be closed") { server.outgoing.isClosedForSend }
    assertTrue("server closeReason should be completed") { server.closeReason.isCompleted }
}
}