Skip to content

Commit

Permalink
Fix SharedFlow with replay for subscribers working at different speed (
Browse files Browse the repository at this point in the history
…#2325)

Problematic scenario:
* Emitter suspends because there is a slow subscriber
* Fast subscriber collects all the values and suspend
* Slow subscriber resumes, collects value, causes emitter to be resume
* Fast subscribers must be resumed in this case, too

Fixes #2320
  • Loading branch information
elizarov committed Oct 26, 2020
1 parent 45ba58e commit 53f007f
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 8 deletions.
14 changes: 8 additions & 6 deletions kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Expand Up @@ -326,7 +326,7 @@ private class SharedFlowImpl<T>(
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked()
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
Expand Down Expand Up @@ -422,15 +422,15 @@ private class SharedFlowImpl<T>(
// recheck buffer under lock again (make sure it is really full)
if (tryEmitLocked(value)) {
cont.resume(Unit)
resumes = findSlotsToResumeLocked()
resumes = findSlotsToResumeLocked(resumes)
return@lock null
}
// add suspended emitter to the buffer
Emitter(this, head + totalSize, value, cont).also {
enqueueLocked(it)
queueSize++ // added to queue of waiting emitters
// synchronous shared flow might rendezvous with waiting emitter
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked()
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// outside of the lock: register dispose on cancellation
Expand Down Expand Up @@ -512,6 +512,8 @@ private class SharedFlowImpl<T>(
updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
// just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
cleanupTailLocked()
// We need to waken up suspended collectors if any emitters were resumed here
if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
return resumes
}

Expand Down Expand Up @@ -598,9 +600,9 @@ private class SharedFlowImpl<T>(
}
}

private fun findSlotsToResumeLocked(): Array<Continuation<Unit>?> {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
var resumeCount = 0
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
var resumes: Array<Continuation<Unit>?> = resumesIn
var resumeCount = resumesIn.size
forEachSlotLocked loop@{ slot ->
val cont = slot.cont ?: return@loop // only waiting slots
if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
Expand Down
Expand Up @@ -176,6 +176,31 @@ class SharedFlowScenarioTest : TestBase() {
collect(b, 15)
}

@Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
fun testResumeFastSubscriberOnResumedEmitter() =
testSharedFlow<Int>(MutableSharedFlow(1)) {
// create two subscribers and start collecting
val s1 = subscribe("s1"); resumeCollecting(s1)
val s2 = subscribe("s2"); resumeCollecting(s2)
// now emit 0, make sure it is collected
emitRightNow(0); expectReplayOf(0)
awaitCollected(s1, 0)
awaitCollected(s2, 0)
// now emit 1, and only first subscriber continues and collects it
emitRightNow(1); expectReplayOf(1)
collect(s1, 1)
// now emit 2, it suspend (s2 is blocking it)
val e2 = emitSuspends(2)
resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
emitResumes(e2); expectReplayOf(2)
// now emit 3, it suspends (s2 blocks it)
val e3 = emitSuspends(3)
collect(s2, 2)
emitResumes(e3); expectReplayOf(3)
}

private fun <T> testSharedFlow(
sharedFlow: MutableSharedFlow<T>,
scenario: suspend ScenarioDsl<T>.() -> Unit
Expand Down Expand Up @@ -305,14 +330,23 @@ class SharedFlowScenarioTest : TestBase() {
return TestJob(job, name)
}

// collect ~== resumeCollecting + awaitCollected (for each value)
suspend fun collect(job: TestJob, vararg a: T) {
for (value in a) {
checkReplay() // should not have changed
addAction(ResumeCollecting(job))
awaitAction(Collected(job, value))
resumeCollecting(job)
awaitCollected(job, value)
}
}

suspend fun resumeCollecting(job: TestJob) {
addAction(ResumeCollecting(job))
}

suspend fun awaitCollected(job: TestJob, value: T) {
awaitAction(Collected(job, value))
}

fun stop() {
log("--- stop")
scope.cancel()
Expand Down

0 comments on commit 53f007f

Please sign in to comment.