From 9227f94a83238247a4b237fd87b9d71f9a9e196a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 12 Nov 2021 18:00:43 +0300 Subject: [PATCH] Change the exception suppression invariant to work backwards: All the benefits of the approach stay the same, but additionally, for an arbitrary flow pipeline, adding a catch operator that is not triggered will no longer change the type of resulting exception --- .../common/src/flow/Flow.kt | 6 ++- .../common/src/flow/operators/Errors.kt | 11 ++-- .../common/test/flow/operators/CatchTest.kt | 2 +- .../common/test/flow/operators/RetryTest.kt | 2 +- .../test/exceptions/FlowSuppressionTest.kt | 50 +++++++++++++++++++ 5 files changed, 61 insertions(+), 10 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 350087983c..6182b0697a 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -133,7 +133,7 @@ import kotlin.coroutines.* * * When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception. * For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation, - * but then it will be suppressed, as discussed below. + * suppressing the original exception as discussed below. * If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator. * * The [catch][Flow.catch] operator only catches upstream exceptions, but passes @@ -152,7 +152,9 @@ import kotlin.coroutines.* * All exception-handling Flow operators follow the principle of exception suppression: * * If the upstream flow throws an exception during its completion when the downstream exception has been thrown, - * the upstream exception becomes superseded and suppressed by the downstream exception. + * the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic + * equivalent of throwing from `finally` block. Exception-handling operators then ignore this exception, + * still following the downstream failure. * * Failure to adhere to the exception transparency requirement can lead to strange behaviors which make * it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught" diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt index f7eb21a443..66a260bea5 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Errors.kt @@ -214,8 +214,7 @@ internal suspend fun Flow.catchImpl( * The exception came from the upstream [semi-] independently. * For pure failures, when the downstream functions normally, we handle the exception as intended. * But if the downstream has failed prior to or concurrently - * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring - * that it's not lost. + * with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost. */ if (fromDownstream == null) { return e @@ -238,12 +237,12 @@ internal suspend fun Flow.catchImpl( * ``` * when *the downstream* throws. */ - if (fromDownstream is CancellationException) { - e.addSuppressed(fromDownstream) - throw e - } else { + if (e is CancellationException) { fromDownstream.addSuppressed(e) throw fromDownstream + } else { + e.addSuppressed(fromDownstream) + throw e } } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt index af9b3211c3..7e483bf93a 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CatchTest.kt @@ -160,7 +160,7 @@ class CatchTest : TestBase() { throw TestException2() } - assertFailsWith(flow) + assertFailsWith(flow) finish(4) } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt index abd04ce577..376f1c29bc 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/RetryTest.kt @@ -120,7 +120,7 @@ class RetryTest : TestBase() { throw TestException2() } - assertFailsWith(flow) + assertFailsWith(flow) finish(4) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt new file mode 100644 index 0000000000..5a2fd191af --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/exceptions/FlowSuppressionTest.kt @@ -0,0 +1,50 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.exceptions + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FlowSuppressionTest : TestBase() { + @Test + fun testSuppressionForPrimaryException() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.catch { expectUnreached() }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + } + } + + @Test + fun testSuppressionForPrimaryExceptionRetry() = runTest { + val flow = flow { + try { + emit(1) + } finally { + throw TestException() + } + }.retry { expectUnreached(); true }.onEach { throw TestException2() } + + try { + flow.collect() + } catch (e: Throwable) { + assertIs(e) + assertIs(e.suppressed[0]) + + } + } +}