/
ShareInConflationTest.kt
162 lines (139 loc) · 5.61 KB
/
ShareInConflationTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.test.*
/**
* Similar to [ShareInBufferTest] and [BufferConflationTest],
* but tests [shareIn] and its fusion with [conflate] operator.
*/
class ShareInConflationTest : TestBase() {
private val n = 100
private fun checkConflation(
bufferCapacity: Int,
onBufferOverflow: BufferOverflow = BufferOverflow.DROP_OLDEST,
op: suspend Flow<Int>.(CoroutineScope) -> Flow<Int>
) = runTest {
expect(1)
// emit all and conflate, then should collect bufferCapacity the latest ones
val done = Job()
flow {
repeat(n) { i ->
expect(i + 2)
emit(i)
}
done.join() // wait until done collection
emit(-1) // signal flow completion
}
.op(this)
.takeWhile { i -> i >= 0 }
.collect { i ->
val first = if (onBufferOverflow == BufferOverflow.DROP_LATEST) 0 else n - bufferCapacity
val last = first + bufferCapacity - 1
if (i in first..last) {
expect(n + i - first + 2)
if (i == last) done.complete() // received the last one
} else {
error("Unexpected $i")
}
}
finish(n + bufferCapacity + 2)
}
@Test
fun testConflateReplay1() =
checkConflation(1) {
conflate().shareIn(it, SharingStarted.Eagerly, 1)
}
@Test // still looks like conflating the last value for the first subscriber (will not replay to others though)
fun testConflateReplay0() =
checkConflation(1) {
conflate().shareIn(it, SharingStarted.Eagerly, 0)
}
@Test
fun testConflateReplay5() =
checkConflation(5) {
conflate().shareIn(it, SharingStarted.Eagerly, 5)
}
@Test
fun testBufferDropOldestReplay1() =
checkConflation(1) {
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, SharingStarted.Eagerly, 1)
}
@Test
fun testBufferDropOldestReplay0() =
checkConflation(1) {
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, SharingStarted.Eagerly, 0)
}
@Test
fun testBufferDropOldestReplay10() =
checkConflation(10) {
buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, SharingStarted.Eagerly, 10)
}
@Test
fun testBuffer20DropOldestReplay0() =
checkConflation(20) {
buffer(20, onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, SharingStarted.Eagerly, 0)
}
@Test
fun testBuffer7DropOldestReplay11() =
checkConflation(18) {
buffer(7, onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, SharingStarted.Eagerly, 11)
}
@Test // a preceding buffer() gets overridden by conflate()
fun testBufferConflateOverride() =
checkConflation(1) {
buffer(23).conflate().shareIn(it, SharingStarted.Eagerly, 1)
}
@Test // a preceding buffer() gets overridden by buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST)
fun testBufferDropOldestOverride() =
checkConflation(1) {
buffer(23).buffer(onBufferOverflow = BufferOverflow.DROP_OLDEST).shareIn(it, SharingStarted.Eagerly, 1)
}
@Test
fun testBufferDropLatestReplay0() =
checkConflation(1, BufferOverflow.DROP_LATEST) {
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 0)
}
@Test
fun testBufferDropLatestReplay1() =
checkConflation(1, BufferOverflow.DROP_LATEST) {
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 1)
}
@Test
fun testBufferDropLatestReplay10() =
checkConflation(10, BufferOverflow.DROP_LATEST) {
buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 10)
}
@Test
fun testBuffer0DropLatestReplay0() =
checkConflation(1, BufferOverflow.DROP_LATEST) {
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 0)
}
@Test
fun testBuffer0DropLatestReplay1() =
checkConflation(1, BufferOverflow.DROP_LATEST) {
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 1)
}
@Test
fun testBuffer0DropLatestReplay10() =
checkConflation(10, BufferOverflow.DROP_LATEST) {
buffer(0, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 10)
}
@Test
fun testBuffer5DropLatestReplay0() =
checkConflation(5, BufferOverflow.DROP_LATEST) {
buffer(5, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 0)
}
@Test
fun testBuffer5DropLatestReplay10() =
checkConflation(15, BufferOverflow.DROP_LATEST) {
buffer(5, onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 10)
}
@Test // a preceding buffer() gets overridden by buffer(onBufferOverflow = BufferOverflow.DROP_LATEST)
fun testBufferDropLatestOverride() =
checkConflation(1, BufferOverflow.DROP_LATEST) {
buffer(23).buffer(onBufferOverflow = BufferOverflow.DROP_LATEST).shareIn(it, SharingStarted.Eagerly, 0)
}
}