From b3eade1f662d5bfdbe70fd5877c182167b2a226b Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 11 Nov 2021 19:13:08 +0300 Subject: [PATCH 1/7] Improve exception transparency: explicitly allow throwing exceptions from the upstream when the downstream has been failed, but also suppress such exceptions by the downstream one It solves the problem of graceful shutdown: when the upstream fails unwillingly (e.g. file.close() has thrown in 'finally') block, we cannot treat is as an exception transparency violation (hint: 'finally' is a shortcut for 'catch'+body), but we also cannot leave things as is, otherwise it leads to unforeseen consequences such as successful 'retry' and 'catch' operators that may, or may not, then fail with exception on attempt to emit. Downstream exception supersedes the upstream exception only if it is not an instance of CancellationException, semantically emulating cancellation-friendly 'use' block. Fixes #2860 --- .../common/src/flow/Flow.kt | 18 +++++++-- .../common/src/flow/operators/Errors.kt | 39 +++++++++++++++++- .../common/test/flow/operators/CatchTest.kt | 38 ++++++++++++++++++ .../common/test/flow/operators/RetryTest.kt | 40 ++++++++++++++++++- 4 files changed, 129 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 0ccd343ead..cd4dbbe05b 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -131,10 +131,14 @@ import kotlin.coroutines.* * * ### Exception transparency * - * Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint - * it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into - * `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with - * [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing + * Flow implementations never explicitly catch and ignore exceptions that occur in downstream flows. + * From the implementation standpoint, it means that `catch` blocks that wrap calls to [emit][FlowCollector.emit] and [emitAll] + * are not allowed to complete normally or attempt to call [emit][FlowCollector.emit], they are only allowed + * to rethrow a caught exception or throw a different exception for diagnostics or application-specific purposes. + * + * + * Exception handling with further emission in flows shall only be performed with + * [catch][Flow.catch] operator, and it is designed to only catch exceptions coming from upstream flows while passing * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect] * throw any unhandled exceptions that occur in their code or in upstream flows, for example: * @@ -147,6 +151,11 @@ import kotlin.coroutines.* * ``` * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block. * + * All exception-handling Flow operators follow the principle of exception suppression: + * + * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, + * the upstream exception becomes superseded and suppressed by the downstream exception. + * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught" * by an upstream flow, limiting the ability of local reasoning about the code. @@ -154,6 +163,7 @@ import kotlin.coroutines.* * Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value, * if an exception has been thrown on previous attempt. * + * * ### Reactive streams * * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index 608221e09f..f7eb21a443 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -186,6 +186,7 @@ public fun Flow.retryWhen(predicate: suspend FlowCollector.(cause: Thr } // Return exception from upstream or null +@Suppress("NAME_SHADOWING") internal suspend fun Flow.catchImpl( collector: FlowCollector ): Throwable? { @@ -200,6 +201,8 @@ internal suspend fun Flow.catchImpl( } } } catch (e: Throwable) { + // Otherwise, smartcast is impossible + val fromDownstream = fromDownstream /* * First check ensures that we catch an original exception, not one rethrown by an operator. * Seconds check ignores cancellation causes, they cannot be caught. @@ -207,7 +210,41 @@ internal suspend fun Flow.catchImpl( if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) { throw e // Rethrow exceptions from downstream and cancellation causes } else { - return e // not from downstream + /* + * The exception came from the upstream [semi-] independently. + * For pure failures, when the downstream functions normally, we handle the exception as intended. + * But if the downstream has failed prior to or concurrently + * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring + * that it's not lost. + */ + if (fromDownstream == null) { + return e + } + /* + * We consider "downstream" exception as the superseding one even if the + * upstream has failed (unless downstream exception is a cancellation exception, aligned with + * our cancellation mechanism), so it effectively suppresses it. + * That's important for the following scenarios: + * ``` + * flow { + * val resource = ... + * try { + * ... emit as well ... + * } finally { + * resource.close() // Unlucky throw + * } + * }.catch { } /* or retry */ + * .collect { ... } + * ``` + * when *the downstream* throws. + */ + if (fromDownstream is CancellationException) { + e.addSuppressed(fromDownstream) + throw e + } else { + fromDownstream.addSuppressed(e) + throw fromDownstream + } } } return null diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index 447eb73b5d..af9b3211c3 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -144,4 +144,42 @@ class CatchTest : TestBase() { .collect() finish(9) } + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index b8a6b198fb..abd04ce577 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -104,4 +104,42 @@ class RetryTest : TestBase() { job.cancelAndJoin() finish(3) } -} \ No newline at end of file + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } +} From 08a3ed34dec1c9d74289e68f621bde1448d256f1 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 12 Nov 2021 16:51:59 +0300 Subject: [PATCH 2/7] Update kotlinx-coroutines-core/common/src/flow/Flow.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/Flow.kt | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index cd4dbbe05b..350087983c 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -131,14 +131,12 @@ import kotlin.coroutines.* * * ### Exception transparency * - * Flow implementations never explicitly catch and ignore exceptions that occur in downstream flows. - * From the implementation standpoint, it means that `catch` blocks that wrap calls to [emit][FlowCollector.emit] and [emitAll] - * are not allowed to complete normally or attempt to call [emit][FlowCollector.emit], they are only allowed - * to rethrow a caught exception or throw a different exception for diagnostics or application-specific purposes. + * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception. + * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation, + * but then it will be suppressed, as discussed below. + * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator. * - * - * Exception handling with further emission in flows shall only be performed with - * [catch][Flow.catch] operator, and it is designed to only catch exceptions coming from upstream flows while passing + * The [catch][Flow.catch] operator only catches upstream exceptions, but passes * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect] * throw any unhandled exceptions that occur in their code or in upstream flows, for example: * From 9227f94a83238247a4b237fd87b9d71f9a9e196a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 12 Nov 2021 18:00:43 +0300 Subject: [PATCH 3/7] Change the exception suppression invariant to work backwards: All the benefits of the approach stay the same, but additionally, for an arbitrary flow pipeline, adding a catch operator that is not triggered will no longer change the type of resulting exception --- .../common/src/flow/Flow.kt | 6 ++- .../common/src/flow/operators/Errors.kt | 11 ++-- .../common/test/flow/operators/CatchTest.kt | 2 +- .../common/test/flow/operators/RetryTest.kt | 2 +- .../test/exceptions/FlowSuppressionTest.kt | 50 +++++++++++++++++++ 5 files changed, 61 insertions(+), 10 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 350087983c..6182b0697a 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -133,7 +133,7 @@ import kotlin.coroutines.* * * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception. * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation, - * but then it will be suppressed, as discussed below. + * suppressing the original exception as discussed below. * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator. * * The [catch][Flow.catch] operator only catches upstream exceptions, but passes @@ -152,7 +152,9 @@ import kotlin.coroutines.* * All exception-handling Flow operators follow the principle of exception suppression: * * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, - * the upstream exception becomes superseded and suppressed by the downstream exception. + * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic + * equivalent of throwing from `finally` block. Exception-handling operators then ignore this exception, + * still following the downstream failure. * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught" diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index f7eb21a443..66a260bea5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -214,8 +214,7 @@ internal suspend fun Flow.catchImpl( * The exception came from the upstream [semi-] independently. * For pure failures, when the downstream functions normally, we handle the exception as intended. * But if the downstream has failed prior to or concurrently - * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring - * that it's not lost. + * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost. */ if (fromDownstream == null) { return e @@ -238,12 +237,12 @@ internal suspend fun Flow.catchImpl( * ``` * when *the downstream* throws. */ - if (fromDownstream is CancellationException) { - e.addSuppressed(fromDownstream) - throw e - } else { + if (e is CancellationException) { fromDownstream.addSuppressed(e) throw fromDownstream + } else { + e.addSuppressed(fromDownstream) + throw e } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index af9b3211c3..7e483bf93a 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -160,7 +160,7 @@ class CatchTest : TestBase() { throw TestException2() } - assertFailsWith(flow) + assertFailsWith(flow) finish(4) } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index abd04ce577..376f1c29bc 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -120,7 +120,7 @@ class RetryTest : TestBase() { throw TestException2() } - assertFailsWith(flow) + assertFailsWith(flow) finish(4) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt new file mode 100644 index 0000000000..5a2fd191af --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt @@ -0,0 +1,50 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.exceptions + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowSuppressionTest : TestBase() { + @Test + fun testSuppressionForPrimaryException() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.catch { expectUnreached() }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + } + + @Test + fun testSuppressionForPrimaryExceptionRetry() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + + } + } +} From c1ec29b631251544a4de9258faabdca48dea93bf Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 16 Nov 2021 18:22:16 +0300 Subject: [PATCH 4/7] ~fix internal documentation comment --- .../common/src/flow/operators/Errors.kt | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index 66a260bea5..c406c8bd72 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -220,9 +220,10 @@ internal suspend fun Flow.catchImpl( return e } /* - * We consider "downstream" exception as the superseding one even if the - * upstream has failed (unless downstream exception is a cancellation exception, aligned with - * our cancellation mechanism), so it effectively suppresses it. + * We consider the upstream exception as the superseding one when both upstream and downstream + * fail, suppressing the downstream exception, and operating similarly to `finally` block with + * the useful addition of adding the original downstream exception to suppressed ones. + * * That's important for the following scenarios: * ``` * flow { @@ -230,9 +231,9 @@ internal suspend fun Flow.catchImpl( * try { * ... emit as well ... * } finally { - * resource.close() // Unlucky throw + * resource.close() // Throws in the shutdown sequence when 'collect' already has thrown an exception * } - * }.catch { } /* or retry */ + * }.catch { } // or retry * .collect { ... } * ``` * when *the downstream* throws. From 0731fa354d2b1ddbfee915bea2fcdb6a70228414 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 16 Nov 2021 18:34:06 +0300 Subject: [PATCH 5/7] ~remove excessive indent --- kotlinx-coroutines-core/common/src/flow/Flow.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 6182b0697a..25609b3f0d 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -163,7 +163,6 @@ import kotlin.coroutines.* * Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value, * if an exception has been thrown on previous attempt. * - * * ### Reactive streams * * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with From 8a85b5e1e17920fdba02b517e1917167e7cff5a6 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 16 Nov 2021 18:43:31 +0300 Subject: [PATCH 6/7] ~more tests --- .../common/test/flow/operators/CatchTest.kt | 19 +++++++++++++++ .../common/test/flow/operators/RetryTest.kt | 19 +++++++++++++++ .../test/exceptions/FlowSuppressionTest.kt | 24 +++++++++++++++++++ 3 files changed, 62 insertions(+) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index 7e483bf93a..ad91e49898 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -182,4 +182,23 @@ class CatchTest : TestBase() { assertFailsWith(flow) finish(4) } + + @Test + fun testUpstreamCancellationIsIgnoredWhenDownstreamFails() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException("") + } + + assertFailsWith(flow) + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index 376f1c29bc..e5dde1b7fc 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -142,4 +142,23 @@ class RetryTest : TestBase() { assertFailsWith(flow) finish(4) } + + @Test + fun testUpstreamCancellationIsIgnoredWhenDownstreamFails() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException("") + } + + assertFailsWith(flow) + finish(4) + } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt index 5a2fd191af..41fe090382 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt @@ -47,4 +47,28 @@ class FlowSuppressionTest : TestBase() { } } + + @Test + fun testCancellationSuppression() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw CancellationException("") + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException("") + } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + finish(4) + } } From 90722b6da82ef83fa50107c370596affc633ec7c Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 16 Nov 2021 19:07:49 +0300 Subject: [PATCH 7/7] ~improve KDoc --- kotlinx-coroutines-core/common/src/flow/Flow.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 25609b3f0d..259f477de0 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -153,8 +153,8 @@ import kotlin.coroutines.* * * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic - * equivalent of throwing from `finally` block. Exception-handling operators then ignore this exception, - * still following the downstream failure. + * equivalent of throwing from `finally` block. However, this doesn't affect the operation of the exception-handling operators, + * which consider the downstream exception to be the root cause and behave as if the upstream didn't throw anything. * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"