From eb4e7d32dadb6962372e625cb684e84857650169 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Mon, 27 Apr 2020 15:04:00 +0300 Subject: [PATCH] Breaking change: Experimental Flow.onCompletion contract for cause (#1732) Flow.onCompletion now reports all failures and cancellation in its cause just like invokeOnCompletion. A null cause is reported if and only if flow had completed successfully (no failure, no cancellation). Emission of additional elements after the end of the flow is only possible from inside of onCompletion block in case of successful completion. Also fixed a bug where onCompletion implementation was trying to add exception to its own list of suppressed exceptions, which is not allowed. Fixes #1693 --- coroutines-guide.md | 2 +- docs/flow.md | 12 ++--- .../common/src/flow/operators/Emitters.kt | 52 ++++++++++--------- .../test/flow/operators/OnCompletionTest.kt | 26 +++++++--- .../jvm/test/guide/test/FlowGuideTest.kt | 2 +- 5 files changed, 54 insertions(+), 40 deletions(-) diff --git a/coroutines-guide.md b/coroutines-guide.md index bebd0d0e3c..0b7b842acb 100644 --- a/coroutines-guide.md +++ b/coroutines-guide.md @@ -76,7 +76,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m * [Flow completion](docs/flow.md#flow-completion) * [Imperative finally block](docs/flow.md#imperative-finally-block) * [Declarative handling](docs/flow.md#declarative-handling) - * [Upstream exceptions only](docs/flow.md#upstream-exceptions-only) + * [Successful completion](docs/flow.md#successful-completion) * [Imperative versus declarative](docs/flow.md#imperative-versus-declarative) * [Launching flow](docs/flow.md#launching-flow) * [Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams) diff --git a/docs/flow.md b/docs/flow.md index 37e491cf51..3f14b10417 100644 --- a/docs/flow.md +++ b/docs/flow.md @@ -39,7 +39,7 @@ * [Flow completion](#flow-completion) * [Imperative finally block](#imperative-finally-block) * [Declarative handling](#declarative-handling) - * [Upstream exceptions only](#upstream-exceptions-only) + * [Successful completion](#successful-completion) * [Imperative versus declarative](#imperative-versus-declarative) * [Launching flow](#launching-flow) * [Flow and Reactive Streams](#flow-and-reactive-streams) @@ -1635,10 +1635,10 @@ The [onCompletion] operator, unlike [catch], does not handle the exception. As w example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators and can be handled with a `catch` operator. -#### Upstream exceptions only +#### Successful completion -Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not -see downstream exceptions. For example, run the following code: +Another difference with [catch] operator is that [onCompletion] sees all exceptions and receives +a `null` exception only on successful completion of the upstream flow (without cancellation or failure).
@@ -1664,11 +1664,11 @@ fun main() = runBlocking { > You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt). -We can see the completion cause is null, yet collection failed with exception: +We can see the completion cause is not null, because the flow was aborted due to downstream exception: ```text 1 -Flow completed with null +Flow completed with java.lang.IllegalStateException: Collected 2 Exception in thread "main" java.lang.IllegalStateException: Collected 2 ``` diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index cbed2df929..93530234eb 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -81,8 +81,8 @@ public fun Flow.onStart( } /** - * Invokes the given [action] when the given flow is completed or cancelled, using - * the exception from the upstream (if any) as cause parameter of [action]. + * Invokes the given [action] when the given flow is completed or cancelled, passing + * the cancellation exception or failure as cause parameter of [action]. * * Conceptually, `onCompletion` is similar to wrapping the flow collection into a `finally` block, * for example the following imperative snippet: @@ -106,53 +106,55 @@ public fun Flow.onStart( * .collect() * ``` * - * This operator is *transparent* to exceptions that occur in downstream flow - * and does not observe exceptions that are thrown to cancel the flow, - * while any exception from the [action] will be thrown downstream. - * This behaviour can be demonstrated by the following example: + * Unlike [catch], this operator reports exception that occur both upstream and downstream + * and observe exceptions that are thrown to cancel the flow. Exception is empty if and only if + * the flow had fully completed successfully. Conceptually, the following code: * * ``` - * flow { emitData() } - * .map { computeOne(it) } - * .onCompletion { println(it) } // Can print exceptions from emitData and computeOne - * .map { computeTwo(it) } - * .onCompletion { println(it) } // Can print exceptions from emitData, computeOne, onCompletion and computeTwo + * myFlow.collect { value -> + * println(value) + * } + * println("Completed successfully") + * ``` + * + * can be replaced with: + * + * ``` + * myFlow + * .onEach { println(it) } + * .onCompletion { if (it == null) println("Completed successfully") } * .collect() * ``` * * The receiver of the [action] is [FlowCollector] and this operator can be used to emit additional - * elements at the end of the collection. For example: + * elements at the end if it completed successfully. For example: * * ``` * flowOf("a", "b", "c") * .onCompletion { emit("Done") } * .collect { println(it) } // prints a, b, c, Done * ``` + * + * In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception. + * Use [catch] if you need to suppress failure and replace it with emission of elements. */ @ExperimentalCoroutinesApi public fun Flow.onCompletion( action: suspend FlowCollector.(cause: Throwable?) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action - val exception = try { - catchImpl(this) + try { + collect(this) } catch (e: Throwable) { /* - * Exception from the downstream. * Use throwing collector to prevent any emissions from the * completion sequence when downstream has failed, otherwise it may * lead to a non-sequential behaviour impossible with `finally` */ - ThrowingCollector(e).invokeSafely(action, null) + ThrowingCollector(e).invokeSafely(action, e) throw e } - // Exception from the upstream or normal completion - val safeCollector = SafeCollector(this, coroutineContext) - try { - safeCollector.invokeSafely(action, exception) - } finally { - safeCollector.releaseIntercepted() - } - exception?.let { throw it } + // Normal completion + SafeCollector(this, coroutineContext).invokeSafely(action, null) } /** @@ -205,7 +207,7 @@ private suspend fun FlowCollector.invokeSafely( try { action(cause) } catch (e: Throwable) { - if (cause !== null) e.addSuppressedThrowable(cause) + if (cause !== null && cause !== e) e.addSuppressedThrowable(cause) throw e } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index f56632d5fd..7f0c548ca6 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -50,7 +50,7 @@ class OnCompletionTest : TestBase() { }.onEach { expect(2) }.onCompletion { - assertNull(it) + assertTrue(it is TestException) // flow fails because of this exception expect(4) }.onEach { expect(3) @@ -65,7 +65,7 @@ class OnCompletionTest : TestBase() { @Test fun testMultipleOnCompletions() = runTest { flowOf(1).onCompletion { - assertNull(it) + assertTrue(it is TestException) expect(2) }.onEach { expect(1) @@ -145,12 +145,12 @@ class OnCompletionTest : TestBase() { .onCompletion { e -> expect(8) assertTrue(e is TestException) - emit(TestData.Done(e)) + emit(TestData.Done(e)) // will fail }.collect { collected += it } } - val expected = (1..5).map { TestData.Value(it) } + TestData.Done(TestException("OK")) + val expected: List = (1..5).map { TestData.Value(it) } assertEquals(expected, collected) finish(9) } @@ -171,7 +171,7 @@ class OnCompletionTest : TestBase() { } .onCompletion { e -> expect(8) - assertNull(e) + assertTrue(e is CancellationException) try { emit(TestData.Done(e)) expectUnreached() @@ -197,7 +197,7 @@ class OnCompletionTest : TestBase() { emit(TestData.Value(2)) expectUnreached() }.onCompletion { - assertNull(it) + assertSame(cause, it) // flow failed because of the exception in downstream expect(3) try { emit(TestData.Done(it)) @@ -216,7 +216,7 @@ class OnCompletionTest : TestBase() { @Test fun testFirst() = runTest { val value = flowOf(239).onCompletion { - assertNull(it) + assertNotNull(it) // the flow did not complete normally expect(1) try { emit(42) @@ -278,4 +278,16 @@ class OnCompletionTest : TestBase() { assertNull(flow.singleOrNull()) finish(4) } + + @Test + fun testTakeOnCompletion() = runTest { + // even though it uses "take" from the outside it completes normally + val flow = (1..10).asFlow().take(5) + val result = flow.onCompletion { cause -> + assertNull(cause) + emit(-1) + }.toList() + val expected = (1..5).toList() + (-1) + assertEquals(expected, result) + } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt index 5d320f2124..39a7853b5f 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/FlowGuideTest.kt @@ -357,7 +357,7 @@ class FlowGuideTest { fun testExampleFlow34() { test("ExampleFlow34") { kotlinx.coroutines.guide.exampleFlow34.main() }.verifyExceptions( "1", - "Flow completed with null", + "Flow completed with java.lang.IllegalStateException: Collected 2", "Exception in thread \"main\" java.lang.IllegalStateException: Collected 2" ) }