Skip to content

Commit

Permalink
Breaking change: Experimental Flow.onCompletion contract for cause (#…
Browse files Browse the repository at this point in the history
…1732)

Flow.onCompletion now reports all failures and cancellation in its cause just like invokeOnCompletion. A null cause is reported if and only if flow had completed successfully (no failure, no cancellation). Emission of additional elements after the end of the flow is only possible from inside of onCompletion block in case of successful completion.

Also fixed a bug where onCompletion implementation was trying to add exception to its own list of suppressed exceptions, which is not allowed.

Fixes #1693
  • Loading branch information
elizarov committed Apr 27, 2020
1 parent 5073704 commit eb4e7d3
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 40 deletions.
2 changes: 1 addition & 1 deletion coroutines-guide.md
Expand Up @@ -76,7 +76,7 @@ The main coroutines guide has moved to the [docs folder](docs/coroutines-guide.m
* <a name='flow-completion'></a>[Flow completion](docs/flow.md#flow-completion)
* <a name='imperative-finally-block'></a>[Imperative finally block](docs/flow.md#imperative-finally-block)
* <a name='declarative-handling'></a>[Declarative handling](docs/flow.md#declarative-handling)
* <a name='upstream-exceptions-only'></a>[Upstream exceptions only](docs/flow.md#upstream-exceptions-only)
* <a name='successful-completion'></a>[Successful completion](docs/flow.md#successful-completion)
* <a name='imperative-versus-declarative'></a>[Imperative versus declarative](docs/flow.md#imperative-versus-declarative)
* <a name='launching-flow'></a>[Launching flow](docs/flow.md#launching-flow)
* <a name='flow-and-reactive-streams'></a>[Flow and Reactive Streams](docs/flow.md#flow-and-reactive-streams)
Expand Down
12 changes: 6 additions & 6 deletions docs/flow.md
Expand Up @@ -39,7 +39,7 @@
* [Flow completion](#flow-completion)
* [Imperative finally block](#imperative-finally-block)
* [Declarative handling](#declarative-handling)
* [Upstream exceptions only](#upstream-exceptions-only)
* [Successful completion](#successful-completion)
* [Imperative versus declarative](#imperative-versus-declarative)
* [Launching flow](#launching-flow)
* [Flow and Reactive Streams](#flow-and-reactive-streams)
Expand Down Expand Up @@ -1635,10 +1635,10 @@ The [onCompletion] operator, unlike [catch], does not handle the exception. As w
example code, the exception still flows downstream. It will be delivered to further `onCompletion` operators
and can be handled with a `catch` operator.

#### Upstream exceptions only
#### Successful completion

Just like the [catch] operator, [onCompletion] only sees exceptions coming from upstream and does not
see downstream exceptions. For example, run the following code:
Another difference with [catch] operator is that [onCompletion] sees all exceptions and receives
a `null` exception only on successful completion of the upstream flow (without cancellation or failure).

<div class="sample" markdown="1" theme="idea" data-min-compiler-version="1.3">

Expand All @@ -1664,11 +1664,11 @@ fun main() = runBlocking<Unit> {

> You can get the full code from [here](../kotlinx-coroutines-core/jvm/test/guide/example-flow-34.kt).
We can see the completion cause is null, yet collection failed with exception:
We can see the completion cause is not null, because the flow was aborted due to downstream exception:

```text
1
Flow completed with null
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
```

Expand Down
52 changes: 27 additions & 25 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Expand Up @@ -81,8 +81,8 @@ public fun <T> Flow<T>.onStart(
}

/**
* Invokes the given [action] when the given flow is completed or cancelled, using
* the exception from the upstream (if any) as cause parameter of [action].
* Invokes the given [action] when the given flow is completed or cancelled, passing
* the cancellation exception or failure as cause parameter of [action].
*
* Conceptually, `onCompletion` is similar to wrapping the flow collection into a `finally` block,
* for example the following imperative snippet:
Expand All @@ -106,53 +106,55 @@ public fun <T> Flow<T>.onStart(
* .collect()
* ```
*
* This operator is *transparent* to exceptions that occur in downstream flow
* and does not observe exceptions that are thrown to cancel the flow,
* while any exception from the [action] will be thrown downstream.
* This behaviour can be demonstrated by the following example:
* Unlike [catch], this operator reports exception that occur both upstream and downstream
* and observe exceptions that are thrown to cancel the flow. Exception is empty if and only if
* the flow had fully completed successfully. Conceptually, the following code:
*
* ```
* flow { emitData() }
* .map { computeOne(it) }
* .onCompletion { println(it) } // Can print exceptions from emitData and computeOne
* .map { computeTwo(it) }
* .onCompletion { println(it) } // Can print exceptions from emitData, computeOne, onCompletion and computeTwo
* myFlow.collect { value ->
* println(value)
* }
* println("Completed successfully")
* ```
*
* can be replaced with:
*
* ```
* myFlow
* .onEach { println(it) }
* .onCompletion { if (it == null) println("Completed successfully") }
* .collect()
* ```
*
* The receiver of the [action] is [FlowCollector] and this operator can be used to emit additional
* elements at the end of the collection. For example:
* elements at the end if it completed successfully. For example:
*
* ```
* flowOf("a", "b", "c")
* .onCompletion { emit("Done") }
* .collect { println(it) } // prints a, b, c, Done
* ```
*
* In case of failure or cancellation, any attempt to emit additional elements throws the corresponding exception.
* Use [catch] if you need to suppress failure and replace it with emission of elements.
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action
val exception = try {
catchImpl(this)
try {
collect(this)
} catch (e: Throwable) {
/*
* Exception from the downstream.
* Use throwing collector to prevent any emissions from the
* completion sequence when downstream has failed, otherwise it may
* lead to a non-sequential behaviour impossible with `finally`
*/
ThrowingCollector(e).invokeSafely(action, null)
ThrowingCollector(e).invokeSafely(action, e)
throw e
}
// Exception from the upstream or normal completion
val safeCollector = SafeCollector(this, coroutineContext)
try {
safeCollector.invokeSafely(action, exception)
} finally {
safeCollector.releaseIntercepted()
}
exception?.let { throw it }
// Normal completion
SafeCollector(this, coroutineContext).invokeSafely(action, null)
}

/**
Expand Down Expand Up @@ -205,7 +207,7 @@ private suspend fun <T> FlowCollector<T>.invokeSafely(
try {
action(cause)
} catch (e: Throwable) {
if (cause !== null) e.addSuppressedThrowable(cause)
if (cause !== null && cause !== e) e.addSuppressedThrowable(cause)
throw e
}
}
Expand Up @@ -50,7 +50,7 @@ class OnCompletionTest : TestBase() {
}.onEach {
expect(2)
}.onCompletion {
assertNull(it)
assertTrue(it is TestException) // flow fails because of this exception
expect(4)
}.onEach {
expect(3)
Expand All @@ -65,7 +65,7 @@ class OnCompletionTest : TestBase() {
@Test
fun testMultipleOnCompletions() = runTest {
flowOf(1).onCompletion {
assertNull(it)
assertTrue(it is TestException)
expect(2)
}.onEach {
expect(1)
Expand Down Expand Up @@ -145,12 +145,12 @@ class OnCompletionTest : TestBase() {
.onCompletion { e ->
expect(8)
assertTrue(e is TestException)
emit(TestData.Done(e))
emit(TestData.Done(e)) // will fail
}.collect {
collected += it
}
}
val expected = (1..5).map { TestData.Value(it) } + TestData.Done(TestException("OK"))
val expected: List<TestData> = (1..5).map { TestData.Value(it) }
assertEquals(expected, collected)
finish(9)
}
Expand All @@ -171,7 +171,7 @@ class OnCompletionTest : TestBase() {
}
.onCompletion { e ->
expect(8)
assertNull(e)
assertTrue(e is CancellationException)
try {
emit(TestData.Done(e))
expectUnreached()
Expand All @@ -197,7 +197,7 @@ class OnCompletionTest : TestBase() {
emit(TestData.Value(2))
expectUnreached()
}.onCompletion {
assertNull(it)
assertSame(cause, it) // flow failed because of the exception in downstream
expect(3)
try {
emit(TestData.Done(it))
Expand All @@ -216,7 +216,7 @@ class OnCompletionTest : TestBase() {
@Test
fun testFirst() = runTest {
val value = flowOf(239).onCompletion {
assertNull(it)
assertNotNull(it) // the flow did not complete normally
expect(1)
try {
emit(42)
Expand Down Expand Up @@ -278,4 +278,16 @@ class OnCompletionTest : TestBase() {
assertNull(flow.singleOrNull())
finish(4)
}

@Test
fun testTakeOnCompletion() = runTest {
// even though it uses "take" from the outside it completes normally
val flow = (1..10).asFlow().take(5)
val result = flow.onCompletion { cause ->
assertNull(cause)
emit(-1)
}.toList()
val expected = (1..5).toList() + (-1)
assertEquals(expected, result)
}
}
Expand Up @@ -357,7 +357,7 @@ class FlowGuideTest {
fun testExampleFlow34() {
test("ExampleFlow34") { kotlinx.coroutines.guide.exampleFlow34.main() }.verifyExceptions(
"1",
"Flow completed with null",
"Flow completed with java.lang.IllegalStateException: Collected 2",
"Exception in thread \"main\" java.lang.IllegalStateException: Collected 2"
)
}
Expand Down

0 comments on commit eb4e7d3

Please sign in to comment.