From 20be12a564523c9c5489d89197232e29385af5df Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 7 Apr 2020 17:32:00 +0300 Subject: [PATCH] Flow onEmpty (#1904) * Introduce Flow.onEmpty operator Fixes #1890 --- .../api/kotlinx-coroutines-core.api | 1 + .../common/src/flow/operators/Emitters.kt | 39 ++++++++- .../test/flow/operators/OnCompletionTest.kt | 19 +++++ .../common/test/flow/operators/OnEmptyTest.kt | 81 +++++++++++++++++++ .../common/test/flow/operators/OnStartTest.kt | 21 ++++- 5 files changed, 156 insertions(+), 5 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index a6e5fd513e..54e355ec37 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -942,6 +942,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index d8016e018d..cbed2df929 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -56,9 +56,9 @@ internal inline fun Flow.unsafeTransform( } /** - * Invokes the given [action] when the this flow starts to be collected. + * Invokes the given [action] when this flow starts to be collected. * - * The receiver of the [action] is [FlowCollector] and thus `onStart` can emit additional elements. + * The receiver of the [action] is [FlowCollector], so `onStart` can emit additional elements. * For example: * * ``` @@ -67,7 +67,7 @@ internal inline fun Flow.unsafeTransform( * .collect { println(it) } // prints Begin, a, b, c * ``` */ -@ExperimentalCoroutinesApi // tentatively stable in 1.3.0 +@ExperimentalCoroutinesApi public fun Flow.onStart( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action @@ -129,7 +129,7 @@ public fun Flow.onStart( * .collect { println(it) } // prints a, b, c, Done * ``` */ -@ExperimentalCoroutinesApi // tentatively stable in 1.3.0 +@ExperimentalCoroutinesApi public fun Flow.onCompletion( action: suspend FlowCollector.(cause: Throwable?) -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke completion action @@ -155,6 +155,37 @@ public fun Flow.onCompletion( exception?.let { throw it } } +/** + * Invokes the given [action] when this flow completes without emitting any elements. + * The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements. + * For example: + * + * ``` + * emptyFlow().onEmpty { + * emit(1) + * emit(2) + * }.collect { println(it) } // prints 1, 2 + * ``` + */ +@ExperimentalCoroutinesApi +public fun Flow.onEmpty( + action: suspend FlowCollector.() -> Unit +): Flow = unsafeFlow { + var isEmpty = true + collect { + isEmpty = false + emit(it) + } + if (isEmpty) { + val collector = SafeCollector(this, coroutineContext) + try { + collector.action() + } finally { + collector.releaseIntercepted() + } + } +} + private class ThrowingCollector(private val e: Throwable) : FlowCollector { override suspend fun emit(value: Any?) { throw e diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index c079500ef7..f56632d5fd 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -259,4 +259,23 @@ class OnCompletionTest : TestBase() { assertEquals(42, value) finish(2) } + + @Test + fun testTransparencyViolation() = runTest { + val flow = emptyFlow().onCompletion { + expect(2) + coroutineScope { + launch { + try { + emit(1) + } catch (e: IllegalStateException) { + expect(3) + } + } + } + } + expect(1) + assertNull(flow.singleOrNull()) + finish(4) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt new file mode 100644 index 0000000000..3da166645b --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class OnEmptyTest : TestBase() { + + @Test + fun testOnEmptyInvoked() = runTest { + val flow = emptyFlow().onEmpty { emit(1) } + assertEquals(1, flow.single()) + } + + @Test + fun testOnEmptyNotInvoked() = runTest { + val flow = flowOf(1).onEmpty { emit(2) } + assertEquals(1, flow.single()) + } + + @Test + fun testOnEmptyNotInvokedOnError() = runTest { + val flow = flow { + throw TestException() + }.onEmpty { expectUnreached() } + assertFailsWith(flow) + } + + @Test + fun testOnEmptyNotInvokedOnCancellation() = runTest { + val flow = flow { + expect(2) + hang { expect(4) } + }.onEmpty { expectUnreached() } + + expect(1) + val job = flow.onEach { expectUnreached() }.launchIn(this) + yield() + expect(3) + job.cancelAndJoin() + finish(5) + } + + @Test + fun testOnEmptyCancellation() = runTest { + val flow = emptyFlow().onEmpty { + expect(2) + hang { expect(4) } + emit(1) + } + expect(1) + val job = flow.onEach { expectUnreached() }.launchIn(this) + yield() + expect(3) + job.cancelAndJoin() + finish(5) + } + + @Test + fun testTransparencyViolation() = runTest { + val flow = emptyFlow().onEmpty { + expect(2) + coroutineScope { + launch { + try { + emit(1) + } catch (e: IllegalStateException) { + expect(3) + } + } + } + } + expect(1) + assertNull(flow.singleOrNull()) + finish(4) + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnStartTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnStartTest.kt index a0981ab042..0443e56a2c 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnStartTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnStartTest.kt @@ -14,4 +14,23 @@ class OnStartTest : TestBase() { .onStart { emit("Begin") } assertEquals(listOf("Begin", "a", "b", "c"), flow.toList()) } -} \ No newline at end of file + + @Test + fun testTransparencyViolation() = runTest { + val flow = emptyFlow().onStart { + expect(2) + coroutineScope { + launch { + try { + emit(1) + } catch (e: IllegalStateException) { + expect(3) + } + } + } + } + expect(1) + assertNull(flow.singleOrNull()) + finish(4) + } +}