Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SharedFlow with replay for subscribers working at different speed #2325

Merged
merged 1 commit into from Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Expand Up @@ -326,7 +326,7 @@ private class SharedFlowImpl<T>(
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
val emitted = synchronized(this) {
if (tryEmitLocked(value)) {
resumes = findSlotsToResumeLocked()
resumes = findSlotsToResumeLocked(resumes)
true
} else {
false
Expand Down Expand Up @@ -422,15 +422,15 @@ private class SharedFlowImpl<T>(
// recheck buffer under lock again (make sure it is really full)
if (tryEmitLocked(value)) {
cont.resume(Unit)
resumes = findSlotsToResumeLocked()
resumes = findSlotsToResumeLocked(resumes)
return@lock null
}
// add suspended emitter to the buffer
Emitter(this, head + totalSize, value, cont).also {
enqueueLocked(it)
queueSize++ // added to queue of waiting emitters
// synchronous shared flow might rendezvous with waiting emitter
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked()
if (bufferCapacity == 0) resumes = findSlotsToResumeLocked(resumes)
}
}
// outside of the lock: register dispose on cancellation
Expand Down Expand Up @@ -512,6 +512,8 @@ private class SharedFlowImpl<T>(
updateBufferLocked(newReplayIndex, newMinCollectorIndex, newBufferEndIndex, newQueueEndIndex)
// just in case we've moved all buffered emitters and have NO_VALUE's at the tail now
cleanupTailLocked()
// We need to waken up suspended collectors if any emitters were resumed here
if (resumes.isNotEmpty()) resumes = findSlotsToResumeLocked(resumes)
return resumes
}

Expand Down Expand Up @@ -598,9 +600,9 @@ private class SharedFlowImpl<T>(
}
}

private fun findSlotsToResumeLocked(): Array<Continuation<Unit>?> {
var resumes: Array<Continuation<Unit>?> = EMPTY_RESUMES
var resumeCount = 0
private fun findSlotsToResumeLocked(resumesIn: Array<Continuation<Unit>?>): Array<Continuation<Unit>?> {
var resumes: Array<Continuation<Unit>?> = resumesIn
var resumeCount = resumesIn.size
forEachSlotLocked loop@{ slot ->
val cont = slot.cont ?: return@loop // only waiting slots
if (tryPeekLocked(slot) < 0) return@loop // only slots that can peek a value
Expand Down
Expand Up @@ -176,6 +176,31 @@ class SharedFlowScenarioTest : TestBase() {
collect(b, 15)
}

@Test // https://github.com/Kotlin/kotlinx.coroutines/issues/2320
fun testResumeFastSubscriberOnResumedEmitter() =
testSharedFlow<Int>(MutableSharedFlow(1)) {
// create two subscribers and start collecting
val s1 = subscribe("s1"); resumeCollecting(s1)
val s2 = subscribe("s2"); resumeCollecting(s2)
// now emit 0, make sure it is collected
emitRightNow(0); expectReplayOf(0)
awaitCollected(s1, 0)
awaitCollected(s2, 0)
// now emit 1, and only first subscriber continues and collects it
emitRightNow(1); expectReplayOf(1)
collect(s1, 1)
// now emit 2, it suspend (s2 is blocking it)
val e2 = emitSuspends(2)
resumeCollecting(s1) // resume, but does not collect (e2 is still queued)
collect(s2, 1) // resume + collect next --> resumes emitter, thus resumes s1
awaitCollected(s1, 2) // <-- S1 collects value from the newly resumed emitter here !!!
emitResumes(e2); expectReplayOf(2)
// now emit 3, it suspends (s2 blocks it)
val e3 = emitSuspends(3)
collect(s2, 2)
emitResumes(e3); expectReplayOf(3)
}

private fun <T> testSharedFlow(
sharedFlow: MutableSharedFlow<T>,
scenario: suspend ScenarioDsl<T>.() -> Unit
Expand Down Expand Up @@ -305,14 +330,23 @@ class SharedFlowScenarioTest : TestBase() {
return TestJob(job, name)
}

// collect ~== resumeCollecting + awaitCollected (for each value)
suspend fun collect(job: TestJob, vararg a: T) {
for (value in a) {
checkReplay() // should not have changed
addAction(ResumeCollecting(job))
awaitAction(Collected(job, value))
resumeCollecting(job)
awaitCollected(job, value)
}
}

suspend fun resumeCollecting(job: TestJob) {
addAction(ResumeCollecting(job))
}

suspend fun awaitCollected(job: TestJob, value: T) {
awaitAction(Collected(job, value))
}

fun stop() {
log("--- stop")
scope.cancel()
Expand Down