Skip to content

Commit

Permalink
Change the exception suppression invariant to work backwards:
Browse files Browse the repository at this point in the history
All the benifits of the approach stay the same, but additionally, for an arbitrary flow pipeline, adding a catch operator that is not triggered will no longer change the type of resulting exception
  • Loading branch information
qwwdfsad committed Nov 12, 2021
1 parent 08a3ed3 commit 935b83a
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 10 deletions.
6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Flow.kt
Expand Up @@ -133,7 +133,7 @@ import kotlin.coroutines.*
*
* When `emit` or `emitAll` throws, the Flow implementations must immediately stop emitting new values and finish with an exception.
* For diagnostics or application-specific purposes, the exception may be different from the one thrown by the emit operation,
* but then it will be suppressed, as discussed below.
* suppressing the original exception as discussed below.
* If there is a need to emit values after the downstream failed, please use the [catch][Flow.catch] operator.
*
* The [catch][Flow.catch] operator only catches upstream exceptions, but passes
Expand All @@ -152,7 +152,9 @@ import kotlin.coroutines.*
* All exception-handling Flow operators follow the principle of exception suppression:
*
* If the upstream flow throws an exception during its completion when the downstream exception has been thrown,
* the upstream exception becomes superseded and suppressed by the downstream exception.
* the downstream exception becomes superseded and suppressed by the upstream exception, being a semantic
* equivalent of throwing from `finally` block. Exception-handling operators then ignore this exception,
* still following the downstream failure.
*
* Failure to adhere to the exception transparency requirement can lead to strange behaviors which make
* it hard to reason about the code because an exception in the `collect { ... }` could be somehow "caught"
Expand Down
11 changes: 5 additions & 6 deletions kotlinx-coroutines-core/common/src/flow/operators/Errors.kt
Expand Up @@ -214,8 +214,7 @@ internal suspend fun <T> Flow<T>.catchImpl(
* The exception came from the upstream [semi-] independently.
* For pure failures, when the downstream functions normally, we handle the exception as intended.
* But if the downstream has failed prior to or concurrently
* with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring
* that it's not lost.
* with the upstream, we forcefully rethrow it, preserving the contextual information and ensuring that it's not lost.
*/
if (fromDownstream == null) {
return e
Expand All @@ -238,12 +237,12 @@ internal suspend fun <T> Flow<T>.catchImpl(
* ```
* when *the downstream* throws.
*/
if (fromDownstream is CancellationException) {
e.addSuppressed(fromDownstream)
throw e
} else {
if (e is CancellationException) {
fromDownstream.addSuppressed(e)
throw fromDownstream
} else {
e.addSuppressed(fromDownstream)
throw e
}
}
}
Expand Down
Expand Up @@ -160,7 +160,7 @@ class CatchTest : TestBase() {
throw TestException2()
}

assertFailsWith<TestException2>(flow)
assertFailsWith<TestException>(flow)
finish(4)
}

Expand Down
Expand Up @@ -120,7 +120,7 @@ class RetryTest : TestBase() {
throw TestException2()
}

assertFailsWith<TestException2>(flow)
assertFailsWith<TestException>(flow)
finish(4)
}

Expand Down
48 changes: 48 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/FlowSuppressionTest.kt
@@ -0,0 +1,48 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.*

class FlowSuppressionTest : TestBase() {
@Test
fun testSuppressionForPrimaryException() = runTest {
val flow = flow {
try {
emit(1)
} finally {
throw TestException()
}
}.catch { expectUnreached() }.onEach { throw TestException2() }

try {
flow.collect()
} catch (e: Throwable) {
assertIs<TestException>(e)
assertIs<TestException2>(e.suppressed[0])
}
}

@Test
fun testSuppressionForPrimaryExceptionRetry() = runTest {
val flow = flow {
try {
emit(1)
} finally {
throw TestException()
}
}.retry { expectUnreached(); true }.onEach { throw TestException2() }

try {
flow.collect()
} catch (e: Throwable) {
assertIs<TestException>(e)
assertIs<TestException2>(e.suppressed[0])

}
}
}

0 comments on commit 935b83a

Please sign in to comment.