Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align Mono#share() behavior with Flux#share, split sink impl out #2756

Merged
merged 6 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ default void emitValue(@Nullable T value, Sinks.EmitFailureHandler failureHandle
case FAIL_OVERFLOW:
Operators.onDiscard(value, currentContext());
//the emitError will onErrorDropped if already terminated
emitError(Exceptions.failWithOverflow("Backpressure overflow during Sinks.Many#emitNext"),
emitError(Exceptions.failWithOverflow("Backpressure overflow during Sinks.One#emitValue"),
failureHandler);
return;
case FAIL_CANCELLED:
Expand Down
48 changes: 34 additions & 14 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
Original file line number Diff line number Diff line change
Expand Up @@ -1797,6 +1797,9 @@ public final <E> Mono<E> cast(Class<E> clazz) {
* Completion and Error will also be replayed.
* <p>
* <img class="marble" src="doc-files/marbles/cacheForMono.svg" alt="">
* <p>
* Once the first subscription is made to this {@link Mono}, the source is subscribed to and
* the signal will be cached, indefinitely. This process cannot be cancelled.
*
* @return a replaying {@link Mono}
*/
Expand All @@ -1812,6 +1815,9 @@ public final Mono<T> cache() {
* the next {@link Subscriber} will start over a new subscription.
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
*
* @return a replaying {@link Mono}
*/
Expand All @@ -1820,19 +1826,22 @@ public final Mono<T> cache(Duration ttl) {
}

/**
* Turn this {@link Mono} into a hot source and cache last emitted signals for further
* {@link Subscriber}, with an expiry timeout.
* <p>
* Completion and Error will also be replayed until {@code ttl} triggers in which case
* the next {@link Subscriber} will start over a new subscription.
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
*
* Turn this {@link Mono} into a hot source and cache last emitted signals for further
* {@link Subscriber}, with an expiry timeout.
* <p>
* Completion and Error will also be replayed until {@code ttl} triggers in which case
* the next {@link Subscriber} will start over a new subscription.
* <p>
* <img class="marble" src="doc-files/marbles/cacheWithTtlForMono.svg" alt="">
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
*
* @param ttl Time-to-live for each cached item and post termination.
* @param timer the {@link Scheduler} on which to measure the duration.
*
* @return a replaying {@link Mono}
*/
* @return a replaying {@link Mono}
*/
public final Mono<T> cache(Duration ttl, Scheduler timer) {
return onAssembly(new MonoCacheTime<>(this, ttl, timer));
}
Expand All @@ -1856,6 +1865,9 @@ public final Mono<T> cache(Duration ttl, Scheduler timer) {
* <p>
* Note that subscribers that come in perfectly simultaneously could receive the same
* cached signal even if the TTL is set to zero.
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
Expand Down Expand Up @@ -1887,6 +1899,9 @@ public final Mono<T> cache(Function<? super T, Duration> ttlForValue,
* <p>
* Note that subscribers that come in perfectly simultaneously could receive the same
* cached signal even if the TTL is set to zero.
* <p>
* First subscription after the cache has been emptied triggers cache loading (ie
* subscription to the source), which cannot be cancelled.
*
* @param ttlForValue the TTL-generating {@link Function} invoked when source is valued
* @param ttlForError the TTL-generating {@link Function} invoked when source is erroring
Expand Down Expand Up @@ -4096,7 +4111,7 @@ public final Mono<T> retryWhen(Retry retrySpec) {
/**
* Prepare a {@link Mono} which shares this {@link Mono} result similar to {@link Flux#shareNext()}.
* This will effectively turn this {@link Mono} into a hot task when the first
* {@link Subscriber} subscribes using {@link #subscribe} API. Further {@link Subscriber} will share the same {@link Subscription}
* {@link Subscriber} subscribes using {@link #subscribe()} API. Further {@link Subscriber} will share the same {@link Subscription}
* and therefore the same result.
* It's worth noting this is an un-cancellable {@link Subscription}.
* <p>
Expand All @@ -4109,10 +4124,10 @@ public final Mono<T> share() {
return this;
}

if (this instanceof NextProcessor) { //TODO should we check whether the NextProcessor has a source or not?
if (this instanceof NextProcessor && ((NextProcessor<?>) this).isRefCounted) { //TODO should we check whether the NextProcessor has a source or not?
return this;
}
return new NextProcessor<>(this);
return new NextProcessor<>(this, true);
}

/**
Expand Down Expand Up @@ -4165,7 +4180,12 @@ public final Mono<T> single() {
public final Disposable subscribe() {
if(this instanceof NextProcessor){
NextProcessor<T> s = (NextProcessor<T>)this;
if (s.source != null) { //enables `Sinks.one().subscribe()` usecases
// Mono#share() should return a new lambda subscriber during subscribe.
// This can now be precisely detected with source != null in combination to the isRefCounted boolean being true.
// `Sinks.one().subscribe()` case is now split into a separate implementation.
// Otherwise, this is a (legacy) #toProcessor() usage, and we return the processor itself below (and don't forget to connect() it):
if (s.source != null && !s.isRefCounted) {
s.subscribe(new LambdaMonoSubscriber<>(null, null, null, null, null));
s.connect();
return s;
}
Expand Down
151 changes: 127 additions & 24 deletions reactor-core/src/main/java/reactor/core/publisher/NextProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,21 @@
import reactor.core.CoreSubscriber;
import reactor.core.Exceptions;
import reactor.core.Scannable;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.EmissionException;
import reactor.core.publisher.Sinks.EmitFailureHandler;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

// NextProcessor extends a deprecated class but is itself not deprecated and is here to stay, hence the following line is ok.
@SuppressWarnings("deprecation")
class NextProcessor<O> extends MonoProcessor<O> implements InternalOneSink<O> {
class NextProcessor<O> extends MonoProcessor<O> {

/**
* This boolean indicates a usage as `Mono#share()` where, for alignment with Flux#share(), the removal of all
* subscribers should lead to the cancellation of the upstream Subscription.
*/
final boolean isRefCounted;

volatile NextInner<O>[] subscribers;

Expand Down Expand Up @@ -66,18 +73,13 @@ class NextProcessor<O> extends MonoProcessor<O> implements InternalOneSink<O> {
O value;

NextProcessor(@Nullable CorePublisher<? extends O> source) {
this.source = source;
SUBSCRIBERS.lazySet(this, source != null ? EMPTY_WITH_SOURCE : EMPTY);
}

@Override
public int currentSubscriberCount() {
return subscribers.length;
this(source, false);
}

@Override
public Mono<O> asMono() {
return this;
NextProcessor(@Nullable CorePublisher<? extends O> source, boolean isRefCounted) {
this.source = source;
this.isRefCounted = isRefCounted;
SUBSCRIBERS.lazySet(this, source != null ? EMPTY_WITH_SOURCE : EMPTY);
}

@Override
Expand Down Expand Up @@ -145,22 +147,72 @@ public O block(@Nullable Duration timeout) {
@Override
public final void onComplete() {
//no particular error condition handling for onComplete
@SuppressWarnings("unused") EmitResult emitResult = tryEmitEmpty();
@SuppressWarnings("unused") EmitResult emitResult = tryEmitValue(null);
}

@Override
public EmitResult tryEmitEmpty() {
return tryEmitValue(null);
void emitEmpty(Sinks.EmitFailureHandler failureHandler) {
for (;;) {
Sinks.EmitResult emitResult = tryEmitValue(null);
if (emitResult.isSuccess()) {
return;
}

boolean shouldRetry = failureHandler.onEmitFailure(SignalType.ON_COMPLETE,
emitResult);
if (shouldRetry) {
continue;
}

switch (emitResult) {
case FAIL_ZERO_SUBSCRIBER:
case FAIL_OVERFLOW:
case FAIL_CANCELLED:
case FAIL_TERMINATED:
return;
case FAIL_NON_SERIALIZED:
throw new EmissionException(emitResult,
"Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially."
);
default:
throw new EmissionException(emitResult, "Unknown emitResult value");
}
}
}

@Override
public final void onError(Throwable cause) {
emitError(cause, EmitFailureHandler.FAIL_FAST);
for (;;) {
EmitResult emitResult = tryEmitError(cause);
if (emitResult.isSuccess()) {
return;
}

boolean shouldRetry = EmitFailureHandler.FAIL_FAST.onEmitFailure(SignalType.ON_ERROR,
emitResult);
if (shouldRetry) {
continue;
}

switch (emitResult) {
case FAIL_ZERO_SUBSCRIBER:
case FAIL_OVERFLOW:
case FAIL_CANCELLED:
return;
case FAIL_TERMINATED:
Operators.onErrorDropped(cause, currentContext());
return;
case FAIL_NON_SERIALIZED:
throw new EmissionException(emitResult,
"Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially."
);
default:
throw new EmissionException(emitResult, "Unknown emitResult value");
}
}
}

@Override
@SuppressWarnings("unchecked")
public Sinks.EmitResult tryEmitError(Throwable cause) {
Sinks.EmitResult tryEmitError(Throwable cause) {
Objects.requireNonNull(cause, "onError cannot be null");

if (UPSTREAM.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) {
Expand All @@ -180,11 +232,50 @@ public Sinks.EmitResult tryEmitError(Throwable cause) {

@Override
public final void onNext(@Nullable O value) {
emitValue(value, EmitFailureHandler.FAIL_FAST);
if (value == null) {
emitEmpty(EmitFailureHandler.FAIL_FAST);
return;
}

for (;;) {
EmitResult emitResult = tryEmitValue(value);
if (emitResult.isSuccess()) {
return;
}

boolean shouldRetry = EmitFailureHandler.FAIL_FAST.onEmitFailure(SignalType.ON_NEXT,
emitResult);
if (shouldRetry) {
continue;
}

switch (emitResult) {
case FAIL_ZERO_SUBSCRIBER:
//we want to "discard" without rendering the sink terminated.
// effectively NO-OP cause there's no subscriber, so no context :(
return;
case FAIL_OVERFLOW:
Operators.onDiscard(value, currentContext());
//the emitError will onErrorDropped if already terminated
onError(Exceptions.failWithOverflow("Backpressure overflow during Sinks.Many#emitNext"));
return;
case FAIL_CANCELLED:
Operators.onDiscard(value, currentContext());
return;
case FAIL_TERMINATED:
Operators.onNextDropped(value, currentContext());
return;
case FAIL_NON_SERIALIZED:
throw new EmissionException(emitResult,
"Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially."
);
default:
throw new EmissionException(emitResult, "Unknown emitResult value");
}
}
}

@Override
public EmitResult tryEmitValue(@Nullable O value) {
EmitResult tryEmitValue(@Nullable O value) {
Subscription s;
if ((s = UPSTREAM.getAndSet(this, Operators.cancelledSubscription())) == Operators.cancelledSubscription()) {
return EmitResult.FAIL_TERMINATED;
Expand Down Expand Up @@ -345,16 +436,28 @@ void remove(NextInner<O> ps) {
}

NextInner<O>[] b;

boolean disconnect = false;
if (n == 1) {
b = EMPTY;
if (isRefCounted && source != null) {
b = EMPTY_WITH_SOURCE;
disconnect = true;
}
else {
b = EMPTY;
}
}
else {
b = new NextInner[n - 1];
System.arraycopy(a, 0, b, 0, j);
System.arraycopy(a, j + 1, b, j, n - j - 1);
}
if (SUBSCRIBERS.compareAndSet(this, a, b)) {
if (disconnect) {
Subscription oldSubscription = UPSTREAM.getAndSet(this, null);
if (oldSubscription != null) {
oldSubscription.cancel();
}
}
return;
}
}
Expand Down