From 0fae24a7b284a79406955398e4074b0cb867c7c3 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 30 Mar 2021 15:09:17 +0300 Subject: [PATCH] Make awaitSingleOrNull() consistent with singleOrNull() Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2591 --- .../kotlinx-coroutines-reactive/src/Await.kt | 22 +++++++++++-------- .../test/FluxSingleTest.kt | 6 ++--- 2 files changed, 16 insertions(+), 12 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index 9af134cb94..2fbae92158 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -93,17 +93,19 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) /** - * Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if + * this observable has produced an error, throws the corresponding exception. If more than one value or none were + * produced by the publisher, `null` is returned. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. - * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ -public suspend fun Publisher.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT) +public suspend fun Publisher.awaitSingleOrNull(): T? = try { + awaitOne(Mode.SINGLE_OR_DEFAULT) +} catch (e: TooManyElementsException) { + null +} /** * Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and @@ -120,6 +122,8 @@ public suspend fun Publisher.awaitSingleOrElse(defaultValue: () -> T): T // ------------------------ private ------------------------ +private class TooManyElementsException(message: String): IllegalArgumentException(message) + private enum class Mode(val s: String) { FIRST("awaitFirst"), FIRST_OR_DEFAULT("awaitFirstOrDefault"), @@ -186,7 +190,7 @@ private suspend fun Publisher.awaitOne( /* the check for `cont.isActive` is needed in case `sub.cancel() above calls `onComplete` or `onError` on its own. */ if (cont.isActive) { - cont.resumeWithException(IllegalArgumentException("More than one onNext value for $mode")) + cont.resumeWithException(TooManyElementsException("More than one onNext value for $mode")) } } else { value = t diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index cc336ba6b5..26b9712c3b 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() { @Test fun testAwaitSingleOrNullException() { val flux = flux { - send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K") + send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K") } - checkErroneous(flux) { - assert(it is IllegalArgumentException) + checkSingleValue(flux) { + assertEquals("OK", it) } }