/
SharedFlowScenarioTest.kt
385 lines (357 loc) · 15.9 KB
/
SharedFlowScenarioTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*
import kotlin.test.*
/**
* This test suite for [SharedFlow] has a dense framework that makes it possible to test complex
* suspend/resume scenarios while keeping the code readable. Each test here is for
* one specific [SharedFlow] configuration, testing all the various corner cases in its
* behavior.
*/
class SharedFlowScenarioTest : TestBase() {
/**
 * Scenario for `MutableSharedFlow<Int>(1, 2)`. Going by the test name this is replay = 1 plus
 * 2 extra buffer slots, which agrees with the "total buffer size == 3" note below.
 *
 * The script walks through: emitting with no subscribers, emitters suspending once subscriber "a"
 * falls behind, several emitters queued at once, `resetReplayCache()` while values are still
 * buffered, cancelling a queued emitter in the middle / last / first position, a late subscriber
 * "b" that then becomes the slow one, and cancelling subscribers while emitters are queued.
 *
 * Trailing comments of the form `buffer x, y | z` track the expected buffer contents after the
 * step; presumably the part after the bar is the replay cache checked by `expectReplayOf`.
 */
@Test
fun testReplay1Extra2() =
testSharedFlow(MutableSharedFlow<Int>(1, 2)) {
// total buffer size == 3
expectReplayOf()
emitRightNow(1); expectReplayOf(1)
emitRightNow(2); expectReplayOf(2)
emitRightNow(3); expectReplayOf(3)
emitRightNow(4); expectReplayOf(4) // no prob - no subscribers
// subscriber "a" first receives the value currently expected in the replay cache
val a = subscribe("a"); collect(a, 4)
emitRightNow(5); expectReplayOf(5)
emitRightNow(6); expectReplayOf(6)
emitRightNow(7); expectReplayOf(7)
// suspend/collect sequentially
val e8 = emitSuspends(8); collect(a, 5); emitResumes(e8); expectReplayOf(8)
val e9 = emitSuspends(9); collect(a, 6); emitResumes(e9); expectReplayOf(9)
// buffer full, but parallel emitters can still suspend (queue up)
val e10 = emitSuspends(10)
val e11 = emitSuspends(11)
val e12 = emitSuspends(12)
collect(a, 7); emitResumes(e10); expectReplayOf(10) // buffer 8, 9 | 10
collect(a, 8); emitResumes(e11); expectReplayOf(11) // buffer 9, 10 | 11
sharedFlow.resetReplayCache(); expectReplayOf() // 9, 10 11 | no replay
collect(a, 9); emitResumes(e12); expectReplayOf(12)
collect(a, 10, 11, 12); expectReplayOf(12) // buffer empty | 12
emitRightNow(13); expectReplayOf(13)
emitRightNow(14); expectReplayOf(14)
emitRightNow(15); expectReplayOf(15) // buffer 13, 14 | 15
// three queued emitters, the middle one is cancelled
val e16 = emitSuspends(16)
val e17 = emitSuspends(17)
val e18 = emitSuspends(18)
cancel(e17); expectReplayOf(15) // cancel in the middle of three emits; buffer 13, 14 | 15
collect(a, 13); emitResumes(e16); expectReplayOf(16) // buffer 14, 15, | 16
collect(a, 14); emitResumes(e18); expectReplayOf(18) // buffer 15, 16 | 18
// three queued emitters, the last one is cancelled
val e19 = emitSuspends(19)
val e20 = emitSuspends(20)
val e21 = emitSuspends(21)
cancel(e21); expectReplayOf(18) // cancel last emit; buffer 15, 16, 18
collect(a, 15); emitResumes(e19); expectReplayOf(19) // buffer 16, 18 | 19
collect(a, 16); emitResumes(e20); expectReplayOf(20) // buffer 18, 19 | 20
collect(a, 18, 19, 20); expectReplayOf(20) // buffer empty | 20
emitRightNow(22); expectReplayOf(22)
emitRightNow(23); expectReplayOf(23)
emitRightNow(24); expectReplayOf(24) // buffer 22, 23 | 24
// three queued emitters, the first one is cancelled, then the replay cache is reset
val e25 = emitSuspends(25)
val e26 = emitSuspends(26)
val e27 = emitSuspends(27)
cancel(e25); expectReplayOf(24) // cancel first emit, buffer 22, 23 | 24
sharedFlow.resetReplayCache(); expectReplayOf() // buffer 22, 23, 24 | no replay
val b = subscribe("b") // new subscriber
collect(a, 22); emitResumes(e26); expectReplayOf(26) // buffer 23, 24 | 26
collect(b, 26)
collect(a, 23); emitResumes(e27); expectReplayOf(27) // buffer 24, 26 | 27
collect(a, 24, 26, 27) // buffer empty | 27
emitRightNow(28); expectReplayOf(28)
emitRightNow(29); expectReplayOf(29) // buffer 27, 28 | 29
collect(a, 28, 29) // but b is slow
// ten emitters queue up while "b" is not collecting
val e30 = emitSuspends(30)
val e31 = emitSuspends(31)
val e32 = emitSuspends(32)
val e33 = emitSuspends(33)
val e34 = emitSuspends(34)
val e35 = emitSuspends(35)
val e36 = emitSuspends(36)
val e37 = emitSuspends(37)
val e38 = emitSuspends(38)
val e39 = emitSuspends(39)
cancel(e31) // cancel emitter in queue
cancel(b) // cancel slow subscriber -> 3 emitters resume
emitResumes(e30); emitResumes(e32); emitResumes(e33); expectReplayOf(33) // buffer 30, 32 | 33
val c = subscribe("c"); collect(c, 33) // replays
cancel(e34)
collect(a, 30); emitResumes(e35); expectReplayOf(35) // buffer 32, 33 | 35
cancel(e37)
cancel(a); emitResumes(e36); emitResumes(e38); expectReplayOf(38) // buffer 35, 36 | 38
collect(c, 35); emitResumes(e39); expectReplayOf(39) // buffer 36, 38 | 39
collect(c, 36, 38, 39); expectReplayOf(39)
cancel(c); expectReplayOf(39) // replay stays
}
/**
 * Scenario for `MutableSharedFlow<Int>(1)`; going by the test name this is replay = 1 with no
 * extra buffer capacity.
 *
 * The script covers: repeated `resetReplayCache()` calls (each one is issued twice in a row and
 * must leave the replay empty both times), resetting the replay while a value is still buffered
 * for a subscriber, two subscribers "a" and "b" taking turns being the slow one, cancelling
 * queued emitters, and cancelling the slow subscriber so that a queued emitter resumes.
 */
@Test
fun testReplay1() =
testSharedFlow(MutableSharedFlow<Int>(1)) {
// no subscribers yet: every emit completes right away and replaces the replay value
emitRightNow(0); expectReplayOf(0)
emitRightNow(1); expectReplayOf(1)
emitRightNow(2); expectReplayOf(2)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf()
emitRightNow(3); expectReplayOf(3)
emitRightNow(4); expectReplayOf(4)
val a = subscribe("a"); collect(a, 4)
emitRightNow(5); expectReplayOf(5); collect(a, 5)
emitRightNow(6)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf()
// "a" has not collected 6 yet, so these emitters suspend
val e7 = emitSuspends(7)
val e8 = emitSuspends(8)
val e9 = emitSuspends(9)
collect(a, 6); emitResumes(e7); expectReplayOf(7)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf() // buffer 7 | -- no replay, but still buffered
val b = subscribe("b")
collect(a, 7); emitResumes(e8); expectReplayOf(8)
collect(b, 8) // buffer | 8 -- a is slow
val e10 = emitSuspends(10)
val e11 = emitSuspends(11)
val e12 = emitSuspends(12)
cancel(e9)
collect(a, 8); emitResumes(e10); expectReplayOf(10)
collect(a, 10) // now b's slow
cancel(e11)
collect(b, 10); emitResumes(e12); expectReplayOf(12)
collect(a, 12)
collect(b, 12)
sharedFlow.resetReplayCache(); expectReplayOf()
sharedFlow.resetReplayCache(); expectReplayOf() // nothing is buffered -- both collectors up to date
emitRightNow(13); expectReplayOf(13)
collect(b, 13) // a is slow
val e14 = emitSuspends(14)
val e15 = emitSuspends(15)
val e16 = emitSuspends(16)
cancel(e14)
cancel(a); emitResumes(e15); expectReplayOf(15) // cancelling slow subscriber
collect(b, 15); emitResumes(e16); expectReplayOf(16)
collect(b, 16)
}
/**
 * Scenario for `MutableSharedFlow(2, 2, BufferOverflow.DROP_OLDEST)`; going by the test name this
 * is replay = 2 plus 2 extra buffer slots with the oldest value dropped on overflow.
 *
 * Every emission in this script goes through `emitRightNow`, i.e. no emitter is expected to
 * suspend. Slow subscribers instead skip values: "a" goes from 3 straight to 6, and "b" never
 * sees 10. The script ends with `resetReplayCache()` while both subscribers still have values
 * to collect.
 */
@Test
fun testReplay2Extra2DropOldest() =
testSharedFlow<Int>(MutableSharedFlow(2, 2, BufferOverflow.DROP_OLDEST)) {
emitRightNow(0); expectReplayOf(0)
emitRightNow(1); expectReplayOf(0, 1)
emitRightNow(2); expectReplayOf(1, 2)
emitRightNow(3); expectReplayOf(2, 3)
emitRightNow(4); expectReplayOf(3, 4)
val a = subscribe("a")
collect(a, 3)
emitRightNow(5); expectReplayOf(4, 5)
emitRightNow(6); expectReplayOf(5, 6)
emitRightNow(7); expectReplayOf(6, 7) // buffer 4, 5 | 6, 7
emitRightNow(8); expectReplayOf(7, 8) // buffer 5, 6 | 7, 8
emitRightNow(9); expectReplayOf(8, 9) // buffer 6, 7 | 8, 9
// "a" last collected 3; the next values it receives are 6 and 7
collect(a, 6, 7)
val b = subscribe("b")
collect(b, 8, 9) // buffer | 8, 9
emitRightNow(10); expectReplayOf(9, 10) // buffer 8 | 9, 10
collect(a, 8, 9, 10) // buffer | 9, 10, note "b" had not collected 10 yet
emitRightNow(11); expectReplayOf(10, 11) // buffer | 10, 11
emitRightNow(12); expectReplayOf(11, 12) // buffer 10 | 11, 12
emitRightNow(13); expectReplayOf(12, 13) // buffer 10, 11 | 12, 13
emitRightNow(14); expectReplayOf(13, 14) // buffer 11, 12 | 13, 14, "b" missed 10
collect(b, 11, 12, 13, 14)
sharedFlow.resetReplayCache(); expectReplayOf() // buffer 11, 12, 13, 14 |
sharedFlow.resetReplayCache(); expectReplayOf()
collect(a, 11, 12, 13, 14)
emitRightNow(15); expectReplayOf(15)
collect(a, 15)
collect(b, 15)
}
/**
 * Regression scenario for the issue linked on the annotation line, run against
 * `MutableSharedFlow(1)`.
 *
 * Two subscribers are collecting; s1 is fast and s2 is slow. While emitter `e2` is suspended
 * because of s2, s1 is resumed and has nothing to collect yet. Once s2 collects its pending
 * value, `e2` is resumed, and s1 must then receive the value from that newly resumed emitter.
 */
@Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
fun testResumeFastSubscriberOnResumedEmitter() =
testSharedFlow<Int>(MutableSharedFlow(1)) {
// create two subscribers and start collecting
val s1 = subscribe("s1"); resumeCollecting(s1)
val s2 = subscribe("s2"); resumeCollecting(s2)
// now emit 0, make sure it is collected
emitRightNow(0); expectReplayOf(0)
awaitCollected(s1, 0)
awaitCollected(s2, 0)
// now emit 1, and only first subscriber continues and collects it
emitRightNow(1); expectReplayOf(1)
collect(s1, 1)
// now emit 2, it suspends (s2 is blocking it)
val e2 = emitSuspends(2)
resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
emitResumes(e2); expectReplayOf(2)
// now emit 3, it suspends (s2 blocks it)
val e3 = emitSuspends(3)
collect(s2, 2)
emitResumes(e3); expectReplayOf(3)
}
/**
 * Scenario for `MutableSharedFlow(1)`: two emitters are suspended behind the only subscriber,
 * and that subscriber is then cancelled.
 *
 * Both emitters must resume, and afterwards the replay cache must hold only the last emitted
 * value (3). A fresh subscriber "b" then has to receive exactly that value and keep working
 * for subsequent emissions.
 */
@Test
fun testSuspendedConcurrentEmitAndCancelSubscriber() =
    testSharedFlow<Int>(MutableSharedFlow(1)) {
        val a = subscribe("a")
        emitRightNow(0); expectReplayOf(0)
        collect(a, 0)
        emitRightNow(1); expectReplayOf(1)
        val e2 = emitSuspends(2) // suspends until 1 is collected
        val e3 = emitSuspends(3) // suspends until 1 is collected, too
        cancel(a) // must resume emitters 2 & 3
        emitResumes(e2)
        emitResumes(e3)
        expectReplayOf(3) // but replay size is 1 so only 3 should be kept
        // Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
        val b = subscribe("b")
        collect(b, 3)
        emitRightNow(4); expectReplayOf(4)
        collect(b, 4)
    }
/**
 * Runs [scenario] against [sharedFlow] inside `runTest`.
 *
 * A [ScenarioDsl] is created for the flow, the scenario is executed with it as receiver, and the
 * DSL is stopped once the scenario returns normally. If anything is thrown along the way, the
 * DSL's log is printed (when the DSL was already created) and the throwable is rethrown
 * unchanged.
 */
private fun <T> testSharedFlow(
    sharedFlow: MutableSharedFlow<T>,
    scenario: suspend ScenarioDsl<T>.() -> Unit
) = runTest {
    // Kept outside the scope block so the catch clause can still reach the log after a failure.
    var dsl: ScenarioDsl<T>? = null
    try {
        coroutineScope {
            val driver = ScenarioDsl<T>(sharedFlow, coroutineContext).also { dsl = it }
            driver.scenario()
            driver.stop()
        }
    } catch (e: Throwable) {
        dsl?.printLog()
        throw e
    }
}
/**
 * Handle for a coroutine launched by the scenario DSL: the underlying [job] plus a readable
 * [name]. Only the name is rendered by [toString], which keeps logs and failure messages short.
 */
private data class TestJob(val job: Job, val name: String) {
    override fun toString(): String {
        return name
    }
}
/** Base type of the events that [ScenarioDsl] records in its action set and that scenario steps wait for. */
private open class Action
/** Recorded when the emitter coroutine [job] has returned from `emit`. */
private data class EmitResumes(val job: TestJob) : Action()
/** Recorded when the collector coroutine [job] has received [value]. */
private data class Collected(val job: TestJob, val value: Any?) : Action()
/** Signal for the collector coroutine [job] to proceed: either to start collecting or to finish handling its current value. */
private data class ResumeCollecting(val job: TestJob) : Action()
/** Recorded when the coroutine [job] has caught a `CancellationException`. */
private data class Cancelled(val job: TestJob) : Action()
@OptIn(ExperimentalStdlibApi::class)
private class ScenarioDsl<T>(
val sharedFlow: MutableSharedFlow<T>,
coroutineContext: CoroutineContext
) {
// Chronological trace of scenario steps; the tail of it is shown by printLog().
private val log: ArrayList<String> = ArrayList()

// Upper bound passed to withTimeoutOrNull for a single awaitAction() wait.
private val timeout: Long = 10_000L

// Emitters and collectors run here; the scope has its own Job so that stop() can cancel them all.
private val scope: CoroutineScope = CoroutineScope(coroutineContext + Job())

// Actions that have been recorded but not yet consumed by a matching awaitAction().
private val actions: HashSet<Action> = HashSet()

// Continuations of coroutines currently parked inside awaitAction().
private val actionWaiters: ArrayDeque<Continuation<Unit>> = ArrayDeque()

// Replay cache contents most recently declared via expectReplayOf().
private var expectedReplay: List<T> = emptyList()
/** Asserts that the flow's current replay cache equals the most recently declared expectation. */
private fun checkReplay() =
    assertEquals(expected = expectedReplay, actual = sharedFlow.replayCache)
/**
 * Resumes the coroutines parked in `actionWaiters`, oldest first.
 *
 * The number of waiters is sampled once on entry, so a waiter that gets queued while this pass
 * is running is left for the next wake-up.
 */
private fun wakeupWaiters() {
    var remaining = actionWaiters.size
    while (remaining > 0) {
        actionWaiters.removeFirst().resume(Unit)
        remaining--
    }
}
/** Records [action] as having happened and wakes every parked waiter so it can re-check its condition. */
private fun addAction(action: Action) {
    actions += action
    wakeupWaiters()
}
/**
 * Suspends until [action] has been recorded, consuming it from the action set.
 *
 * The wait is bounded by `timeout`; when it elapses first, the call fails with an
 * `IllegalStateException` naming the action. After a successful wait the remaining waiters are
 * woken up as well.
 */
private suspend fun awaitAction(action: Action) {
    val outcome = withTimeoutOrNull(timeout) {
        // remove() doubles as the check: it succeeds only once the action has been recorded.
        while (!actions.remove(action)) {
            suspendCancellableCoroutine<Unit> { waiter -> actionWaiters.add(waiter) }
        }
    }
    if (outcome == null) {
        error("Timed out waiting for action: $action")
    }
    wakeupWaiters()
}
/**
 * Starts a coroutine in the DSL scope that emits [a] into the shared flow and returns a handle
 * to it.
 *
 * The coroutine is launched with `CoroutineStart.UNDISPATCHED`. It logs its progress, records
 * [EmitResumes] once `emit` has returned, and records [Cancelled] instead when it catches a
 * `CancellationException`.
 */
private fun launchEmit(a: T): TestJob {
    val name = "emit($a)"
    val emitter = scope.launch(start = CoroutineStart.UNDISPATCHED) {
        // Equal to the handle returned below: same Job, same name.
        val self = TestJob(coroutineContext[Job]!!, name)
        try {
            log(name)
            sharedFlow.emit(a)
            log("$name resumes")
            addAction(EmitResumes(self))
        } catch (e: CancellationException) {
            log("$name cancelled")
            addAction(Cancelled(self))
        }
    }
    return TestJob(emitter, name)
}
/**
 * Declares that the replay cache must now contain exactly the values [a], in order, and checks
 * it immediately. The expectation stays in force for later implicit checks until it is replaced.
 */
fun expectReplayOf(vararg a: T) {
    expectedReplay = listOf(*a)
    checkReplay()
}
/**
 * Emits [a] and asserts that the emitter has already returned from `emit` by the time the launch
 * call comes back. The corresponding [EmitResumes] record is consumed by the check.
 */
fun emitRightNow(a: T) {
    val resumedMarker = EmitResumes(launchEmit(a))
    val resumedImmediately = actions.remove(resumedMarker)
    assertTrue(resumedImmediately)
}
/**
 * Emits [a] and asserts that the emitter has not returned from `emit` yet, and that the replay
 * cache still matches the current expectation.
 *
 * @return handle to the pending emitter, to be passed to `emitResumes` or `cancel` later.
 */
fun emitSuspends(a: T): TestJob {
    val emitter = launchEmit(a)
    val alreadyResumed = actions.contains(EmitResumes(emitter))
    assertFalse(alreadyResumed)
    checkReplay()
    return emitter
}
/** Waits until the emitter [job] has returned from `emit`, consuming its [EmitResumes] record. */
suspend fun emitResumes(job: TestJob) =
    awaitAction(EmitResumes(job))
/**
 * Cancels the coroutine behind [job] (an emitter or a collector) and waits until that coroutine
 * has recorded its [Cancelled] action.
 */
suspend fun cancel(job: TestJob) {
    log("cancel(${job.name})")
    val underlying: Job = job.job
    underlying.cancel()
    awaitAction(Cancelled(job))
}
/**
 * Starts a collector coroutine named after [id] in the DSL scope and returns a handle to it.
 *
 * The collector is driven step by step: it waits for a [ResumeCollecting] signal before it
 * begins collecting, and again after every received value, which it reports via [Collected].
 * If `collect` ever returns, the coroutine fails with an error; on cancellation it records
 * [Cancelled].
 */
fun subscribe(id: String): TestJob {
    val name = "collect($id)"
    val collector = scope.launch(start = CoroutineStart.UNDISPATCHED) {
        // Equal to the handle returned below: same Job, same name.
        val self = TestJob(coroutineContext[Job]!!, name)
        // Data-class equality makes one instance interchangeable with a fresh one per wait.
        val resumeSignal = ResumeCollecting(self)
        try {
            awaitAction(resumeSignal)
            log("$name start")
            sharedFlow.collect { value ->
                log("$name -> $value")
                addAction(Collected(self, value))
                awaitAction(resumeSignal)
                log("$name -> $value resumes")
            }
            error("$name completed")
        } catch (e: CancellationException) {
            log("$name cancelled")
            addAction(Cancelled(self))
        }
    }
    return TestJob(collector, name)
}
/**
 * Lets the collector [job] receive the values [a] one at a time, in the given order.
 *
 * For each value this is roughly `resumeCollecting` followed by `awaitCollected`, preceded by a
 * check that the replay cache still matches the current expectation.
 */
suspend fun collect(job: TestJob, vararg a: T) {
    a.forEach { expected ->
        checkReplay() // should not have changed
        resumeCollecting(job)
        awaitCollected(job, expected)
    }
}
/** Signals the collector [job] to proceed by recording a [ResumeCollecting] action for it. */
suspend fun resumeCollecting(job: TestJob) =
    addAction(ResumeCollecting(job))
/** Waits until the collector [job] has reported receiving [value], consuming that [Collected] record. */
suspend fun awaitCollected(job: TestJob, value: T) =
    awaitAction(Collected(job, value))
/** Marks the end of the scenario in the log and cancels the scope that hosts all emitters and collectors. */
fun stop() {
    log("--- stop")
    scope.cancel(cause = null)
}
/** Appends [text] to the scenario trace. */
private fun log(text: String) {
    log += text
}
/** Prints the last 30 trace entries to standard output, framed by a header and a footer line. */
fun printLog() {
    println("--- The most recent log entries ---")
    for (entry in log.takeLast(30)) {
        println(entry)
    }
    println("--- That's it ---")
}
}
}