Skip to content

Commit

Permalink
Add a way to construct ReactorContext from ContextView (Kotlin#2622)
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb authored and pablobaxter committed Sep 14, 2022
1 parent 2da7106 commit 1cdc50d
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 28 deletions.
Expand Up @@ -33,14 +33,16 @@ public final class kotlinx/coroutines/reactor/MonoKt {
public final class kotlinx/coroutines/reactor/ReactorContext : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/reactor/ReactorContext$Key;
public fun <init> (Lreactor/util/context/Context;)V
public fun <init> (Lreactor/util/context/ContextView;)V
public final fun getContext ()Lreactor/util/context/Context;
}

public final class kotlinx/coroutines/reactor/ReactorContext$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/reactor/ReactorContextKt {
public static final fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
public static final synthetic fun asCoroutineContext (Lreactor/util/context/Context;)Lkotlinx/coroutines/reactor/ReactorContext;
public static final fun asCoroutineContext (Lreactor/util/context/ContextView;)Lkotlinx/coroutines/reactor/ReactorContext;
}

public final class kotlinx/coroutines/reactor/ReactorFlowKt {
Expand Down
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-reactor/src/Convert.kt
Expand Up @@ -41,8 +41,8 @@ public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(co
/**
* Converts a stream of elements received from the channel to the hot reactive flux.
*
* Every subscriber receives values from this channel in **fan-out** fashion. If the are multiple subscribers,
* they'll receive values in round-robin way.
* Every subscriber receives values from this channel in a **fan-out** fashion. If the are multiple subscribers,
* they'll receive values in a round-robin way.
* @param context -- the coroutine context from which the resulting flux is going to be signalled
*/
@Deprecated(message = "Deprecated in the favour of consumeAsFlow()",
Expand Down
46 changes: 32 additions & 14 deletions reactive/kotlinx-coroutines-reactor/src/Flux.kt
Expand Up @@ -14,20 +14,20 @@ import reactor.util.context.*
import kotlin.coroutines.*

/**
* Creates cold reactive [Flux] that runs a given [block] in a coroutine.
* Creates a cold reactive [Flux] that runs the given [block] in a coroutine.
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
* The coroutine emits ([Subscriber.onNext]) values with [send][ProducerScope.send], completes ([Subscriber.onComplete])
* when the coroutine completes, or, in case the coroutine throws an exception or the channel is closed,
* emits the error ([Subscriber.onError]) and closes the channel with the cause.
* Unsubscribing cancels the running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
*
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
* ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi
public fun <T> flux(
Expand All @@ -43,12 +43,13 @@ private fun <T> reactorPublish(
scope: CoroutineScope,
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Publisher<T> = Publisher { subscriber ->
// specification requires NPE on null subscriber
if (subscriber == null) throw NullPointerException("Subscriber cannot be null")
require(subscriber is CoreSubscriber) { "Subscriber is not an instance of CoreSubscriber, context can not be extracted." }
): Publisher<T> = Publisher onSubscribe@{ subscriber: Subscriber<in T>? ->
if (subscriber !is CoreSubscriber) {
subscriber.reject(IllegalArgumentException("Subscriber is not an instance of CoreSubscriber, context can not be extracted."))
return@onSubscribe
}
val currentContext = subscriber.currentContext()
val reactorContext = (context[ReactorContext]?.context?.putAll(currentContext) ?: currentContext).asCoroutineContext()
val reactorContext = context.extendReactorContext(currentContext)
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = PublisherCoroutine(newContext, subscriber, REACTOR_HANDLER)
subscriber.onSubscribe(coroutine) // do it first (before starting coroutine), to avoid unnecessary suspensions
Expand All @@ -66,6 +67,23 @@ private val REACTOR_HANDLER: (Throwable, CoroutineContext) -> Unit = { cause, ct
}
}

/** The proper way to reject the subscriber, according to
* [the reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9)
*/
private fun <T> Subscriber<T>?.reject(t: Throwable) {
if (this == null)
throw NullPointerException("The subscriber can not be null")
onSubscribe(object: Subscription {
override fun request(n: Long) {
// intentionally left blank
}
override fun cancel() {
// intentionally left blank
}
})
onError(t)
}

@Deprecated(
message = "CoroutineScope.flux is deprecated in favour of top-level flux",
level = DeprecationLevel.HIDDEN,
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactor/src/Mono.kt
Expand Up @@ -83,7 +83,7 @@ private fun <T> monoInternal(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) ?: sink.currentContext()).asCoroutineContext()
val reactorContext = context.extendReactorContext(sink.currentContext())
val newContext = scope.newCoroutineContext(context + reactorContext)
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
Expand Down
40 changes: 30 additions & 10 deletions reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt
Expand Up @@ -5,19 +5,21 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.ExperimentalCoroutinesApi
import reactor.util.context.Context
import kotlin.coroutines.*
import kotlinx.coroutines.reactive.*
import reactor.util.context.*

/**
* Wraps Reactor's [Context] into [CoroutineContext] element for seamless integration Reactor and kotlinx.coroutines.
* [Context.asCoroutineContext] is defined to add Reactor's [Context] elements as part of [CoroutineContext].
* Coroutine context element that propagates information about Reactor's [Context] through coroutines.
* Wraps Reactor's [Context] into a [CoroutineContext] element for seamless integration between
* Reactor and kotlinx.coroutines.
* [Context.asCoroutineContext] puts Reactor's [Context] elements into a [CoroutineContext],
* which can be used to propagate the information about Reactor's [Context] through coroutines.
*
* This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux],
* [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
* Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst])
* also propagate the [ReactorContext] to the subscriber's [Context].
* This context element is implicitly propagated through subscribers' context by all Reactive integrations,
* such as [mono], [flux], [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux].
* Functions that subscribe to a reactive stream
* (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]), too, propagate [ReactorContext]
* to the subscriber's [Context].
**
* ### Examples of Reactive context integration.
*
Expand Down Expand Up @@ -49,12 +51,30 @@ import kotlinx.coroutines.reactive.*
*/
@ExperimentalCoroutinesApi
public class ReactorContext(public val context: Context) : AbstractCoroutineContextElement(ReactorContext) {

// `Context.of` is zero-cost if the argument is a `Context`
public constructor(contextView: ContextView): this(Context.of(contextView))

public companion object Key : CoroutineContext.Key<ReactorContext>
}

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to coroutine's context
* Wraps the given [ContextView] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun ContextView.asCoroutineContext(): ReactorContext = ReactorContext(this)

/**
* Wraps the given [Context] into [ReactorContext], so it can be added to the coroutine's context
* and later used via `coroutineContext[ReactorContext]`.
*/
@ExperimentalCoroutinesApi
public fun Context.asCoroutineContext(): ReactorContext = ReactorContext(this)
@Deprecated("The more general version for ContextView should be used instead", level = DeprecationLevel.HIDDEN)
public fun Context.asCoroutineContext(): ReactorContext = readOnly().asCoroutineContext() // `readOnly()` is zero-cost.

/**
* Updates the Reactor context in this [CoroutineContext], adding (or possibly replacing) some values.
*/
internal fun CoroutineContext.extendReactorContext(extensions: ContextView): CoroutineContext =
(this[ReactorContext]?.context?.putAll(extensions) ?: extensions).asCoroutineContext()

0 comments on commit 1cdc50d

Please sign in to comment.