From 1cdc50da975bd91eeeb3bb968967130e88b01d70 Mon Sep 17 00:00:00 2001 From: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> Date: Thu, 22 Apr 2021 14:39:51 +0300 Subject: [PATCH] Add a way to construct ReactorContext from ContextView (#2622) Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2575 --- .../api/kotlinx-coroutines-reactor.api | 4 +- .../kotlinx-coroutines-reactor/src/Convert.kt | 4 +- .../kotlinx-coroutines-reactor/src/Flux.kt | 46 +++++++++++++------ .../kotlinx-coroutines-reactor/src/Mono.kt | 2 +- .../src/ReactorContext.kt | 40 ++++++++++++---- 5 files changed, 68 insertions(+), 28 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 0a10aa12a9..b69bb334d7 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -33,6 +33,7 @@ public final class kotlinx/coroutines/reactor/MonoKt { public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement { public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key; public fun (Lreactor/util/context/Context;)V + public fun (Lreactor/util/context/ContextView;)V public final fun getContext ()Lreactor/util/context/Context; } @@ -40,7 +41,8 @@ public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/corout } public final class kotlinx/coroutines/reactor/ReactorContextKt { - public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; + public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext; + public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext; } public final class kotlinx/coroutines/reactor/ReactorFlowKt { diff --git a/reactive/kotlinx-coroutines-reactor/src/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/Convert.kt index 3b08bd6639..002baa6185 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Convert.kt @@ -41,8 +41,8 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = mono(co /** * Converts a stream of elements received from the channel to the hot reactive flux. * - * Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers, - * they'll receive values in round-robin way. + * Every subscriber receives values from this channel in a **fan-out** fashion. If the are multiple subscribers, + * they'll receive values in a round-robin way. * @param context -- the coroutine context from which the resulting flux is going to be signalled */ @Deprecated(message = "Deprecated in the favour of consumeAsFlow()", diff --git a/reactive/kotlinx-coroutines-reactor/src/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/Flux.kt index 806f5bd5bc..df5f64f262 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Flux.kt @@ -14,20 +14,20 @@ import reactor.util.context.* import kotlin.coroutines.* /** - * Creates cold reactive [Flux] that runs a given [block] in a coroutine. + * Creates a cold reactive [Flux] that runs the given [block] in a coroutine. * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete]) + * when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed, + * emits the error ([Subscriber.onError]) and closes the channel with the cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. - * - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Subscriber.onNext] is not invoked concurrently. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ @ExperimentalCoroutinesApi public fun flux( @@ -43,12 +43,13 @@ private fun reactorPublish( scope: CoroutineScope, context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit -): Publisher = Publisher { subscriber -> - // specification requires NPE on null subscriber - if (subscriber == null) throw NullPointerException("Subscriber cannot be null") - require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." } +): Publisher = Publisher onSubscribe@{ subscriber: Subscriber? -> + if (subscriber !is CoreSubscriber) { + subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted.")) + return@onSubscribe + } val currentContext = subscriber.currentContext() - val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext() + val reactorContext = context.extendReactorContext(currentContext) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER) subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions @@ -66,6 +67,23 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ct } } +/** The proper way to reject the subscriber, according to + * [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9) + */ +private fun Subscriber?.reject(t: Throwable) { + if (this == null) + throw NullPointerException("The subscriber can not be null") + onSubscribe(object: Subscription { + override fun request(n: Long) { + // intentionally left blank + } + override fun cancel() { + // intentionally left blank + } + }) + onError(t) +} + @Deprecated( message = "CoroutineScope.flux is deprecated in favour of top-level flux", level = DeprecationLevel.HIDDEN, diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 6a4a38f379..67c1baa02d 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -83,7 +83,7 @@ private fun monoInternal( context: CoroutineContext, block: suspend CoroutineScope.() -> T? ): Mono = Mono.create { sink -> - val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext() + val reactorContext = context.extendReactorContext(sink.currentContext()) val newContext = scope.newCoroutineContext(context + reactorContext) val coroutine = MonoCoroutine(newContext, sink) sink.onDispose(coroutine) diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index 333f056d97..8969662adc 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -5,19 +5,21 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.ExperimentalCoroutinesApi -import reactor.util.context.Context import kotlin.coroutines.* import kotlinx.coroutines.reactive.* +import reactor.util.context.* /** - * Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines. - * [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext]. - * Coroutine context element that propagates information about Reactor's [Context] through coroutines. + * Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between + * Reactor and kotlinx.coroutines. + * [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext], + * which can be used to propagate the information about Reactor's [Context] through coroutines. * - * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux], - * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. - * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]) - * also propagate the [ReactorContext] to the subscriber's [Context]. + * This context element is implicitly propagated through subscribers' context by all Reactive integrations, + * such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. + * Functions that subscribe to a reactive stream + * (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext] + * to the subscriber's [Context]. ** * ### Examples of Reactive context integration. * @@ -49,12 +51,30 @@ import kotlinx.coroutines.reactive.* */ @ExperimentalCoroutinesApi public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) { + + // `Context.of` is zero-cost if the argument is a `Context` + public constructor(contextView: ContextView): this(Context.of(contextView)) + public companion object Key : CoroutineContext.Key } /** - * Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context + * Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context + * and later used via `coroutineContext[ReactorContext]`. + */ +@ExperimentalCoroutinesApi +public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this) + +/** + * Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context * and later used via `coroutineContext[ReactorContext]`. */ @ExperimentalCoroutinesApi -public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this) +@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN) +public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost. + +/** + * Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values. + */ +internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext = + (this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext() \ No newline at end of file