From ba5b343475f9b7ac01950b3b2e346191b6cd915a Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 6 Apr 2020 15:39:09 +0300 Subject: [PATCH] Introduce Flow.onEmpty operator Fixes #1890 --- .../api/kotlinx-coroutines-core.api | 1 + .../common/src/flow/operators/Emitters.kt | 33 ++++++++ .../common/test/flow/operators/OnEmptyTest.kt | 81 +++++++++++++++++++ 3 files changed, 115 insertions(+) create mode 100644 kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index a6e5fd513e..54e355ec37 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -942,6 +942,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final synthetic fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onCompletion (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun onEach (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; + public static final fun onEmpty (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorCollect (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static synthetic fun onErrorCollect$default (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun onErrorResume (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index d8016e018d..aef51c8d93 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -155,6 +155,39 @@ public fun Flow.onCompletion( exception?.let { throw it } } +/** + * Invokes the given [action] when this flow completes without emitting any elements. + * The receiver of the [action] is [FlowCollector], so `onEmpty` can emit additional elements. + * + * For example: + * + * ``` + * emptyFlow().onEmpty { + * emit(1) + * emit(2) + * } + * .collect { println(it) } // prints 1, 2 + * ``` + */ +@ExperimentalCoroutinesApi +public fun Flow.onEmpty( + action: suspend FlowCollector.() -> Unit +): Flow = unsafeFlow { + var isEmpty = true + collect { + isEmpty = false + emit(it) + } + if (isEmpty) { + val collector = SafeCollector(this, coroutineContext) + try { + collector.action() + } finally { + collector.releaseIntercepted() + } + } +} + private class ThrowingCollector(private val e: Throwable) : FlowCollector { override suspend fun emit(value: Any?) { throw e diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt new file mode 100644 index 0000000000..1f70ae5780 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnEmptyTest.kt @@ -0,0 +1,81 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.operators + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.test.* + +class OnEmptyTest : TestBase() { + + @Test + fun testOnEmptyInvoked() = runTest { + val flow = emptyFlow().onEmpty { emit(1) } + assertEquals(1, flow.single()) + } + + @Test + fun testOnEmptyNotInvoked() = runTest { + val flow = flowOf(1).onEmpty { emit(2) } + assertEquals(1, flow.single()) + } + + @Test + fun testOnEmptyNotInvokedOnError() = runTest { + val flow = flow { + throw TestException() + }.onEmpty { expectUnreached() } + assertFailsWith(flow) + } + + @Test + fun testOnEmptyNotInvokedOnCancellation() = runTest { + val flow = flow { + expect(2) + hang { expect(4) } + }.onEmpty { expectUnreached() } + + expect(1) + val job = flow.onEach { expectUnreached() }.launchIn(this) + yield() + expect(3) + job.cancelAndJoin() + finish(5) + } + + @Test + fun testOnEmptyCancellation() = runTest { + val flow = emptyFlow().onEmpty { + expect(2) + hang { expect(4) } + emit(1) + } + expect(1) + val job = flow.onEach { expectUnreached() }.launchIn(this) + yield() + expect(3) + job.cancelAndJoin() + finish(5) + } + + @Test + fun testOnEmptyTransparencyViolation() = runTest { + val flow = emptyFlow().onEmpty { + expect(2) + coroutineScope { + launch { + try { + emit(1) + } catch (e: IllegalStateException) { + expect(3) + } + } + } + } + expect(1) + assertNull(flow.singleOrNull()) + finish(4) + } +}