From b3eade1f662d5bfdbe70fd5877c182167b2a226b Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 11 Nov 2021 19:13:08 +0300 Subject: [PATCH] Improve exception transparency: explicitly allow throwing exceptions from the upstream when the downstream has been failed, but also suppress such exceptions by the downstream one It solves the problem of graceful shutdown: when the upstream fails unwillingly (e.g. file.close() has thrown in 'finally') block, we cannot treat is as an exception transparency violation (hint: 'finally' is a shortcut for 'catch'+body), but we also cannot leave things as is, otherwise it leads to unforeseen consequences such as successful 'retry' and 'catch' operators that may, or may not, then fail with exception on attempt to emit. Downstream exception supersedes the upstream exception only if it is not an instance of CancellationException, semantically emulating cancellation-friendly 'use' block. Fixes #2860 --- .../common/src/flow/Flow.kt | 18 +++++++-- .../common/src/flow/operators/Errors.kt | 39 +++++++++++++++++- .../common/test/flow/operators/CatchTest.kt | 38 ++++++++++++++++++ .../common/test/flow/operators/RetryTest.kt | 40 ++++++++++++++++++- 4 files changed, 129 insertions(+), 6 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 0ccd343ead..cd4dbbe05b 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -131,10 +131,14 @@ import kotlin.coroutines.* * * ### Exception transparency * - * Flow implementations never catch or handle exceptions that occur in downstream flows. From the implementation standpoint - * it means that calls to [emit][FlowCollector.emit] and [emitAll] shall never be wrapped into - * `try { ... } catch { ... }` blocks. Exception handling in flows shall be performed with - * [catch][Flow.catch] operator and it is designed to only catch exceptions coming from upstream flows while passing + * Flow implementations never explicitly catch and ignore exceptions that occur in downstream flows. + * From the implementation standpoint, it means that `catch` blocks that wrap calls to [emit][FlowCollector.emit] and [emitAll] + * are not allowed to complete normally or attempt to call [emit][FlowCollector.emit], they are only allowed + * to rethrow a caught exception or throw a different exception for diagnostics or application-specific purposes. + * + * + * Exception handling with further emission in flows shall only be performed with + * [catch][Flow.catch] operator, and it is designed to only catch exceptions coming from upstream flows while passing * all downstream exceptions. Similarly, terminal operators like [collect][Flow.collect] * throw any unhandled exceptions that occur in their code or in upstream flows, for example: * @@ -147,6 +151,11 @@ import kotlin.coroutines.* * ``` * The same reasoning can be applied to the [onCompletion] operator that is a declarative replacement for the `finally` block. * + * All exception-handling Flow operators follow the principle of exception suppression: + * + * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, + * the upstream exception becomes superseded and suppressed by the downstream exception. + * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught" * by an upstream flow, limiting the ability of local reasoning about the code. @@ -154,6 +163,7 @@ import kotlin.coroutines.* * Flow machinery enforces exception transparency at runtime and throws [IllegalStateException] on any attempt to emit a value, * if an exception has been thrown on previous attempt. * + * * ### Reactive streams * * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index 608221e09f..f7eb21a443 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -186,6 +186,7 @@ public fun Flow.retryWhen(predicate: suspend FlowCollector.(cause: Thr } // Return exception from upstream or null +@Suppress("NAME_SHADOWING") internal suspend fun Flow.catchImpl( collector: FlowCollector ): Throwable? { @@ -200,6 +201,8 @@ internal suspend fun Flow.catchImpl( } } } catch (e: Throwable) { + // Otherwise, smartcast is impossible + val fromDownstream = fromDownstream /* * First check ensures that we catch an original exception, not one rethrown by an operator. * Seconds check ignores cancellation causes, they cannot be caught. @@ -207,7 +210,41 @@ internal suspend fun Flow.catchImpl( if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) { throw e // Rethrow exceptions from downstream and cancellation causes } else { - return e // not from downstream + /* + * The exception came from the upstream [semi-] independently. + * For pure failures, when the downstream functions normally, we handle the exception as intended. + * But if the downstream has failed prior to or concurrently + * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring + * that it's not lost. + */ + if (fromDownstream == null) { + return e + } + /* + * We consider "downstream" exception as the superseding one even if the + * upstream has failed (unless downstream exception is a cancellation exception, aligned with + * our cancellation mechanism), so it effectively suppresses it. + * That's important for the following scenarios: + * ``` + * flow { + * val resource = ... + * try { + * ... emit as well ... + * } finally { + * resource.close() // Unlucky throw + * } + * }.catch { } /* or retry */ + * .collect { ... } + * ``` + * when *the downstream* throws. + */ + if (fromDownstream is CancellationException) { + e.addSuppressed(fromDownstream) + throw e + } else { + fromDownstream.addSuppressed(e) + throw fromDownstream + } } } return null diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index 447eb73b5d..af9b3211c3 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -144,4 +144,42 @@ class CatchTest : TestBase() { .collect() finish(9) } + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.catch { expectUnreached() }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index b8a6b198fb..abd04ce577 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -104,4 +104,42 @@ class RetryTest : TestBase() { job.cancelAndJoin() finish(3) } -} \ No newline at end of file + + @Test + fun testUpstreamExceptionConcurrentWithDownstream() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw TestException2() + } + + assertFailsWith(flow) + finish(4) + } + + @Test + fun testUpstreamExceptionConcurrentWithDownstreamCancellation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + } finally { + expect(3) + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { + expect(2) + throw CancellationException("") + } + + assertFailsWith(flow) + finish(4) + } +}