From e11c4a24d2ec0657deaf4c5f517acd898cfbed24 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 8 Oct 2020 02:50:44 -0700 Subject: [PATCH] Pr/2230 (#2287) * Allow nullable types in Flow.firstOrNull * Allow nullable types in Flow.singleOrNull * Align Flow.single and Flow.singleOrNull with Kotlin standard library Fixes #2229 Fixes #2289 Co-authored-by: Nicklas Ansman Giertz --- .../common/src/flow/terminal/Reduce.kt | 37 +++++++++++-------- .../test/flow/operators/OnCompletionTest.kt | 4 +- .../common/test/flow/terminal/FirstTest.kt | 6 +++ .../common/test/flow/terminal/SingleTest.kt | 27 ++++++++++++-- 4 files changed, 54 insertions(+), 20 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index d36e1bbf7b..83f5498e4d 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -9,6 +9,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.internal.Symbol import kotlin.jvm.* /** @@ -47,33 +48,39 @@ public suspend inline fun Flow.fold( } /** - * The terminal operator, that awaits for one and only one value to be published. + * The terminal operator that awaits for one and only one value to be emitted. * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ public suspend fun Flow.single(): T { var result: Any? = NULL collect { value -> - if (result !== NULL) error("Expected only one element") + require(result === NULL) { "Flow has more than one element" } result = value } - if (result === NULL) throw NoSuchElementException("Expected at least one element") - @Suppress("UNCHECKED_CAST") + if (result === NULL) throw NoSuchElementException("Flow is empty") return result as T } /** - * The terminal operator, that awaits for one and only one value to be published. - * Throws [IllegalStateException] for flow that contains more than one element. + * The terminal operator that awaits for one and only one value to be emitted. + * Returns the single value or `null`, if the flow was empty or emitted more than one value. */ -public suspend fun Flow.singleOrNull(): T? { - var result: T? = null - collect { value -> - if (result != null) error("Expected only one element") - result = value +public suspend fun Flow.singleOrNull(): T? { + var result: Any? = NULL + collectWhile { + // No values yet, update result + if (result === NULL) { + result = it + true + } else { + // Second value, reset result and bail out + result = NULL + false + } } - return result + return if (result === NULL) null else result as T } /** @@ -112,7 +119,7 @@ public suspend fun Flow.first(predicate: suspend (T) -> Boolean): T { * The terminal operator that returns the first element emitted by the flow and then cancels flow's collection. * Returns `null` if the flow was empty. */ -public suspend fun Flow.firstOrNull(): T? { +public suspend fun Flow.firstOrNull(): T? { var result: T? = null collectWhile { result = it @@ -122,10 +129,10 @@ public suspend fun Flow.firstOrNull(): T? { } /** - * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. + * The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection. * Returns `null` if the flow did not contain an element matching the [predicate]. */ -public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { +public suspend fun Flow.firstOrNull(predicate: suspend (T) -> Boolean): T? { var result: T? = null collectWhile { if (predicate(it)) { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index 7f0c548ca6..f55e8beeb2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -231,7 +231,7 @@ class OnCompletionTest : TestBase() { @Test fun testSingle() = runTest { - assertFailsWith { + assertFailsWith { flowOf(239).onCompletion { assertNull(it) expect(1) @@ -240,7 +240,7 @@ class OnCompletionTest : TestBase() { expectUnreached() } catch (e: Throwable) { // Second emit -- failure - assertTrue { e is IllegalStateException } + assertTrue { e is IllegalArgumentException } throw e } }.single() diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt index edb9f00fa6..fa7fc9cb6c 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt @@ -128,6 +128,12 @@ class FirstTest : TestBase() { assertNull(emptyFlow().firstOrNull { true }) } + @Test + fun testFirstOrNullWithNullElement() = runTest { + assertNull(flowOf(null).firstOrNull()) + assertNull(flowOf(null).firstOrNull { true }) + } + @Test fun testFirstOrNullWhenErrorCancelsUpstream() = runTest { val latch = Channel() diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index 4e89b93bd7..2c1277b1e1 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -7,7 +7,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlin.test.* -class SingleTest : TestBase() { +class SingleTest : TestBase() { @Test fun testSingle() = runTest { @@ -25,8 +25,8 @@ class SingleTest : TestBase() { emit(239L) emit(240L) } - assertFailsWith { flow.single() } - assertFailsWith { flow.singleOrNull() } + assertFailsWith { flow.single() } + assertNull(flow.singleOrNull()) } @Test @@ -61,6 +61,10 @@ class SingleTest : TestBase() { assertEquals(1, flowOf(1).single()) assertNull(flowOf(null).single()) assertFailsWith { flowOf().single() } + + assertEquals(1, flowOf(1).singleOrNull()) + assertNull(flowOf(null).singleOrNull()) + assertNull(flowOf().singleOrNull()) } @Test @@ -69,5 +73,22 @@ class SingleTest : TestBase() { val flow = flowOf(instance) assertSame(instance, flow.single()) assertSame(instance, flow.singleOrNull()) + + val flow2 = flow { + emit(BadClass()) + emit(BadClass()) + } + assertFailsWith { flow2.single() } + } + + @Test + fun testSingleNoWait() = runTest { + val flow = flow { + emit(1) + emit(2) + awaitCancellation() + } + + assertNull(flow.singleOrNull()) } }