Skip to content

Commit

Permalink
SharedFlow: Fix scenario with concurrent emitters and cancellation of…
Browse files Browse the repository at this point in the history
… subscriber (#2359)

* Added a specific test for a problematic scenario.
* Added stress test with concurrent emitters and subscribers that come and go.

Fixes #2356
  • Loading branch information
elizarov committed Nov 3, 2020
1 parent 4ea4078 commit e710048
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 0 deletions.
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Expand Up @@ -498,6 +498,12 @@ private class SharedFlowImpl<T>(
}
// Compute new buffer size -> how many values we now actually have after resume
val newBufferSize1 = (newBufferEndIndex - head).toInt()
// Note: When nCollectors == 0 we resume ALL queued emitters and we might have resumed more than bufferCapacity,
// and newMinCollectorIndex might pointing the wrong place because of that. The easiest way to fix it is by
// forcing newMinCollectorIndex = newBufferEndIndex. We do not needed to update newBufferSize1 (which could be
// too big), because the only use of newBufferSize1 in the below code is in the minOf(replay, newBufferSize1)
// expression, which coerces values that are too big anyway.
if (nCollectors == 0) newMinCollectorIndex = newBufferEndIndex
// Compute new replay size -> limit to replay the number of items we need, take into account that it can only grow
var newReplayIndex = maxOf(replayIndex, newBufferEndIndex - minOf(replay, newBufferSize1))
// adjustment for synchronous case with cancelled emitter (NO_VALUE)
Expand Down
Expand Up @@ -201,6 +201,48 @@ class SharedFlowScenarioTest : TestBase() {
emitResumes(e3); expectReplayOf(3)
}

@Test
fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1() =
testSharedFlow<Int>(MutableSharedFlow(1)) {
val a = subscribe("a");
emitRightNow(0); expectReplayOf(0)
collect(a, 0)
emitRightNow(1); expectReplayOf(1)
val e2 = emitSuspends(2) // suspends until 1 is collected
val e3 = emitSuspends(3) // suspends until 1 is collected, too
cancel(a) // must resume emitters 2 & 3
emitResumes(e2)
emitResumes(e3)
expectReplayOf(3) // but replay size is 1 so only 3 should be kept
// Note: originally, SharedFlow was in a broken state here with 3 elements in the buffer
val b = subscribe("b")
collect(b, 3)
emitRightNow(4); expectReplayOf(4)
collect(b, 4)
}

@Test
fun testSuspendedConcurrentEmitAndCancelSubscriberReplay1ExtraBuffer1() =
testSharedFlow<Int>(MutableSharedFlow( replay = 1, extraBufferCapacity = 1)) {
val a = subscribe("a");
emitRightNow(0); expectReplayOf(0)
collect(a, 0)
emitRightNow(1); expectReplayOf(1)
emitRightNow(2); expectReplayOf(2)
val e3 = emitSuspends(3) // suspends until 1 is collected
val e4 = emitSuspends(4) // suspends until 1 is collected, too
val e5 = emitSuspends(5) // suspends until 1 is collected, too
cancel(a) // must resume emitters 3, 4, 5
emitResumes(e3)
emitResumes(e4)
emitResumes(e5)
expectReplayOf(5)
val b = subscribe("b")
collect(b, 5)
emitRightNow(6); expectReplayOf(6)
collect(b, 6)
}

private fun <T> testSharedFlow(
sharedFlow: MutableSharedFlow<T>,
scenario: suspend ScenarioDsl<T>.() -> Unit
Expand Down
87 changes: 87 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/SharedFlowStressTest.kt
@@ -0,0 +1,87 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import kotlin.collections.ArrayList
import kotlin.test.*
import kotlin.time.*

@ExperimentalTime
class SharedFlowStressTest : TestBase() {
private val nProducers = 5
private val nConsumers = 3
private val nSeconds = 3 * stressTestMultiplier

private lateinit var sf: MutableSharedFlow<Long>
private lateinit var view: SharedFlow<Long>

@get:Rule
val producerDispatcher = ExecutorRule(nProducers)
@get:Rule
val consumerDispatcher = ExecutorRule(nConsumers)

private val totalProduced = atomic(0L)
private val totalConsumed = atomic(0L)

@Test
fun testStressReplay1() =
testStress(1, 0)

@Test
fun testStressReplay1ExtraBuffer1() =
testStress(1, 1)

@Test
fun testStressReplay2ExtraBuffer1() =
testStress(2, 1)

private fun testStress(replay: Int, extraBufferCapacity: Int) = runTest {
sf = MutableSharedFlow(replay, extraBufferCapacity)
view = sf.asSharedFlow()
val jobs = ArrayList<Job>()
jobs += List(nProducers) { producerIndex ->
launch(producerDispatcher) {
var cur = producerIndex.toLong()
while (isActive) {
sf.emit(cur)
totalProduced.incrementAndGet()
cur += nProducers
}
}
}
jobs += List(nConsumers) { consumerIndex ->
launch(consumerDispatcher) {
while (isActive) {
view
.dropWhile { it % nConsumers != consumerIndex.toLong() }
.take(1)
.collect {
check(it % nConsumers == consumerIndex.toLong())
totalConsumed.incrementAndGet()
}
}
}
}
var lastProduced = 0L
var lastConsumed = 0L
for (sec in 1..nSeconds) {
delay(1.seconds)
val produced = totalProduced.value
val consumed = totalConsumed.value
println("$sec sec: produced = $produced; consumed = $consumed")
assertNotEquals(lastProduced, produced)
assertNotEquals(lastConsumed, consumed)
lastProduced = produced
lastConsumed = consumed
}
jobs.forEach { it.cancel() }
jobs.forEach { it.join() }
println("total: produced = ${totalProduced.value}; consumed = ${totalConsumed.value}")
}
}

0 comments on commit e710048

Please sign in to comment.