diff --git a/coroutines-guide.md b/coroutines-guide.md index bebd0d0e3c..0b7b842acb 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -76,7 +76,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m * [Flow completion](docs/flow.md#flow-completion) * [Imperative finally block](docs/flow.md#imperative-finally-block) * [Declarative handling](docs/flow.md#declarative-handling) - * [Upstream exceptions only](docs/flow.md#upstream-exceptions-only) + * [Successful completion](docs/flow.md#successful-completion) * [Imperative versus declarative](docs/flow.md#imperative-versus-declarative) * [Launching flow](docs/flow.md#launching-flow) * [Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams) diff --git a/docs/flow.md b/docs/flow.md index 37e491cf51..3f14b10417 100644 --- a/docs/flow.md +++ b/docs/flow.md @@ -39,7 +39,7 @@ * [Flow completion](#flow-completion) * [Imperative finally block](#imperative-finally-block) * [Declarative handling](#declarative-handling) - * [Upstream exceptions only](#upstream-exceptions-only) + * [Successful completion](#successful-completion) * [Imperative versus declarative](#imperative-versus-declarative) * [Launching flow](#launching-flow) * [Flow and Reactive Streams](#flow-and-reactive-streams) @@ -1635,10 +1635,10 @@ The [onCompletion] operator, unlike [catch], does not handle the exception. As w example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators and can be handled with a `catch` operator. -#### Upstream exceptions only +#### Successful completion -Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not -see downstream exceptions. For example, run the following code: +Another difference with [catch] operator is that [onCompletion] sees all exceptions and receives +a `null` exception only on successful completion of the upstream flow (without cancellation or failure).
@@ -1664,11 +1664,11 @@ fun main() = runBlocking { > You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt). -We can see the completion cause is null, yet collection failed with exception: +We can see the completion cause is not null, because the flow was aborted due to downstream exception: ```text 1 -Flow completed with null +Flow completed with java.lang.IllegalStateException: Collected 2 Exception in thread "main" java.lang.IllegalStateException: Collected 2 ``` diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index cbed2df929..93530234eb 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -81,8 +81,8 @@ public fun Flow.onStart( } /** - * Invokes the given [action] when the given flow is completed or cancelled, using - * the exception from the upstream (if any) as cause parameter of [action]. + * Invokes the given [action] when the given flow is completed or cancelled, passing + * the cancellation exception or failure as cause parameter of [action]. * * Conceptually, `onCompletion` is similar to wrapping the flow collection into a `finally` block, * for example the following imperative snippet: @@ -106,53 +106,55 @@ public fun Flow.onStart( * .collect() * ``` * - * This operator is *transparent* to exceptions that occur in downstream flow - * and does not observe exceptions that are thrown to cancel the flow, - * while any exception from the [action] will be thrown downstream. - * This behaviour can be demonstrated by the following example: + * Unlike [catch], this operator reports exception that occur both upstream and downstream + * and observe exceptions that are thrown to cancel the flow. Exception is empty if and only if + * the flow had fully completed successfully. Conceptually, the following code: * * ``` - * flow { emitData() } - * .map { computeOne(it) } - * .onCompletion { println(it) } // Can print exceptions from emitData and computeOne - * .map { computeTwo(it) } - * .onCompletion { println(it) } // Can print exceptions from emitData, computeOne, onCompletion and computeTwo + * myFlow.collect { value -> + * println(value) + * } + * println("Completed successfully") + * ``` + * + * can be replaced with: + * + * ``` + * myFlow + * .onEach { println(it) } + * .onCompletion { if (it == null) println("Completed successfully") } * .collect() * ``` * * The receiver of the [action] is [FlowCollector] and this operator can be used to emit additional - * elements at the end of the collection. For example: + * elements at the end if it completed successfully. For example: * * ``` * flowOf("a", "b", "c") * .onCompletion { emit("Done") } * .collect { println(it) } // prints a, b, c, Done * ``` + * + * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception. + * Use [catch] if you need to suppress failure and replace it with emission of elements. */ @ExperimentalCoroutinesApi public fun Flow.onCompletion( action: suspend FlowCollector.(cause: Throwable?) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action - val exception = try { - catchImpl(this) + try { + collect(this) } catch (e: Throwable) { /* - * Exception from the downstream. * Use throwing collector to prevent any emissions from the * completion sequence when downstream has failed, otherwise it may * lead to a non-sequential behaviour impossible with `finally` */ - ThrowingCollector(e).invokeSafely(action, null) + ThrowingCollector(e).invokeSafely(action, e) throw e } - // Exception from the upstream or normal completion - val safeCollector = SafeCollector(this, coroutineContext) - try { - safeCollector.invokeSafely(action, exception) - } finally { - safeCollector.releaseIntercepted() - } - exception?.let { throw it } + // Normal completion + SafeCollector(this, coroutineContext).invokeSafely(action, null) } /** @@ -205,7 +207,7 @@ private suspend fun FlowCollector.invokeSafely( try { action(cause) } catch (e: Throwable) { - if (cause !== null) e.addSuppressedThrowable(cause) + if (cause !== null && cause !== e) e.addSuppressedThrowable(cause) throw e } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index f56632d5fd..7f0c548ca6 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -50,7 +50,7 @@ class OnCompletionTest : TestBase() { }.onEach { expect(2) }.onCompletion { - assertNull(it) + assertTrue(it is TestException) // flow fails because of this exception expect(4) }.onEach { expect(3) @@ -65,7 +65,7 @@ class OnCompletionTest : TestBase() { @Test fun testMultipleOnCompletions() = runTest { flowOf(1).onCompletion { - assertNull(it) + assertTrue(it is TestException) expect(2) }.onEach { expect(1) @@ -145,12 +145,12 @@ class OnCompletionTest : TestBase() { .onCompletion { e -> expect(8) assertTrue(e is TestException) - emit(TestData.Done(e)) + emit(TestData.Done(e)) // will fail }.collect { collected += it } } - val expected = (1..5).map { TestData.Value(it) } + TestData.Done(TestException("OK")) + val expected: List = (1..5).map { TestData.Value(it) } assertEquals(expected, collected) finish(9) } @@ -171,7 +171,7 @@ class OnCompletionTest : TestBase() { } .onCompletion { e -> expect(8) - assertNull(e) + assertTrue(e is CancellationException) try { emit(TestData.Done(e)) expectUnreached() @@ -197,7 +197,7 @@ class OnCompletionTest : TestBase() { emit(TestData.Value(2)) expectUnreached() }.onCompletion { - assertNull(it) + assertSame(cause, it) // flow failed because of the exception in downstream expect(3) try { emit(TestData.Done(it)) @@ -216,7 +216,7 @@ class OnCompletionTest : TestBase() { @Test fun testFirst() = runTest { val value = flowOf(239).onCompletion { - assertNull(it) + assertNotNull(it) // the flow did not complete normally expect(1) try { emit(42) @@ -278,4 +278,16 @@ class OnCompletionTest : TestBase() { assertNull(flow.singleOrNull()) finish(4) } + + @Test + fun testTakeOnCompletion() = runTest { + // even though it uses "take" from the outside it completes normally + val flow = (1..10).asFlow().take(5) + val result = flow.onCompletion { cause -> + assertNull(cause) + emit(-1) + }.toList() + val expected = (1..5).toList() + (-1) + assertEquals(expected, result) + } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt index 5d320f2124..39a7853b5f 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt @@ -357,7 +357,7 @@ class FlowGuideTest { fun testExampleFlow34() { test("ExampleFlow34") { kotlinx.coroutines.guide.exampleFlow34.main() }.verifyExceptions( "1", - "Flow completed with null", + "Flow completed with java.lang.IllegalStateException: Collected 2", "Exception in thread \"main\" java.lang.IllegalStateException: Collected 2" ) }