diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt index 1a5b3de100..ef46ff04cf 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt @@ -9,6 +9,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.internal.Symbol import kotlin.jvm.* /** @@ -47,33 +48,43 @@ public suspend inline fun Flow.fold( } /** - * The terminal operator, that awaits for one and only one value to be published. + * The terminal operator that awaits for one and only one value to be emitted. * Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow * that contains more than one element. */ public suspend fun Flow.single(): T { var result: Any? = NULL collect { value -> - if (result !== NULL) error("Expected only one element") + require(result == NULL) { "Flow has more than one element" } result = value } - if (result === NULL) throw NoSuchElementException("Expected at least one element") - @Suppress("UNCHECKED_CAST") + if (result === NULL) throw NoSuchElementException("Flow is empty") return result as T } /** - * The terminal operator, that awaits for one and only one value to be published. - * Throws [IllegalStateException] for flow that contains more than one element. + * The terminal operator that awaits for one and only one value to be emitted. + * Returns the single value or `null`, if the flow was empty or emitted more than one value. */ public suspend fun Flow.singleOrNull(): T? { - var result: T? = null + var result: Any? = NULL collect { value -> - if (result != null) error("Expected only one element") - result = value + /* + * result === NULL -> first value + * result === user value -> we already had first value and second one arrived + * result === DONE -> we've seen more than one value, time to return it + * as well. + */ + result = if (result === NULL) { + value + } else { + // Indicator that more than one value was observed + DONE + } } - return result + // Symbols are never leaked, so it's one comparison versus two + return if (result is Symbol) null else result as T } /** diff --git a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt index 7f0c548ca6..f55e8beeb2 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/OnCompletionTest.kt @@ -231,7 +231,7 @@ class OnCompletionTest : TestBase() { @Test fun testSingle() = runTest { - assertFailsWith { + assertFailsWith { flowOf(239).onCompletion { assertNull(it) expect(1) @@ -240,7 +240,7 @@ class OnCompletionTest : TestBase() { expectUnreached() } catch (e: Throwable) { // Second emit -- failure - assertTrue { e is IllegalStateException } + assertTrue { e is IllegalArgumentException } throw e } }.single() diff --git a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt index dd9857c7aa..5865029893 100644 --- a/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt @@ -25,8 +25,8 @@ class SingleTest : TestBase() { emit(239L) emit(240L) } - assertFailsWith { flow.single() } - assertFailsWith { flow.singleOrNull() } + assertFailsWith { flow.single() } + assertNull(flow.singleOrNull()) } @Test