diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 0ccd343ead..259f477de0 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -131,10 +131,12 @@ import kotlin.coroutines.* * * ### Exception transparency * - * Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint - * it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into - * `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with - * [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing + * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception. + * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation, + * suppressing the original exception as discussed below. + * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator. + * + * The [catch][Flow.catch] operator only catches upstream exceptions, but passes * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect] * throw any unhandled exceptions that occur in their code or in upstream flows, for example: * @@ -147,6 +149,13 @@ import kotlin.coroutines.* * ``` * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block. * + * All exception-handling Flow operators follow the principle of exception suppression: + * + * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, + * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic + * equivalent of throwing from `finally` block. However, this doesn't affect the operation of the exception-handling operators, + * which consider the downstream exception to be the root cause and behave as if the upstream didn't throw anything. + * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught" * by an upstream flow, limiting the ability of local reasoning about the code. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index 608221e09f..c406c8bd72 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -186,6 +186,7 @@ public fun Flow.retryWhen(predicate: suspend FlowCollector.(cause: Thr } // Return exception from upstream or null +@Suppress("NAME_SHADOWING") internal suspend fun Flow.catchImpl( collector: FlowCollector ): Throwable? { @@ -200,6 +201,8 @@ internal suspend fun Flow.catchImpl( } } } catch (e: Throwable) { + // Otherwise, smartcast is impossible + val fromDownstream = fromDownstream /* * First check ensures that we catch an original exception, not one rethrown by an operator. * Seconds check ignores cancellation causes, they cannot be caught. @@ -207,7 +210,41 @@ internal suspend fun Flow.catchImpl( if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) { throw e // Rethrow exceptions from downstream and cancellation causes } else { - return e // not from downstream + /* + * The exception came from the upstream [semi-] independently. + * For pure failures, when the downstream functions normally, we handle the exception as intended. + * But if the downstream has failed prior to or concurrently + * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost. + */ + if (fromDownstream == null) { + return e + } + /* + * We consider the upstream exception as the superseding one when both upstream and downstream + * fail, suppressing the downstream exception, and operating similarly to `finally` block with + * the useful addition of adding the original downstream exception to suppressed ones. + * + * That's important for the following scenarios: + * ``` + * flow { + * val resource = ... + * try { + * ... emit as well ... + * } finally { + * resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception + * } + * }.catch { } // or retry + * .collect { ... } + * ``` + * when *the downstream* throws. + */ + if (e is CancellationException) { + fromDownstream.addSuppressed(e) + throw fromDownstream + } else { + e.addSuppressed(fromDownstream) + throw e + } } } return null diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index 447eb73b5d..ad91e49898 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -144,4 +144,61 @@ class CatchTest : TestBase() { .collect() finish(9) } + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamCancellationIsIgnoredWhenDownstreamFails() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException("") + } + + assertFailsWith(flow) + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index b8a6b198fb..e5dde1b7fc 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -104,4 +104,61 @@ class RetryTest : TestBase() { job.cancelAndJoin() finish(3) } -} \ No newline at end of file + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamCancellationIsIgnoredWhenDownstreamFails() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException("") + } + + assertFailsWith(flow) + finish(4) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt new file mode 100644 index 0000000000..41fe090382 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.exceptions + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowSuppressionTest : TestBase() { + @Test + fun testSuppressionForPrimaryException() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.catch { expectUnreached() }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + } + + @Test + fun testSuppressionForPrimaryExceptionRetry() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + + } + } + + @Test + fun testCancellationSuppression() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException("") + } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + finish(4) + } +}