From 9099b03005758a5ce369a5bc29b9d1d98b9a95b2 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 16 Nov 2021 19:10:14 +0300 Subject: [PATCH] =?UTF-8?q?Improve=20exception=20transparency:=20explicitl?= =?UTF-8?q?y=20allow=20throwing=20exceptions=20=E2=80=A6=20(#3017)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Improve exception transparency: explicitly allow throwing exceptions from the upstream when the downstream has been failed, suppress the downstream exception by the new one, but still ignore it in the exception handling operators, that still consider the flow failed. It solves the problem of graceful shutdown: when the upstream fails unwillingly (e.g. `file.close()` has thrown in `finally` block), we cannot treat it as an exception transparency violation (hint: `finally` is a shortcut for `catch` + body that rethrows in the end), but we also cannot leave things as is, otherwise, it leads to unforeseen consequences such as successful `retry` and `catch` operators that may, or may not, then fail with an exception on an attempt to emit. Upstream exception supersedes the downstream exception only if it is not an instance of `CancellationException`, semantically emulating cancellation-friendly 'use' block. Fixes #2860 --- .../common/src/flow/Flow.kt | 17 ++++- .../common/src/flow/operators/Errors.kt | 39 +++++++++- .../common/test/flow/operators/CatchTest.kt | 57 ++++++++++++++ .../common/test/flow/operators/RetryTest.kt | 59 ++++++++++++++- .../test/exceptions/FlowSuppressionTest.kt | 74 +++++++++++++++++++ 5 files changed, 240 insertions(+), 6 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 0ccd343ead..259f477de0 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -131,10 +131,12 @@ import kotlin.coroutines.* * * ### Exception transparency * - * Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint - * it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into - * `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with - * [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing + * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception. + * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation, + * suppressing the original exception as discussed below. + * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator. + * + * The [catch][Flow.catch] operator only catches upstream exceptions, but passes * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect] * throw any unhandled exceptions that occur in their code or in upstream flows, for example: * @@ -147,6 +149,13 @@ import kotlin.coroutines.* * ``` * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block. * + * All exception-handling Flow operators follow the principle of exception suppression: + * + * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, + * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic + * equivalent of throwing from `finally` block. However, this doesn't affect the operation of the exception-handling operators, + * which consider the downstream exception to be the root cause and behave as if the upstream didn't throw anything. + * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught" * by an upstream flow, limiting the ability of local reasoning about the code. diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index 608221e09f..c406c8bd72 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -186,6 +186,7 @@ public fun Flow.retryWhen(predicate: suspend FlowCollector.(cause: Thr } // Return exception from upstream or null +@Suppress("NAME_SHADOWING") internal suspend fun Flow.catchImpl( collector: FlowCollector ): Throwable? { @@ -200,6 +201,8 @@ internal suspend fun Flow.catchImpl( } } } catch (e: Throwable) { + // Otherwise, smartcast is impossible + val fromDownstream = fromDownstream /* * First check ensures that we catch an original exception, not one rethrown by an operator. * Seconds check ignores cancellation causes, they cannot be caught. @@ -207,7 +210,41 @@ internal suspend fun Flow.catchImpl( if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) { throw e // Rethrow exceptions from downstream and cancellation causes } else { - return e // not from downstream + /* + * The exception came from the upstream [semi-] independently. + * For pure failures, when the downstream functions normally, we handle the exception as intended. + * But if the downstream has failed prior to or concurrently + * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost. + */ + if (fromDownstream == null) { + return e + } + /* + * We consider the upstream exception as the superseding one when both upstream and downstream + * fail, suppressing the downstream exception, and operating similarly to `finally` block with + * the useful addition of adding the original downstream exception to suppressed ones. + * + * That's important for the following scenarios: + * ``` + * flow { + * val resource = ... + * try { + * ... emit as well ... + * } finally { + * resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception + * } + * }.catch { } // or retry + * .collect { ... } + * ``` + * when *the downstream* throws. + */ + if (e is CancellationException) { + fromDownstream.addSuppressed(e) + throw fromDownstream + } else { + e.addSuppressed(fromDownstream) + throw e + } } } return null diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index 447eb73b5d..ad91e49898 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -144,4 +144,61 @@ class CatchTest : TestBase() { .collect() finish(9) } + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamCancellationIsIgnoredWhenDownstreamFails() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException("") + } + + assertFailsWith(flow) + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index b8a6b198fb..e5dde1b7fc 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -104,4 +104,61 @@ class RetryTest : TestBase() { job.cancelAndJoin() finish(3) } -} \ No newline at end of file + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamCancellationIsIgnoredWhenDownstreamFails() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException("") + } + + assertFailsWith(flow) + finish(4) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt new file mode 100644 index 0000000000..41fe090382 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt @@ -0,0 +1,74 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.exceptions + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowSuppressionTest : TestBase() { + @Test + fun testSuppressionForPrimaryException() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.catch { expectUnreached() }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + } + + @Test + fun testSuppressionForPrimaryExceptionRetry() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + + } + } + + @Test + fun testCancellationSuppression() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException("") + } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + finish(4) + } +}