Skip to content

Commit

Permalink
Make awaitSingleOrNull() consistent with singleOrNull()
Browse files Browse the repository at this point in the history
Fixes #2591
  • Loading branch information
dkhalanskyjb committed Apr 16, 2021
1 parent 3c83c0c commit c05b4e8
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 20 deletions.
22 changes: 13 additions & 9 deletions reactive/kotlinx-coroutines-reactive/src/Await.kt
Expand Up @@ -93,17 +93,19 @@ public suspend fun <T> Publisher<T>.awaitSingle(): T = awaitOne(Mode.SINGLE)
public suspend fun <T> Publisher<T>.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default)

/**
* Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if
* this observable has produced an error, throws the corresponding exception. If more than one value or none were
* produced by the publisher, `null` is returned.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT)
public suspend fun <T> Publisher<T>.awaitSingleOrNull(): T? = try {
awaitOne(Mode.SINGLE_OR_DEFAULT)
} catch (e: TooManyElementsException) {
null
}

/**
* Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and
Expand All @@ -120,6 +122,8 @@ public suspend fun <T> Publisher<T>.awaitSingleOrElse(defaultValue: () -> T): T

// ------------------------ private ------------------------

private class TooManyElementsException(message: String): IllegalArgumentException(message)

private enum class Mode(val s: String) {
FIRST("awaitFirst"),
FIRST_OR_DEFAULT("awaitFirstOrDefault"),
Expand Down Expand Up @@ -186,7 +190,7 @@ private suspend fun <T> Publisher<T>.awaitOne(
/* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or
`onError` on its own. */
if (cont.isActive) {
cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode"))
cont.resumeWithException(TooManyElementsException("More than one onNext value for $mode"))
}
} else {
value = t
Expand Down
8 changes: 0 additions & 8 deletions reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt
Expand Up @@ -238,12 +238,4 @@ class IntegrationTest(
}
}

private suspend fun checkNumbers(n: Int, pub: Publisher<Int>) {
var last = 0
pub.collect {
assertEquals(++last, it)
}
assertEquals(n, last)
}

}
6 changes: 3 additions & 3 deletions reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt
Expand Up @@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() {
@Test
fun testAwaitSingleOrNullException() {
val flux = flux {
send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K")
send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K")
}

checkErroneous(flux) {
assert(it is IllegalArgumentException)
checkSingleValue(flux) {
assertEquals("OK", it)
}
}

Expand Down

0 comments on commit c05b4e8

Please sign in to comment.