Skip to content

Commit

Permalink
Fail-fast in emitAll implementation from onCompletion (Kotlin#2700)
Browse files Browse the repository at this point in the history
It is helpful to prevent bugs like KT-46013 and potential deadlocks or delayed cancellations
  • Loading branch information
qwwdfsad authored and pablobaxter committed Sep 14, 2022
1 parent c30e18c commit fa7b73f
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 4 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>): Uni
emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
ensureActive()
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching".
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
// fix retention of the last emitted value.
Expand Down
10 changes: 9 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,15 @@ public fun <T> Flow<T>.onEmpty(
}
}

private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
/*
* 'emitAll' methods call this to fail-fast before starting to collect
* their sources (that may not have any elements for a long time).
*/
internal fun FlowCollector<*>.ensureActive() {
if (this is ThrowingCollector) throw e
}

internal class ThrowingCollector(@JvmField val e: Throwable) : FlowCollector<Any?> {
override suspend fun emit(value: Any?) {
throw e
}
Expand Down
6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,7 @@ public suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
* Collects all the values from the given [flow] and emits them to the collector.
* It is a shorthand for `flow.collect { value -> emit(value) }`.
*/
@BuilderInference
public suspend inline fun <T> FlowCollector<T>.emitAll(flow: Flow<T>): Unit = flow.collect(this)
public suspend fun <T> FlowCollector<T>.emitAll(flow: Flow<T>) {
ensureActive()
flow.collect(this)
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.test.*

Expand Down Expand Up @@ -290,4 +291,24 @@ class OnCompletionTest : TestBase() {
val expected = (1..5).toList() + (-1)
assertEquals(expected, result)
}

@Test
fun testCancelledEmitAllFlow() = runTest {
// emitAll does not call 'collect' on onCompletion collector
// if the target flow is empty
flowOf(1, 2, 3)
.onCompletion { emitAll(MutableSharedFlow()) }
.take(1)
.collect()
}

@Test
fun testCancelledEmitAllChannel() = runTest {
// emitAll does not call 'collect' on onCompletion collector
// if the target channel is empty
flowOf(1, 2, 3)
.onCompletion { emitAll(Channel()) }
.take(1)
.collect()
}
}

0 comments on commit fa7b73f

Please sign in to comment.