Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SwitchOnFirst better tracks in/out premature close #2794

Merged
merged 4 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ public void arbiter(LLLLL_Result result) {
}

@JCStressTest
//FIXME investigate this result, see https://github.com/reactor/reactor-core/issues/2789
@Outcome(id = { "0, 0, 1, 1, 1, 1, 0, 0, 1" }, expect = ACCEPTABLE_INTERESTING, desc = "MUST INVESTIGATE")
@Outcome(id = {
"0, 0, 1, 2, 2, 0, 1, 1, 0"}, expect = ACCEPTABLE, desc = "Inbound got second request, delivered onNext('value') and delivered onComplete() before cancellation")
@Outcome(id = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,29 @@ public Object scanUnsafe(Attr key) {
return super.scanUnsafe(key);
}

static final int HAS_FIRST_VALUE_RECEIVED_FLAG =
static final int HAS_FIRST_VALUE_RECEIVED_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_0001;
static final int HAS_INBOUND_SUBSCRIBED_ONCE_FLAG =
static final int HAS_INBOUND_SUBSCRIBED_ONCE_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_0010;
static final int HAS_INBOUND_SUBSCRIBER_SET_FLAG =
static final int HAS_INBOUND_SUBSCRIBER_SET_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_0100;
static final int HAS_INBOUND_REQUESTED_ONCE_FLAG =
static final int HAS_INBOUND_REQUESTED_ONCE_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_1000;
static final int HAS_FIRST_VALUE_SENT_FLAG =
static final int HAS_FIRST_VALUE_SENT_FLAG =
0b0000_0000_0000_0000_0000_0000_0001_0000;
static final int HAS_INBOUND_CANCELLED_FLAG =
static final int HAS_INBOUND_CANCELLED_FLAG =
0b0000_0000_0000_0000_0000_0000_0010_0000;
static final int HAS_INBOUND_TERMINATED_FLAG =
static final int HAS_INBOUND_CLOSED_PREMATURELY_FLAG =
0b0000_0000_0000_0000_0000_0000_0100_0000;
static final int HAS_INBOUND_TERMINATED_FLAG =
0b0000_0000_0000_0000_0000_0000_1000_0000;

static final int HAS_OUTBOUND_SUBSCRIBED_FLAG =
0b0000_0000_0000_0000_0000_0000_1000_0000;
static final int HAS_OUTBOUND_CANCELLED_FLAG =
0b0000_0000_0000_0000_0000_0001_0000_0000;
static final int HAS_OUTBOUND_TERMINATED_FLAG =
static final int HAS_OUTBOUND_CANCELLED_FLAG =
0b0000_0000_0000_0000_0000_0010_0000_0000;
static final int HAS_OUTBOUND_TERMINATED_FLAG =
0b0000_0000_0000_0000_0000_0100_0000_0000;

/**
* Adds a flag which indicate that the first inbound onNext signal has already been
Expand All @@ -102,7 +104,7 @@ static <T, R> long markFirstValueReceived(AbstractSwitchOnFirstMain<T, R> instan
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand Down Expand Up @@ -142,7 +144,7 @@ static <T, R> long markInboundSubscriberSet(AbstractSwitchOnFirstMain<T, R> inst
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -162,7 +164,7 @@ static <T, R> long markInboundRequestedOnce(AbstractSwitchOnFirstMain<T, R> inst
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -182,7 +184,7 @@ static <T, R> long markFirstValueSent(AbstractSwitchOnFirstMain<T, R> instance)
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -202,7 +204,7 @@ static <T, R> long markInboundTerminated(AbstractSwitchOnFirstMain<T, R> instanc
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -214,15 +216,15 @@ static <T, R> long markInboundTerminated(AbstractSwitchOnFirstMain<T, R> instanc

/**
* Adds a flag which indicate that the inbound has already been cancelled. Fails if
* inbound is cancelled or terminated.
* inbound is cancelled.
*
* @return previous observed state
*/
static <T, R> long markInboundCancelled(AbstractSwitchOnFirstMain<T, R> instance) {
for (;;) {
final int state = instance.state;

if (hasInboundTerminated(state) || hasInboundCancelled(state)) {
if (hasInboundCancelled(state)) {
return state;
}

Expand All @@ -233,20 +235,20 @@ static <T, R> long markInboundCancelled(AbstractSwitchOnFirstMain<T, R> instance
}

/**
* Adds flags which indicate that the inbound has cancelled upstream and errored
* the inbound downstream. Fails if either inbound is cancelled or terminated.
* Adds flags which indicate that the inbound has prematurely from the very bottom
* of the pipe. Fails if either inbound is cancelled or terminated before.
*
* @return previous observed state
*/
static <T, R> long markInboundCancelledAndErrored(AbstractSwitchOnFirstMain<T, R> instance) {
static <T, R> long markInboundClosedPrematurely(AbstractSwitchOnFirstMain<T, R> instance) {
for (;;) {
final int state = instance.state;

if (hasInboundTerminated(state) || hasInboundCancelled(state)) {
return state;
}

if (AbstractSwitchOnFirstMain.STATE.compareAndSet(instance, state, state | HAS_INBOUND_CANCELLED_FLAG | HAS_INBOUND_TERMINATED_FLAG)) {
if (AbstractSwitchOnFirstMain.STATE.compareAndSet(instance, state, state | HAS_INBOUND_CLOSED_PREMATURELY_FLAG)) {
return state;
}
}
Expand Down Expand Up @@ -342,6 +344,10 @@ static boolean hasInboundCancelled(long state) {
return (state & HAS_INBOUND_CANCELLED_FLAG) == HAS_INBOUND_CANCELLED_FLAG;
}

static boolean hasInboundClosedPrematurely(long state) {
return (state & HAS_INBOUND_CLOSED_PREMATURELY_FLAG) == HAS_INBOUND_CLOSED_PREMATURELY_FLAG;
}

static boolean hasInboundTerminated(long state) {
return (state & HAS_INBOUND_TERMINATED_FLAG) == HAS_INBOUND_TERMINATED_FLAG;
}
Expand Down Expand Up @@ -416,8 +422,8 @@ static abstract class AbstractSwitchOnFirstMain<T, R> extends Flux<T>
@Override
@Nullable
public final Object scanUnsafe(Attr key) {
if (key == Attr.CANCELLED) return hasInboundCancelled(this.state);
if (key == Attr.TERMINATED) return hasInboundTerminated(this.state);
if (key == Attr.CANCELLED) return hasInboundCancelled(this.state) || hasInboundClosedPrematurely(this.state);
if (key == Attr.TERMINATED) return hasInboundTerminated(this.state) || hasInboundClosedPrematurely(this.state);
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return InnerOperator.super.scanUnsafe(key);
Expand Down Expand Up @@ -451,7 +457,7 @@ public final void onNext(T t) {
this.firstValue = t;

long previousState = markFirstValueReceived(this);
if (hasInboundCancelled(previousState)) {
if (hasInboundCancelled(previousState) || hasInboundClosedPrematurely(previousState)) {
this.firstValue = null;
Operators.onDiscard(t, this.outboundSubscriber.currentContext());
return;
Expand Down Expand Up @@ -500,7 +506,7 @@ public final void onError(Throwable t) {
this.throwable = t;

final long previousState = markInboundTerminated(this);
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState)) {
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState) || hasInboundClosedPrematurely(previousState)) {
Operators.onErrorDropped(t, this.outboundSubscriber.currentContext());
return;
}
Expand Down Expand Up @@ -537,7 +543,7 @@ public final void onComplete() {
this.done = true;

final long previousState = markInboundTerminated(this);
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState)) {
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState) || hasInboundClosedPrematurely(previousState)) {
return;
}

Expand Down Expand Up @@ -568,7 +574,7 @@ public final void onComplete() {
@Override
public final void cancel() {
long previousState = markInboundCancelled(this);
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState)) {
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState) || hasInboundClosedPrematurely(previousState)) {
return;
}

Expand All @@ -582,7 +588,7 @@ public final void cancel() {
}

final void cancelAndError() {
long previousState = markInboundCancelledAndErrored(this);
long previousState = markInboundClosedPrematurely(this);
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState)) {
return;
}
Expand Down Expand Up @@ -620,7 +626,7 @@ public final void request(long n) {

if (this.isFirstOnNextReceivedOnce) {
final long previousState = markInboundRequestedOnce(this);
if (hasInboundCancelled(previousState)) {
if (hasInboundCancelled(previousState) || hasInboundClosedPrematurely(previousState)) {
return;
}

Expand Down Expand Up @@ -651,7 +657,7 @@ public final void subscribe(CoreSubscriber<? super T> inboundSubscriber) {
return;
}

if (hasInboundCancelled(previousState)) {
if (hasInboundClosedPrematurely(previousState)) {
Operators.error(inboundSubscriber, new CancellationException("FluxSwitchOnFirst has already been cancelled"));
return;
}
Expand All @@ -672,24 +678,27 @@ public final void subscribe(CoreSubscriber<? super T> inboundSubscriber) {
inboundSubscriber.onSubscribe(this);

previousState = markInboundSubscriberSet(this);
if (hasInboundCancelled(previousState) && hasOutboundTerminated(previousState) && !hasInboundTerminated(previousState)) {
if (hasInboundClosedPrematurely(previousState)
&& (!hasInboundRequestedOnce(previousState) || hasFirstValueSent(previousState))
&& !hasInboundCancelled(previousState)) {
inboundSubscriber.onError(new CancellationException("FluxSwitchOnFirst has already been cancelled"));
}
}

abstract CoreSubscriber<? super T> convert(CoreSubscriber<? super T> inboundSubscriber);


final boolean sendFirst(T firstValue) {
final CoreSubscriber<? super T> a = this.inboundSubscriber;

final boolean sent = tryDirectSend(a, firstValue);

final long previousState = markFirstValueSent(this);
if (hasInboundCancelled(previousState)) {
if (hasInboundTerminated(previousState)) {
a.onError(new CancellationException("FluxSwitchOnFirst has already been cancelled"));
}
return sent;
}

if (hasInboundClosedPrematurely(previousState)) {
a.onError(new CancellationException("FluxSwitchOnFirst has already been cancelled"));
return sent;
}

Expand Down Expand Up @@ -753,7 +762,6 @@ public boolean tryOnNext(T t) {
return false;
}


if (!this.isFirstOnNextReceivedOnce) {
this.isFirstOnNextReceivedOnce = true;
this.firstValue = t;
Expand Down Expand Up @@ -805,8 +813,7 @@ boolean tryDirectSend(CoreSubscriber<? super T> actual, T t) {
}
}

static class SwitchOnFirstControlSubscriber<T>
extends Operators.DeferredSubscription
static class SwitchOnFirstControlSubscriber<T> extends Operators.DeferredSubscription
implements InnerOperator<T, T>, CoreSubscriber<T> {

final AbstractSwitchOnFirstMain<?, T> parent;
Expand Down