Skip to content

Commit

Permalink
Pr/2230 (#2287)
Browse files Browse the repository at this point in the history
* Allow nullable types in Flow.firstOrNull
* Allow nullable types in Flow.singleOrNull
* Align Flow.single and Flow.singleOrNull with Kotlin standard library

Fixes #2229
Fixes #2289

Co-authored-by: Nicklas Ansman Giertz <nicklas@ansman.se>
  • Loading branch information
qwwdfsad and ansman committed Oct 8, 2020
1 parent 448106a commit 7897f70
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 20 deletions.
37 changes: 22 additions & 15 deletions kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
Expand Up @@ -9,6 +9,7 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.internal.Symbol
import kotlin.jvm.*

/**
Expand Down Expand Up @@ -47,33 +48,39 @@ public suspend inline fun <T, R> Flow<T>.fold(
}

/**
* The terminal operator, that awaits for one and only one value to be published.
* The terminal operator that awaits for one and only one value to be emitted.
* Throws [NoSuchElementException] for empty flow and [IllegalStateException] for flow
* that contains more than one element.
*/
public suspend fun <T> Flow<T>.single(): T {
var result: Any? = NULL
collect { value ->
if (result !== NULL) error("Expected only one element")
require(result === NULL) { "Flow has more than one element" }
result = value
}

if (result === NULL) throw NoSuchElementException("Expected at least one element")
@Suppress("UNCHECKED_CAST")
if (result === NULL) throw NoSuchElementException("Flow is empty")
return result as T
}

/**
* The terminal operator, that awaits for one and only one value to be published.
* Throws [IllegalStateException] for flow that contains more than one element.
* The terminal operator that awaits for one and only one value to be emitted.
* Returns the single value or `null`, if the flow was empty or emitted more than one value.
*/
public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
var result: T? = null
collect { value ->
if (result != null) error("Expected only one element")
result = value
public suspend fun <T> Flow<T>.singleOrNull(): T? {
var result: Any? = NULL
collectWhile {
// No values yet, update result
if (result === NULL) {
result = it
true
} else {
// Second value, reset result and bail out
result = NULL
false
}
}
return result
return if (result === NULL) null else result as T
}

/**
Expand Down Expand Up @@ -112,7 +119,7 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
* Returns `null` if the flow was empty.
*/
public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
public suspend fun <T> Flow<T>.firstOrNull(): T? {
var result: T? = null
collectWhile {
result = it
Expand All @@ -122,10 +129,10 @@ public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
}

/**
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* Returns `null` if the flow did not contain an element matching the [predicate].
*/
public suspend fun <T : Any> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
public suspend fun <T> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
var result: T? = null
collectWhile {
if (predicate(it)) {
Expand Down
Expand Up @@ -231,7 +231,7 @@ class OnCompletionTest : TestBase() {

@Test
fun testSingle() = runTest {
assertFailsWith<IllegalStateException> {
assertFailsWith<IllegalArgumentException> {
flowOf(239).onCompletion {
assertNull(it)
expect(1)
Expand All @@ -240,7 +240,7 @@ class OnCompletionTest : TestBase() {
expectUnreached()
} catch (e: Throwable) {
// Second emit -- failure
assertTrue { e is IllegalStateException }
assertTrue { e is IllegalArgumentException }
throw e
}
}.single()
Expand Down
Expand Up @@ -128,6 +128,12 @@ class FirstTest : TestBase() {
assertNull(emptyFlow<Int>().firstOrNull { true })
}

@Test
fun testFirstOrNullWithNullElement() = runTest {
assertNull(flowOf<String?>(null).firstOrNull())
assertNull(flowOf<String?>(null).firstOrNull { true })
}

@Test
fun testFirstOrNullWhenErrorCancelsUpstream() = runTest {
val latch = Channel<Unit>()
Expand Down
27 changes: 24 additions & 3 deletions kotlinx-coroutines-core/common/test/flow/terminal/SingleTest.kt
Expand Up @@ -7,7 +7,7 @@ package kotlinx.coroutines.flow
import kotlinx.coroutines.*
import kotlin.test.*

class SingleTest : TestBase() {
class SingleTest : TestBase() {

@Test
fun testSingle() = runTest {
Expand All @@ -25,8 +25,8 @@ class SingleTest : TestBase() {
emit(239L)
emit(240L)
}
assertFailsWith<RuntimeException> { flow.single() }
assertFailsWith<RuntimeException> { flow.singleOrNull() }
assertFailsWith<IllegalArgumentException> { flow.single() }
assertNull(flow.singleOrNull())
}

@Test
Expand Down Expand Up @@ -61,6 +61,10 @@ class SingleTest : TestBase() {
assertEquals(1, flowOf<Int?>(1).single())
assertNull(flowOf<Int?>(null).single())
assertFailsWith<NoSuchElementException> { flowOf<Int?>().single() }

assertEquals(1, flowOf<Int?>(1).singleOrNull())
assertNull(flowOf<Int?>(null).singleOrNull())
assertNull(flowOf<Int?>().singleOrNull())
}

@Test
Expand All @@ -69,5 +73,22 @@ class SingleTest : TestBase() {
val flow = flowOf(instance)
assertSame(instance, flow.single())
assertSame(instance, flow.singleOrNull())

val flow2 = flow {
emit(BadClass())
emit(BadClass())
}
assertFailsWith<IllegalArgumentException> { flow2.single() }
}

@Test
fun testSingleNoWait() = runTest {
val flow = flow {
emit(1)
emit(2)
awaitCancellation()
}

assertNull(flow.singleOrNull())
}
}

0 comments on commit 7897f70

Please sign in to comment.