Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rethrow downstream exception during "onCompletion" emissions #1658

Merged
merged 1 commit into from Nov 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 20 additions & 9 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Expand Up @@ -128,13 +128,26 @@ public fun <T> Flow<T>.onStart(
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
var exception: Throwable? = null
try {
exception = catchImpl(this)
} finally {
// Separate method because of KT-32220
SafeCollector<T>(this, coroutineContext).invokeSafely(action, exception)
exception?.let { throw it }
val exception = try {
catchImpl(this)
} catch (e: Throwable) {
/*
* Exception from the downstream.
* Use throwing collector to prevent any emissions from the
* completion sequence when downstream has failed, otherwise it may
* lead to a non-sequential behaviour impossible with `finally`
*/
ThrowingCollector(e).invokeSafely(action, null)
throw e
}
// Exception from the upstream or normal completion
SafeCollector(this, coroutineContext).invokeSafely(action, exception)
exception?.let { throw it }
}

private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
override suspend fun emit(value: Any?) {
throw e
}
}

Expand All @@ -155,5 +168,3 @@ private suspend fun <T> FlowCollector<T>.invokeSafely(
throw e
}
}


Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.test.*

class OnCompletionTest : TestBase() {
Expand Down Expand Up @@ -171,14 +172,91 @@ class OnCompletionTest : TestBase() {
.onCompletion { e ->
expect(8)
assertNull(e)
emit(TestData.Done(e))
try {
emit(TestData.Done(e))
expectUnreached()
} finally {
expect(9)
}
}.collect {
collected += it
}
}
}
val expected = (1..5).map { TestData.Value(it) } + TestData.Done(null)
val expected = (1..5).map<Int, TestData> { TestData.Value(it) }
assertEquals(expected, collected)
finish(9)
finish(10)
}

@Test
fun testFailedEmit() = runTest {
val cause = TestException()
assertFailsWith<TestException> {
flow<TestData> {
expect(1)
emit(TestData.Value(2))
expectUnreached()
}.onCompletion {
assertNull(it)
expect(3)
try {
emit(TestData.Done(it))
expectUnreached()
} catch (e: TestException) {
assertSame(cause, e)
finish(4)
}
}.collect {
expect((it as TestData.Value).i)
throw cause
}
}
}

@Test
fun testFirst() = runTest {
val value = flowOf(239).onCompletion {
assertNull(it)
expect(1)
try {
emit(42)
expectUnreached()
} catch (e: Throwable) {
assertTrue { e is AbortFlowException }
}
}.first()
assertEquals(239, value)
finish(2)
}

@Test
fun testSingle() = runTest {
assertFailsWith<IllegalStateException> {
flowOf(239).onCompletion {
assertNull(it)
expect(1)
try {
emit(42)
expectUnreached()
} catch (e: Throwable) {
// Second emit -- failure
assertTrue { e is IllegalStateException }
throw e
}
}.single()
expectUnreached()
}
finish(2)
}

@Test
fun testEmptySingleInterference() = runTest {
val value = emptyFlow<Int>().onCompletion {
assertNull(it)
expect(1)
emit(42)
}.single()
assertEquals(42, value)
finish(2)
}
}