Skip to content

Commit

Permalink
Deprecate awaitSingleOr*, specialize some await* functions for Mono a…
Browse files Browse the repository at this point in the history
…nd Maybe (#2628)

* Deprecated `awaitSingleOr*` on arbitrary Publishers
* Added specialized `awaitSingle` and `awaitSingleOrNull` methods on
  `Maybe<T>` and `Mono<T>`
* Deprecated `Maybe<T>.await()` in favor of `Maybe<T>.awaitSingleOrNull()`
* Added specializations of most of the `await*` methods for `Mono<T>` and
  deprecated them, as the only useful methods on `Mono<T>` are
  `awaitSingle` and `awaitSingleOrNull`
* Reworded some documentation for `await*` methods

Fixes #2591
Fixes #1587
  • Loading branch information
dkhalanskyjb committed Apr 21, 2021
1 parent cefb84f commit 71df60e
Show file tree
Hide file tree
Showing 22 changed files with 871 additions and 225 deletions.
72 changes: 38 additions & 34 deletions reactive/kotlinx-coroutines-jdk9/src/Await.kt
Expand Up @@ -4,78 +4,82 @@

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import java.util.concurrent.*
import org.reactivestreams.FlowAdapters
import kotlinx.coroutines.reactive.*

/**
* Awaits for the first value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if
* the publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws NoSuchElementException if the publisher does not emit any value
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst()
public suspend fun <T> Flow.Publisher<T>.awaitFirst(): T =
FlowAdapters.toPublisher(this).awaitFirst()

/**
* Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
* Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking
* the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding
* exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrDefault(default: T): T =
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)
FlowAdapters.toPublisher(this).awaitFirstOrDefault(default)

/**
* Awaits for the first value from the given observable or `null` value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
* Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread,
* and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrNull(): T? =
FlowAdapters.toPublisher(this).awaitFirstOrNull()
FlowAdapters.toPublisher(this).awaitFirstOrNull()

/**
* Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a
* thread and returns the resulting value or throws the corresponding exception if this observable had produced error.
* Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without
* blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the
* corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*/
public suspend fun <T> Flow.Publisher<T>.awaitFirstOrElse(defaultValue: () -> T): T =
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)
FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue)

/**
* Awaits for the last value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the last value from the given publisher without blocking the thread and
* returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws NoSuchElementException if the publisher does not emit any value
*/
public suspend fun <T> Flow.Publisher<T>.awaitLast(): T =
FlowAdapters.toPublisher(this).awaitLast()
FlowAdapters.toPublisher(this).awaitLast()

/**
* Awaits for the single value from the given publisher without blocking a thread and
* returns the resulting value or throws the corresponding exception if this publisher had produced error.
* Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or,
* if this publisher has produced an error, throws the corresponding exception.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
* If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this
* function immediately cancels its [Flow.Subscription] and resumes with [CancellationException].
*
* @throws NoSuchElementException if publisher does not emit any value
* @throws IllegalArgumentException if publisher emits more than one value
* @throws NoSuchElementException if the publisher does not emit any value
* @throws IllegalArgumentException if the publisher emits more than one value
*/
public suspend fun <T> Flow.Publisher<T>.awaitSingle(): T =
FlowAdapters.toPublisher(this).awaitSingle()
FlowAdapters.toPublisher(this).awaitSingle()
43 changes: 43 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt
@@ -0,0 +1,43 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import org.junit.*
import java.util.concurrent.Flow as JFlow

class AwaitTest: TestBase() {

/** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and
* unsubscribe from the publisher when their [Job] is cancelled. */
@Test
fun testAwaitCancellation() = runTest {
expect(1)
val publisher = JFlow.Publisher<Int> { s ->
s.onSubscribe(object : JFlow.Subscription {
override fun request(n: Long) {
expect(3)
}

override fun cancel() {
expect(5)
}
})
}
val job = launch(start = CoroutineStart.UNDISPATCHED) {
try {
expect(2)
publisher.awaitFirst()
} catch (e: CancellationException) {
expect(6)
throw e
}
}
expect(4)
job.cancelAndJoin()
finish(7)
}

}

0 comments on commit 71df60e

Please sign in to comment.