diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index e9f6955085..d0ca317a45 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -94,17 +94,19 @@ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) /** - * Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, if + * this observable has produced an error, throws the corresponding exception. If more than one value or none were + * produced by the publisher, `null` is returned. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. - * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ -public suspend fun Publisher.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT) +public suspend fun Publisher.awaitSingleOrNull(): T? = try { + awaitOne(Mode.SINGLE_OR_DEFAULT) +} catch (e: java.lang.IllegalArgumentException) { + null +} /** * Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and diff --git a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt index 18cd012d16..7374fc54c9 100644 --- a/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt +++ b/reactive/kotlinx-coroutines-reactive/test/IntegrationTest.kt @@ -92,10 +92,10 @@ class IntegrationTest( assertEquals(1, pub.awaitFirstOrDefault(0)) assertEquals(1, pub.awaitFirstOrNull()) assertEquals(1, pub.awaitFirstOrElse { 0 }) + assertEquals(null, pub.awaitSingleOrNull()) assertEquals(n, pub.awaitLast()) assertFailsWith { pub.awaitSingle() } assertFailsWith { pub.awaitSingleOrDefault(0) } - assertFailsWith { pub.awaitSingleOrNull() } assertFailsWith { pub.awaitSingleOrElse { 0 } } checkNumbers(n, pub) val channel = pub.openSubscription() diff --git a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt index cc336ba6b5..26b9712c3b 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FluxSingleTest.kt @@ -104,11 +104,11 @@ class FluxSingleTest : TestBase() { @Test fun testAwaitSingleOrNullException() { val flux = flux { - send((Flux.just("O", "#").awaitSingleOrNull() ?: "!") + "K") + send((Flux.just("!", "#").awaitSingleOrNull() ?: "O") + "K") } - checkErroneous(flux) { - assert(it is IllegalArgumentException) + checkSingleValue(flux) { + assertEquals("OK", it) } }