Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve exception transparency: explicitly allow throwing exceptions … #3017

Merged
merged 7 commits into from Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 14 additions & 4 deletions kotlinx-coroutines-core/common/src/flow/Flow.kt
Expand Up @@ -131,10 +131,12 @@ import kotlin.coroutines.*
*
* ### Exception transparency
*
* Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint
* it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into
* `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with
* [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing
* When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception.
* For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation,
* suppressing the original exception as discussed below.
* If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator.
*
* The [catch][Flow.catch] operator only catches upstream exceptions, but passes
* all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect]
* throw any unhandled exceptions that occur in their code or in upstream flows, for example:
*
Expand All @@ -147,13 +149,21 @@ import kotlin.coroutines.*
* ```
* The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block.
*
* All exception-handling Flow operators follow the principle of exception suppression:
*
* If the upstream flow throws an exception during its completion when the downstream exception has been thrown,
* the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic
* equivalent of throwing from `finally` block. Exception-handling operators then ignore this exception,
* still following the downstream failure.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
* by an upstream flow, limiting the ability of local reasoning about the code.
*
* Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value,
* if an exception has been thrown on previous attempt.
*
*
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* ### Reactive streams
*
* Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with
Expand Down
39 changes: 38 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
Expand Up @@ -186,6 +186,7 @@ public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Thr
}

// Return exception from upstream or null
@Suppress("NAME_SHADOWING")
internal suspend fun <T> Flow<T>.catchImpl(
collector: FlowCollector<T>
): Throwable? {
Expand All @@ -200,14 +201,50 @@ internal suspend fun <T> Flow<T>.catchImpl(
}
}
} catch (e: Throwable) {
// Otherwise, smartcast is impossible
val fromDownstream = fromDownstream
/*
* First check ensures that we catch an original exception, not one rethrown by an operator.
* Seconds check ignores cancellation causes, they cannot be caught.
*/
if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
throw e // Rethrow exceptions from downstream and cancellation causes
} else {
return e // not from downstream
/*
* The exception came from the upstream [semi-] independently.
* For pure failures, when the downstream functions normally, we handle the exception as intended.
* But if the downstream has failed prior to or concurrently
* with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost.
*/
if (fromDownstream == null) {
return e
}
/*
* We consider the upstream exception as the superseding one when both upstream and downstream
* fail, suppressing the downstream exception, and operating similarly to `finally` block with
* the useful addition of adding the original downstream exception to suppressed ones.
*
* That's important for the following scenarios:
* ```
* flow {
* val resource = ...
* try {
* ... emit as well ...
* } finally {
* resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception
* }
* }.catch { } // or retry
* .collect { ... }
* ```
* when *the downstream* throws.
*/
if (e is CancellationException) {
fromDownstream.addSuppressed(e)
throw fromDownstream
} else {
e.addSuppressed(fromDownstream)
throw e
}
}
}
return null
Expand Down
38 changes: 38 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt
Expand Up @@ -144,4 +144,42 @@ class CatchTest : TestBase() {
.collect()
finish(9)
}

@Test
fun testUpstreamExceptionConcurrentWithDownstream() = runTest {
val flow = flow {
try {
expect(1)
emit(1)
} finally {
expect(3)
throw TestException()
}
}.catch { expectUnreached() }.onEach {
expect(2)
throw TestException2()
}

assertFailsWith<TestException>(flow)
finish(4)
}

@Test
fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest {
val flow = flow {
try {
expect(1)
emit(1)
} finally {
expect(3)
throw TestException()
}
}.catch { expectUnreached() }.onEach {
expect(2)
throw CancellationException("")
}

assertFailsWith<TestException>(flow)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
finish(4)
}
}
40 changes: 39 additions & 1 deletion kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt
Expand Up @@ -104,4 +104,42 @@ class RetryTest : TestBase() {
job.cancelAndJoin()
finish(3)
}
}

@Test
fun testUpstreamExceptionConcurrentWithDownstream() = runTest {
val flow = flow {
try {
expect(1)
emit(1)
} finally {
expect(3)
throw TestException()
}
}.retry { expectUnreached(); true }.onEach {
expect(2)
throw TestException2()
}

assertFailsWith<TestException>(flow)
finish(4)
}

@Test
fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest {
val flow = flow {
try {
expect(1)
emit(1)
} finally {
expect(3)
throw TestException()
}
}.retry { expectUnreached(); true }.onEach {
expect(2)
throw CancellationException("")
}

assertFailsWith<TestException>(flow)
finish(4)
}
}
50 changes: 50 additions & 0 deletions kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt
@@ -0,0 +1,50 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.exceptions

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

class FlowSuppressionTest : TestBase() {
@Test
fun testSuppressionForPrimaryException() = runTest {
val flow = flow {
try {
emit(1)
} finally {
throw TestException()
}
}.catch { expectUnreached() }.onEach { throw TestException2() }

try {
flow.collect()
} catch (e: Throwable) {
assertIs<TestException>(e)
assertIs<TestException2>(e.suppressed[0])
}
}

@Test
fun testSuppressionForPrimaryExceptionRetry() = runTest {
val flow = flow {
try {
emit(1)
} finally {
throw TestException()
}
}.retry { expectUnreached(); true }.onEach { throw TestException2() }

try {
flow.collect()
} catch (e: Throwable) {
assertIs<TestException>(e)
assertIs<TestException2>(e.suppressed[0])

}
}
}