Skip to content

Commit

Permalink
rework prematurely closed scenario
Browse files Browse the repository at this point in the history
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
  • Loading branch information
Oleh Dokuka committed Sep 28, 2021
1 parent a050dcf commit 14c5f49
Showing 1 changed file with 46 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,29 @@ public Object scanUnsafe(Attr key) {
return super.scanUnsafe(key);
}

static final int HAS_FIRST_VALUE_RECEIVED_FLAG =
static final int HAS_FIRST_VALUE_RECEIVED_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_0001;
static final int HAS_INBOUND_SUBSCRIBED_ONCE_FLAG =
static final int HAS_INBOUND_SUBSCRIBED_ONCE_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_0010;
static final int HAS_INBOUND_SUBSCRIBER_SET_FLAG =
static final int HAS_INBOUND_SUBSCRIBER_SET_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_0100;
static final int HAS_INBOUND_REQUESTED_ONCE_FLAG =
static final int HAS_INBOUND_REQUESTED_ONCE_FLAG =
0b0000_0000_0000_0000_0000_0000_0000_1000;
static final int HAS_FIRST_VALUE_SENT_FLAG =
static final int HAS_FIRST_VALUE_SENT_FLAG =
0b0000_0000_0000_0000_0000_0000_0001_0000;
static final int HAS_INBOUND_CANCELLED_FLAG =
static final int HAS_INBOUND_CANCELLED_FLAG =
0b0000_0000_0000_0000_0000_0000_0010_0000;
static final int HAS_INBOUND_TERMINATED_FLAG =
0b0000_0000_0000_0000_0000_0000_0100_0000;
static final int HAS_INBOUND_CLOSED_PREMATURELY_FLAG =
0b0000_0000_0000_0000_0000_0000_0010_0000;
static final int HAS_INBOUND_TERMINATED_FLAG =
0b0000_0000_0000_0000_0000_0000_1000_0000;

static final int HAS_OUTBOUND_SUBSCRIBED_FLAG =
0b0000_0000_0000_0000_0000_0000_1000_0000;
static final int HAS_OUTBOUND_CANCELLED_FLAG =
0b0000_0000_0000_0000_0000_0001_0000_0000;
static final int HAS_OUTBOUND_TERMINATED_FLAG =
static final int HAS_OUTBOUND_CANCELLED_FLAG =
0b0000_0000_0000_0000_0000_0010_0000_0000;
static final int HAS_OUTBOUND_TERMINATED_FLAG =
0b0000_0000_0000_0000_0000_0100_0000_0000;

/**
* Adds a flag which indicate that the first inbound onNext signal has already been
Expand Down Expand Up @@ -142,7 +144,7 @@ static <T, R> long markInboundSubscriberSet(AbstractSwitchOnFirstMain<T, R> inst
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -162,7 +164,7 @@ static <T, R> long markInboundRequestedOnce(AbstractSwitchOnFirstMain<T, R> inst
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -182,7 +184,7 @@ static <T, R> long markFirstValueSent(AbstractSwitchOnFirstMain<T, R> instance)
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -202,7 +204,7 @@ static <T, R> long markInboundTerminated(AbstractSwitchOnFirstMain<T, R> instanc
for (;;) {
final int state = instance.state;

if (hasInboundCancelled(state)) {
if (hasInboundCancelled(state) || hasInboundClosedPrematurely(state)) {
return state;
}

Expand All @@ -214,15 +216,15 @@ static <T, R> long markInboundTerminated(AbstractSwitchOnFirstMain<T, R> instanc

/**
* Adds a flag which indicate that the inbound has already been cancelled. Fails if
* inbound is cancelled or terminated.
* inbound is cancelled.
*
* @return previous observed state
*/
static <T, R> long markInboundCancelled(AbstractSwitchOnFirstMain<T, R> instance) {
for (;;) {
final int state = instance.state;

if (hasInboundTerminated(state) || hasInboundCancelled(state)) {
if (hasInboundCancelled(state)) {
return state;
}

Expand All @@ -233,20 +235,20 @@ static <T, R> long markInboundCancelled(AbstractSwitchOnFirstMain<T, R> instance
}

/**
* Adds flags which indicate that the inbound has cancelled upstream and errored
* the inbound downstream. Fails if either inbound is cancelled or terminated.
* Adds flags which indicate that the inbound has prematurely from the very bottom
* of the pipe. Fails if either inbound is cancelled or terminated before.
*
* @return previous observed state
*/
static <T, R> long markInboundCancelledAndErrored(AbstractSwitchOnFirstMain<T, R> instance) {
static <T, R> long markInboundClosedPrematurely(AbstractSwitchOnFirstMain<T, R> instance) {
for (;;) {
final int state = instance.state;

if (hasInboundTerminated(state) || hasInboundCancelled(state)) {
return state;
}

if (AbstractSwitchOnFirstMain.STATE.compareAndSet(instance, state, state | HAS_INBOUND_CANCELLED_FLAG | HAS_INBOUND_TERMINATED_FLAG)) {
if (AbstractSwitchOnFirstMain.STATE.compareAndSet(instance, state, state | HAS_INBOUND_CLOSED_PREMATURELY_FLAG)) {
return state;
}
}
Expand Down Expand Up @@ -342,6 +344,10 @@ static boolean hasInboundCancelled(long state) {
return (state & HAS_INBOUND_CANCELLED_FLAG) == HAS_INBOUND_CANCELLED_FLAG;
}

static boolean hasInboundClosedPrematurely(long state) {
return (state & HAS_INBOUND_CLOSED_PREMATURELY_FLAG) == HAS_INBOUND_CLOSED_PREMATURELY_FLAG;
}

static boolean hasInboundTerminated(long state) {
return (state & HAS_INBOUND_TERMINATED_FLAG) == HAS_INBOUND_TERMINATED_FLAG;
}
Expand Down Expand Up @@ -416,8 +422,8 @@ static abstract class AbstractSwitchOnFirstMain<T, R> extends Flux<T>
@Override
@Nullable
public final Object scanUnsafe(Attr key) {
if (key == Attr.CANCELLED) return hasInboundCancelled(this.state);
if (key == Attr.TERMINATED) return hasInboundTerminated(this.state);
if (key == Attr.CANCELLED) return hasInboundCancelled(this.state) || hasInboundClosedPrematurely(this.state);
if (key == Attr.TERMINATED) return hasInboundTerminated(this.state) || hasInboundClosedPrematurely(this.state);
if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC;

return InnerOperator.super.scanUnsafe(key);
Expand Down Expand Up @@ -517,7 +523,8 @@ public final void onError(Throwable t) {
final CoreSubscriber<? super R> o = this.outboundSubscriber;
try {
final Signal<T> signal = Signal.error(t, o.currentContext());
result = Objects.requireNonNull(this.transformer.apply(signal, this), "The transformer returned a null value");
result = Objects.requireNonNull(this.transformer.apply(signal, this),
"The transformer returned a null value");
}
catch (Throwable e) {
o.onError(Exceptions.addSuppressed(t, e));
Expand Down Expand Up @@ -554,7 +561,8 @@ public final void onComplete() {

try {
final Signal<T> signal = Signal.complete(o.currentContext());
result = Objects.requireNonNull(this.transformer.apply(signal, this), "The transformer returned a null value");
result = Objects.requireNonNull(this.transformer.apply(signal, this),
"The transformer returned a null value");
}
catch (Throwable e) {
o.onError(e);
Expand All @@ -568,7 +576,7 @@ public final void onComplete() {
@Override
public final void cancel() {
long previousState = markInboundCancelled(this);
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState)) {
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState) || hasInboundClosedPrematurely(previousState)) {
return;
}

Expand All @@ -582,7 +590,7 @@ public final void cancel() {
}

final void cancelAndError() {
long previousState = markInboundCancelledAndErrored(this);
long previousState = markInboundClosedPrematurely(this);
if (hasInboundCancelled(previousState) || hasInboundTerminated(previousState)) {
return;
}
Expand Down Expand Up @@ -651,7 +659,7 @@ public final void subscribe(CoreSubscriber<? super T> inboundSubscriber) {
return;
}

if (hasInboundCancelled(previousState)) {
if (hasInboundClosedPrematurely(previousState)) {
Operators.error(inboundSubscriber, new CancellationException("FluxSwitchOnFirst has already been cancelled"));
return;
}
Expand All @@ -672,24 +680,28 @@ public final void subscribe(CoreSubscriber<? super T> inboundSubscriber) {
inboundSubscriber.onSubscribe(this);

previousState = markInboundSubscriberSet(this);
if (hasInboundCancelled(previousState) && hasOutboundTerminated(previousState) && !hasInboundTerminated(previousState)) {
if (hasInboundClosedPrematurely(previousState)
&& hasFirstValueSent(previousState)
&& !hasInboundCancelled(previousState)
&& !hasInboundTerminated(previousState)) {
inboundSubscriber.onError(new CancellationException("FluxSwitchOnFirst has already been cancelled"));
}
}

abstract CoreSubscriber<? super T> convert(CoreSubscriber<? super T> inboundSubscriber);


final boolean sendFirst(T firstValue) {
final CoreSubscriber<? super T> a = this.inboundSubscriber;

final boolean sent = tryDirectSend(a, firstValue);

final long previousState = markFirstValueSent(this);
if (hasInboundCancelled(previousState)) {
if (hasInboundTerminated(previousState)) {
a.onError(new CancellationException("FluxSwitchOnFirst has already been cancelled"));
}
return sent;
}

if (hasInboundClosedPrematurely(previousState)) {
a.onError(new CancellationException("FluxSwitchOnFirst has already been cancelled"));
return sent;
}

Expand Down Expand Up @@ -753,7 +765,6 @@ public boolean tryOnNext(T t) {
return false;
}


if (!this.isFirstOnNextReceivedOnce) {
this.isFirstOnNextReceivedOnce = true;
this.firstValue = t;
Expand Down Expand Up @@ -805,8 +816,7 @@ boolean tryDirectSend(CoreSubscriber<? super T> actual, T t) {
}
}

static class SwitchOnFirstControlSubscriber<T>
extends Operators.DeferredSubscription
static class SwitchOnFirstControlSubscriber<T> extends Operators.DeferredSubscription
implements InnerOperator<T, T>, CoreSubscriber<T> {

final AbstractSwitchOnFirstMain<?, T> parent;
Expand Down

0 comments on commit 14c5f49

Please sign in to comment.