Skip to content

Commit

Permalink
Make awaitSingleOrNull() consistent with singleOrNull()
Browse files Browse the repository at this point in the history
Fixes #2591
  • Loading branch information
dkhalanskyjb committed Apr 7, 2021
1 parent eb963e8 commit a1c28e2
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 13 deletions.
22 changes: 13 additions & 9 deletions reactive/kotlinx-coroutines-reactive/src/Await.kt
Expand Up @@ -94,17 +94,19 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)

/**
* Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if
* this observable has produced an error, throws the corresponding exception. If more than one value or none were
* produced by the publisher, `null` is returned.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT)
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = try {
awaitOne(Mode.SINGLE_OR_DEFAULT)
} catch (e: TooManyElementsException) {
null
}

/**
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
Expand All @@ -121,6 +123,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T

// ------------------------ private ------------------------

private class TooManyElementsException(message: String): IllegalArgumentException(message)

private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Expand Down Expand Up @@ -158,7 +162,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
if ((mode == Mode.SINGLE || mode == Mode.SINGLE_OR_DEFAULT) && seenValue) {
subscription.cancel()
if (cont.isActive)
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
cont.resumeWithException(TooManyElementsException("More than one onNext value for $mode"))
} else {
value = t
seenValue = true
Expand Down
Expand Up @@ -92,10 +92,10 @@ class IntegrationTest(
assertEquals(1, pub.awaitFirstOrDefault(0))
assertEquals(1, pub.awaitFirstOrNull())
assertEquals(1, pub.awaitFirstOrElse { 0 })
assertEquals(null, pub.awaitSingleOrNull())
assertEquals(n, pub.awaitLast())
assertFailsWith<IllegalArgumentException> { pub.awaitSingle() }
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrDefault(0) }
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrNull() }
assertFailsWith<IllegalArgumentException> { pub.awaitSingleOrElse { 0 } }
checkNumbers(n, pub)
val channel = pub.openSubscription()
Expand Down
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
Expand Up @@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() {
@Test
fun testAwaitSingleOrNullException() {
val flux = flux {
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K")
}

checkErroneous(flux) {
assert(it is IllegalArgumentException)
checkSingleValue(flux) {
assertEquals("OK", it)
}
}

Expand Down

0 comments on commit a1c28e2

Please sign in to comment.