Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add await() extension on Rector Mono type #1587

Closed
elizarov opened this issue Oct 1, 2019 · 4 comments
Closed

Add await() extension on Rector Mono type #1587

elizarov opened this issue Oct 1, 2019 · 4 comments

Comments

@elizarov
Copy link
Contributor

elizarov commented Oct 1, 2019

It should simply be an inline alias to Publisher.awaitSingle(). But special care shall be taken about Mono<Void> types that never emit any value. They need awaitCompletion() which is missing altogether now.

@jorge-recio
Copy link

jorge-recio commented Apr 7, 2020

I would like to work on this issue.
If suspend fun Mono<T>.await(): T is implemented as an inline alias of suspend fun Publisher.awaitSingle(): T and Mono.await() is called the result will be a TypeCastException: null cannot be cast to non-null type. What should be the expected behavior?

About the implementation of suspend fun Mono<T>.awaitCompletion(): Unit can be implemented using a new suspend fun Publisher<T>.awaitCompletion(single: Boolean): Unit with single = true. But, can this feature be implemented as part of this issue or needs to be done in a different one?

@elizarov
Copy link
Contributor Author

elizarov commented Apr 7, 2020

They both need to be implemented together, but they need careful design on their behaviour in various edge-cases. It is not clear at this moment what should happen when Mono completes without emitting a value and how common this behavior is in the real code.

@wickedev
Copy link

wickedev commented Oct 23, 2020

Since this issue hasn't progressed for a year, I've written PoC-level code. Need more cases then this?

suspend inline fun <reified T> Mono<T>.await(): T {
    if (null is T) {
        // for Mono<T?>
        return awaitFirstOrNull() as T
    }

    return when (T::class) {
        // for Mono<Unit> or Mono<Void>
        Unit::class, Void::class -> {
            @Suppress("UNCHECKED_CAST", "ReactiveStreamsSubscriberImplementation")
            suspendCancellableCoroutine { cont ->
                cont as CancellableContinuation<T?>
                subscribe(object : Subscriber<T?> {
                    override fun onSubscribe(s: Subscription) {
                        cont.invokeOnCancellation {
                            s.cancel()
                        }
                        s.request(1)
                    }

                    override fun onNext(t: T?) {
                        error("Mono<Void> or Mono<Unit> cannot emit value")
                    }

                    override fun onError(t: Throwable) {
                        cont.resumeWithException(t)
                    }

                    override fun onComplete() {
                        cont.resume(null)
                    }
                })
            }
        }
        // for Mono<T>
        else -> awaitSingle() as T
    }
}

object MonoAwaitTest : Spek({
    test("mono await") {
        runBlocking {
            val m1: Mono<Int> = Mono.just(100)
            val r1: Int = m1.await()
            r1.should.be.equal(100)

            val m2: Mono<String> = Mono.just("value")
            val r2: String = m2.await()
            r2.should.be.equal("value")

            val m3: Mono<String?> = Mono.justOrEmpty("String?")
            val r3: String? = m3.await()
            r3.should.be.equal("String?")

            val m4: Mono<String?> = Mono.justOrEmpty(null)
            val r4: String? = m4.await()
            r4.should.be.equal(null)

            val m5: Mono<Unit> = Mono.empty()
            val r5: Unit = m5.await()
            r5.should.be.equal(null)

            val m6: Mono<Void> = Mono.empty()
            val r6: Void = m6.await()
            r6.should.be.equal(null)
        }
    }
})

@dkhalanskyjb
Copy link
Collaborator

It seems from the documentation (https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html) that Mono<T> not returning a value most be fairly common. For example, the existence of Mono<T>.defaultIfEmpty(defaultValue: T): Mono<T> already implies strongly that not returning a value is not limited to Mono<Void>, and this operator is used somewhat commonly: https://github.com/search?l=Java&q=Mono+defaultIfEmpty&type=Code. Thus, it looks to me like using Publisher.awaitSingle() would not reflect the intended semantics correctly; instead, Mono seems to be an equivalent for RxJava's Maybe and should probably be handled in the same way.

@dkhalanskyjb dkhalanskyjb self-assigned this Mar 24, 2021
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
…nd Maybe (Kotlin#2628)

* Deprecated `awaitSingleOr*` on arbitrary Publishers
* Added specialized `awaitSingle` and `awaitSingleOrNull` methods on
  `Maybe<T>` and `Mono<T>`
* Deprecated `Maybe<T>.await()` in favor of `Maybe<T>.awaitSingleOrNull()`
* Added specializations of most of the `await*` methods for `Mono<T>` and
  deprecated them, as the only useful methods on `Mono<T>` are
  `awaitSingle` and `awaitSingleOrNull`
* Reworded some documentation for `await*` methods

Fixes Kotlin#2591
Fixes Kotlin#1587
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

5 participants