Skip to content

Commit

Permalink
Rethrow downstream exception during "onCompletion" emissions
Browse files Browse the repository at this point in the history
    * We cannot allow emitting elements when downstream exception occurred, otherwise it may lead to a weird side-effects when "collect" block (or any other terminal operator) has thrown an exception, but keeps receiving new values
    * Another solution may be to silently ignore emitted values, but it may lead to a postponed cancellation and surprising behaviour for users

Fixes #1654
  • Loading branch information
qwwdfsad committed Nov 30, 2019
1 parent bd1687f commit 34dcfb1
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 12 deletions.
29 changes: 20 additions & 9 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Expand Up @@ -128,13 +128,26 @@ public fun <T> Flow<T>.onStart(
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
var exception: Throwable? = null
try {
exception = catchImpl(this)
} finally {
// Separate method because of KT-32220
SafeCollector<T>(this, coroutineContext).invokeSafely(action, exception)
exception?.let { throw it }
val exception = try {
catchImpl(this)
} catch (e: Throwable) {
/*
* Exception from the downstream.
* Use throwing collector to prevent any emissions from the
* completion sequence when downstream has failed, otherwise it may
* lead to a non-sequential behaviour impossible with `finally`
*/
ThrowingCollector(e).invokeSafely(action, null)
throw e
}
// Exception from the upstream or normal completion
SafeCollector(this, coroutineContext).invokeSafely(action, exception)
exception?.let { throw it }
}

private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
override suspend fun emit(value: Any?) {
throw e
}
}

Expand All @@ -155,5 +168,3 @@ private suspend fun <T> FlowCollector<T>.invokeSafely(
throw e
}
}


Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.test.*

class OnCompletionTest : TestBase() {
Expand Down Expand Up @@ -171,14 +172,91 @@ class OnCompletionTest : TestBase() {
.onCompletion { e ->
expect(8)
assertNull(e)
emit(TestData.Done(e))
try {
emit(TestData.Done(e))
expectUnreached()
} finally {
expect(9)
}
}.collect {
collected += it
}
}
}
val expected = (1..5).map { TestData.Value(it) } + TestData.Done(null)
val expected = (1..5).map<Int, TestData> { TestData.Value(it) }
assertEquals(expected, collected)
finish(9)
finish(10)
}

@Test
fun testFailedEmit() = runTest {
val cause = TestException()
assertFailsWith<TestException> {
flow<TestData> {
expect(1)
emit(TestData.Value(2))
expectUnreached()
}.onCompletion {
assertNull(it)
expect(3)
try {
emit(TestData.Done(it))
expectUnreached()
} catch (e: TestException) {
assertSame(cause, e)
finish(4)
}
}.collect {
expect((it as TestData.Value).i)
throw cause
}
}
}

@Test
fun testFirst() = runTest {
val value = flowOf(239).onCompletion {
assertNull(it)
expect(1)
try {
emit(42)
expectUnreached()
} catch (e: Throwable) {
assertTrue { e is AbortFlowException }
}
}.first()
assertEquals(239, value)
finish(2)
}

@Test
fun testSingle() = runTest {
assertFailsWith<IllegalStateException> {
flowOf(239).onCompletion {
assertNull(it)
expect(1)
try {
emit(42)
expectUnreached()
} catch (e: Throwable) {
// Second emit -- failure
assertTrue { e is IllegalStateException }
throw e
}
}.single()
expectUnreached()
}
finish(2)
}

@Test
fun testEmptySingleInterference() = runTest {
val value = emptyFlow<Int>().onCompletion {
assertNull(it)
expect(1)
emit(42)
}.single()
assertEquals(42, value)
finish(2)
}
}

0 comments on commit 34dcfb1

Please sign in to comment.