From c494649a7f10e8a9e5183905198204c67c50430c Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 20 Sep 2021 19:17:17 +0200 Subject: [PATCH] Atomic field updater refactoring of the upstream subscription in MultiOperatorProcessor --- .../mutiny/operators/multi/MultiBufferOp.java | 12 +++--- .../multi/MultiBufferWithTimeoutOp.java | 2 +- .../operators/multi/MultiCollectorOp.java | 4 +- .../mutiny/operators/multi/MultiEmitOnOp.java | 12 +++--- .../operators/multi/MultiGroupByOp.java | 14 +++--- .../mutiny/operators/multi/MultiIgnoreOp.java | 2 +- .../operators/multi/MultiLastItemOp.java | 4 +- .../operators/multi/MultiOnFailureInvoke.java | 3 +- .../operators/multi/MultiOnItemInvoke.java | 2 +- .../operators/multi/MultiOnSubscribeCall.java | 8 ++-- .../multi/MultiOnSubscribeInvokeOp.java | 6 ++- .../multi/MultiOperatorProcessor.java | 43 ++++++++++++++----- .../operators/multi/MultiSelectFirstOp.java | 14 +++--- .../operators/multi/MultiSelectLastOp.java | 8 ++-- .../operators/multi/MultiSelectWhereOp.java | 2 +- .../operators/multi/MultiSkipFirstOp.java | 2 +- .../operators/multi/MultiSkipLastOp.java | 2 +- .../operators/multi/MultiSubscribeOnOp.java | 4 +- .../multi/MultiWindowOnDurationOp.java | 6 +-- .../mutiny/operators/multi/MultiWindowOp.java | 22 +++++----- .../overflow/MultiOnOverflowBufferOp.java | 2 +- .../overflow/MultiOnOverflowDropItemsOp.java | 4 +- .../overflow/MultiOnOverflowKeepLastOp.java | 2 +- 23 files changed, 104 insertions(+), 76 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferOp.java index 0e0c3905e..2c384a395 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferOp.java @@ -84,7 +84,7 @@ static final class BufferExactProcessor extends MultiOperatorProcessor buffer = current; if (buffer != null && !buffer.isEmpty()) { @@ -179,7 +179,7 @@ public void onItem(T item) { @Override public void onFailure(Throwable t) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { current = null; downstream.onFailure(t); @@ -190,7 +190,7 @@ public void onFailure(Throwable t) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { List buffer = current; current = null; @@ -277,7 +277,7 @@ public void onItem(T item) { @Override public void onFailure(Throwable t) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { downstream.onFailure(t); } else { @@ -287,7 +287,7 @@ public void onFailure(Throwable t) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { long p = produced; if (p != 0L) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferWithTimeoutOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferWithTimeoutOp.java index cb0cd1edc..449b7479a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferWithTimeoutOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiBufferWithTimeoutOp.java @@ -245,7 +245,7 @@ public void onFailure(Throwable throwable) { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { doOnSubscribe(); downstream.onSubscribe(this); } else { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCollectorOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCollectorOp.java index 4a878311a..4ef4f8346 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCollectorOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiCollectorOp.java @@ -72,7 +72,7 @@ static class CollectorProcessor extends MultiOperatorProcessor { @Override public void onItem(T item) { - if (upstream.get() != CANCELLED) { + if (getUpstreamSubscription() != CANCELLED) { try { accumulator.accept(intermediate, item); } catch (Throwable ex) { @@ -83,7 +83,7 @@ public void onItem(T item) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(Subscriptions.CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(Subscriptions.CANCELLED); if (subscription != Subscriptions.CANCELLED) { R result; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java index 1b59aa69d..fcfedd7d5 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiEmitOnOp.java @@ -84,7 +84,7 @@ static final class MultiEmitOnProcessor extends MultiOperatorProcessor @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); subscription.request(16); } else { @@ -102,7 +102,7 @@ public void onItem(T t) { if (!queue.offer(t)) { // queue full, this is a failure. // onError will schedule. - Subscriptions.cancel(upstream); // cancel upstream + cancelUpstream(); // cancel upstream onFailure(new BackPressureFailure("Queue is full, the upstream didn't enforce the requests")); done = true; } else { @@ -145,7 +145,7 @@ public void cancel() { return; } cancelled = true; - Subscriptions.cancel(upstream); + cancelUpstream(); if (wip.getAndIncrement() == 0) { // nothing was currently dispatched, clearing the queue. queue.clear(); @@ -161,10 +161,10 @@ void schedule() { try { executor.execute(this); } catch (RejectedExecutionException rejected) { - Subscription s = upstream.getAndSet(CANCELLED); - if (s != CANCELLED) { + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); + if (subscription != CANCELLED) { done = true; - Subscriptions.cancel(upstream); + cancelUpstream(); queue.clear(); downstream.onFailure(rejected); super.cancel(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java index 048549b7c..5d6955140 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGroupByOp.java @@ -77,7 +77,7 @@ public MultiGroupByProcessor(MultiSubscriber> downstr @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { // Propagate subscription to downstream. downstream.onSubscribe(this); subscription.request(128); @@ -136,7 +136,7 @@ public void onItem(T item) { @Override public void onFailure(Throwable throwable) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { done = true; groups.values().forEach(group -> group.onFailure(throwable)); @@ -151,7 +151,7 @@ public void onFailure(Throwable throwable) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { done = true; groups.values().forEach(GroupedUnicast::onComplete); @@ -175,7 +175,7 @@ public void cancel() { // but running groups still require new values if (cancelled.compareAndSet(false, true)) { if (groupCount.decrementAndGet() == 0) { - Subscriptions.cancel(upstream); + cancelUpstream(); } } } @@ -184,7 +184,7 @@ public void cancel(K key) { Object mapKey = key != null ? key : NO_KEY; groups.remove(mapKey); if (groupCount.decrementAndGet() == 0) { - Subscriptions.cancel(upstream); + cancelUpstream(); if (wip.getAndIncrement() == 0) { queue.clear(); @@ -410,7 +410,7 @@ void drain() { if (r != Long.MAX_VALUE) { requested.addAndGet(-e); } - parent.upstream.get().request(e); + parent.getUpstreamSubscription().request(e); } } @@ -431,7 +431,7 @@ boolean hasCompleted(boolean isDone, boolean isEmpty, long emitted) { emitted++; } if (emitted != 0) { - parent.upstream.get().request(emitted); + parent.getUpstreamSubscription().request(emitted); } return true; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiIgnoreOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiIgnoreOp.java index 834529300..24d2d476e 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiIgnoreOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiIgnoreOp.java @@ -25,7 +25,7 @@ public MultiIgnoreProcessor(MultiSubscriber downstream) { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { // Propagate subscription to downstream. downstream.onSubscribe(this); subscription.request(Long.MAX_VALUE); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java index f28a40a69..c56cc4db9 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiLastItemOp.java @@ -28,7 +28,7 @@ static final class MultiLastItemProcessor extends MultiOperatorProcessor subscriber = downstream; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnFailureInvoke.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnFailureInvoke.java index 1f3b3daca..f41767ef3 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnFailureInvoke.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnFailureInvoke.java @@ -1,6 +1,7 @@ package io.smallrye.mutiny.operators.multi; import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; +import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; import java.util.function.Consumer; import java.util.function.Predicate; @@ -37,7 +38,7 @@ public MultiOnFailureInvokeProcessor(MultiSubscriber downstream) { @Override public void onFailure(Throwable failure) { - Subscription up = upstream.getAndSet(Subscriptions.CANCELLED); + Subscription up = getAndSetUpstreamSubscription(CANCELLED); MultiSubscriber subscriber = downstream; if (up != Subscriptions.CANCELLED) { try { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnItemInvoke.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnItemInvoke.java index 21b89f43d..6f603fdc1 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnItemInvoke.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnItemInvoke.java @@ -30,7 +30,7 @@ public MultiOnItemInvokeProcessor(MultiSubscriber downstream) { @Override public void onItem(T item) { - if (upstream.get() != Subscriptions.CANCELLED) { + if (getUpstreamSubscription() != Subscriptions.CANCELLED) { MultiSubscriber subscriber = this.downstream; try { callback.accept(item); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeCall.java index 4539e48d5..8f4cc447c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeCall.java @@ -1,5 +1,7 @@ package io.smallrye.mutiny.operators.multi; +import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; + import java.util.Objects; import java.util.function.Function; @@ -44,7 +46,7 @@ private final class OnSubscribeSubscriber extends MultiOperatorProcessor { @Override public void onSubscribe(Subscription s) { - if (upstream.compareAndSet(null, s)) { + if (compareAndSetUpstreamSubscription(null, s)) { try { Uni uni = Objects.requireNonNull(onSubscribe.apply(s), "The produced Uni must not be `null`"); uni @@ -52,11 +54,11 @@ public void onSubscribe(Subscription s) { ignored -> downstream.onSubscribe(this), failure -> { Subscriptions.fail(downstream, failure); - upstream.getAndSet(Subscriptions.CANCELLED).cancel(); + getAndSetUpstreamSubscription(CANCELLED).cancel(); }); } catch (Throwable e) { Subscriptions.fail(downstream, e); - upstream.getAndSet(Subscriptions.CANCELLED).cancel(); + getAndSetUpstreamSubscription(CANCELLED).cancel(); } } else { s.cancel(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeInvokeOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeInvokeOp.java index 99d3216c5..bb9b0b91e 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeInvokeOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOnSubscribeInvokeOp.java @@ -1,5 +1,7 @@ package io.smallrye.mutiny.operators.multi; +import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; + import java.util.Objects; import java.util.function.Consumer; @@ -42,12 +44,12 @@ private final class OnSubscribeSubscriber extends MultiOperatorProcessor { @Override public void onSubscribe(Subscription s) { - if (upstream.compareAndSet(null, s)) { + if (compareAndSetUpstreamSubscription(null, s)) { try { onSubscribe.accept(s); } catch (Throwable e) { Subscriptions.fail(downstream, e); - upstream.getAndSet(Subscriptions.CANCELLED).cancel(); + getAndSetUpstreamSubscription(CANCELLED).cancel(); return; } downstream.onSubscribe(this); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java index 7f462348e..555c3df01 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java @@ -3,12 +3,11 @@ import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.reactivestreams.Subscription; import io.smallrye.mutiny.helpers.ParameterValidation; -import io.smallrye.mutiny.helpers.Subscriptions; import io.smallrye.mutiny.infrastructure.Infrastructure; import io.smallrye.mutiny.subscription.MultiSubscriber; @@ -16,23 +15,38 @@ public abstract class MultiOperatorProcessor implements MultiSubscriber // Cannot be final, the TCK checks it gets released. protected volatile MultiSubscriber downstream; - protected AtomicReference upstream = new AtomicReference<>(); + protected volatile Subscription upstream = null; AtomicBoolean hasDownstreamCancelled = new AtomicBoolean(); + private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater + .newUpdater(MultiOperatorProcessor.class, Subscription.class, "upstream"); + public MultiOperatorProcessor(MultiSubscriber downstream) { this.downstream = ParameterValidation.nonNull(downstream, "downstream"); } void failAndCancel(Throwable throwable) { - Subscription subscription = upstream.get(); + Subscription subscription = getUpstreamSubscription(); if (subscription != null) { subscription.cancel(); } onFailure(throwable); } + protected Subscription getUpstreamSubscription() { + return upstream; + } + + protected boolean compareAndSetUpstreamSubscription(Subscription expectedValue, Subscription newValue) { + return UPSTREAM_UPDATER.compareAndSet(this, expectedValue, newValue); + } + + protected Subscription getAndSetUpstreamSubscription(Subscription newValue) { + return UPSTREAM_UPDATER.getAndSet(this, newValue); + } + protected boolean isDone() { - return upstream.get() == CANCELLED; + return getUpstreamSubscription() == CANCELLED; } protected boolean isCancelled() { @@ -41,7 +55,7 @@ protected boolean isCancelled() { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { // Propagate subscription to downstream. downstream.onSubscribe(this); } else { @@ -51,7 +65,7 @@ public void onSubscribe(Subscription subscription) { @Override public void onFailure(Throwable throwable) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { downstream.onFailure(throwable); } else { @@ -62,7 +76,7 @@ public void onFailure(Throwable throwable) { @SuppressWarnings("unchecked") @Override public void onItem(I item) { - Subscription subscription = upstream.get(); + Subscription subscription = getUpstreamSubscription(); if (subscription != CANCELLED) { downstream.onItem((O) item); } @@ -70,7 +84,7 @@ public void onItem(I item) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { downstream.onCompletion(); } @@ -78,7 +92,7 @@ public void onCompletion() { @Override public void request(long numberOfItems) { - Subscription subscription = upstream.get(); + Subscription subscription = getUpstreamSubscription(); if (subscription != CANCELLED) { if (numberOfItems <= 0) { onFailure(new IllegalArgumentException("Invalid number of request, must be greater than 0")); @@ -91,11 +105,18 @@ public void request(long numberOfItems) { @Override public void cancel() { if (hasDownstreamCancelled.compareAndSet(false, true)) { - Subscriptions.cancel(upstream); + cancelUpstream(); cleanup(); } } + protected void cancelUpstream() { + Subscription actual = UPSTREAM_UPDATER.getAndSet(this, CANCELLED); + if (actual != null && actual != CANCELLED) { + actual.cancel(); + } + } + protected void cleanup() { downstream = null; } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectFirstOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectFirstOp.java index e6f8a6e07..f608e1086 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectFirstOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectFirstOp.java @@ -1,5 +1,7 @@ package io.smallrye.mutiny.operators.multi; +import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED; + import java.util.concurrent.atomic.AtomicInteger; import org.reactivestreams.Subscription; @@ -46,9 +48,9 @@ static final class MultiSelectFirstProcessor extends MultiOperatorProcessor= this.numberOfItems) { actual.request(Long.MAX_VALUE); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectLastOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectLastOp.java index e6c170c3b..5347054b8 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectLastOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectLastOp.java @@ -42,7 +42,7 @@ static final class TakeSelectLastZeroProcessor extends MultiOperatorProcessor @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { // Propagate subscription to downstream. downstream.onSubscribe(this); // Dropping all values. @@ -80,7 +80,7 @@ public void request(long n) { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { // Propagate subscription to downstream. downstream.onSubscribe(this); subscription.request(Long.MAX_VALUE); @@ -108,14 +108,14 @@ private void drain() { if (wip.getAndIncrement() == 0) { long req = requested.get(); do { - if (upstream.get() == Subscriptions.CANCELLED) { + if (getUpstreamSubscription() == Subscriptions.CANCELLED) { return; } if (upstreamCompleted) { long count = 0L; while (count != req) { - if (upstream.get() == Subscriptions.CANCELLED) { + if (getUpstreamSubscription() == Subscriptions.CANCELLED) { return; } T item = queue.poll(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectWhereOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectWhereOp.java index 819ffb67d..9c89974fd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectWhereOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSelectWhereOp.java @@ -63,7 +63,7 @@ public void onItem(T t) { @Override public void request(long numberOfItems) { - Subscription subscription = upstream.get(); + Subscription subscription = getUpstreamSubscription(); if (subscription != CANCELLED) { if (numberOfItems <= 0) { onFailure(new IllegalArgumentException("Invalid number of request, must be greater than 0")); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipFirstOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipFirstOp.java index c044e352c..284e8a0bf 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipFirstOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipFirstOp.java @@ -42,7 +42,7 @@ static final class SkipFirstProcessor extends MultiOperatorProcessor { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); long l = remaining.get(); // Do not request 0 diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipLastOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipLastOp.java index f1efcc1ff..c89f0f24a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipLastOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSkipLastOp.java @@ -44,7 +44,7 @@ static final class SkipLastProcessor @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { // Propagate subscription to downstream. downstream.onSubscribe(this); subscription.request(numberOfItems); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java index 599c1f36b..e64269d0e 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiSubscribeOnOp.java @@ -58,7 +58,7 @@ static final class SubscribeOnProcessor extends MultiOperatorProcessor @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); } else { subscription.cancel(); @@ -91,7 +91,7 @@ public void onItem(T t) { @Override public void request(long n) { if (n > 0) { - Subscription subscription = upstream.get(); + Subscription subscription = getUpstreamSubscription(); requestUpstream(n, subscription); } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOnDurationOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOnDurationOp.java index 36e1a34fa..b6466417c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOnDurationOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOnDurationOp.java @@ -66,7 +66,7 @@ static final class WindowTimeoutSubscriber extends MultiOperatorProcessor { if (count.decrementAndGet() == 0) { - upstream.get().cancel(); + getUpstreamSubscription().cancel(); } }); @@ -127,7 +127,7 @@ public void onItem(T t) { @Override public void onFailure(Throwable failure) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { UnicastProcessor proc = processor; if (proc != null) { @@ -142,7 +142,7 @@ public void onFailure(Throwable failure) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { UnicastProcessor proc = processor; if (proc != null) { @@ -164,7 +164,7 @@ public void request(long n) { public void cancel() { if (hasDownstreamCancelled.compareAndSet(false, true)) { if (count.decrementAndGet() == 0) { - upstream.get().cancel(); + getUpstreamSubscription().cancel(); } } } @@ -205,7 +205,7 @@ public void onItem(T t) { count.getAndIncrement(); proc = UnicastProcessor.create(supplier.get(), () -> { if (count.decrementAndGet() == 0) { - upstream.get().cancel(); + getUpstreamSubscription().cancel(); } }); processor = proc; @@ -234,7 +234,7 @@ public void onItem(T t) { @Override public void onFailure(Throwable failure) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { Processor proc = processor; if (proc != null) { @@ -249,7 +249,7 @@ public void onFailure(Throwable failure) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { Processor proc = processor; if (proc != null) { @@ -278,7 +278,7 @@ public void request(long n) { public void cancel() { if (hasDownstreamCancelled.compareAndSet(false, true)) { if (count.decrementAndGet() == 0) { - upstream.get().cancel(); + getUpstreamSubscription().cancel(); } } } @@ -357,7 +357,7 @@ public void onItem(T t) { @Override public void onFailure(Throwable f) { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { for (UnicastProcessor proc : processors) { proc.onError(f); @@ -372,7 +372,7 @@ public void onFailure(Throwable f) { @Override public void onCompletion() { - Subscription subscription = upstream.getAndSet(CANCELLED); + Subscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { for (UnicastProcessor proc : processors) { proc.onComplete(); @@ -475,7 +475,7 @@ public void cancel() { @Override public void run() { if (count.decrementAndGet() == 0) { - upstream.get().cancel(); + getUpstreamSubscription().cancel(); } } } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java index 0cb8b5407..1950735c9 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowBufferOp.java @@ -61,7 +61,7 @@ class OnOverflowBufferProcessor extends MultiOperatorProcessor { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); subscription.request(Long.MAX_VALUE); } else { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowDropItemsOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowDropItemsOp.java index 1ecb12ce3..f0dc4ba7c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowDropItemsOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowDropItemsOp.java @@ -50,9 +50,9 @@ class MultiOnOverflowDropItemsProcessor extends MultiOperatorProcessor { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); - upstream.get().request(Long.MAX_VALUE); + getUpstreamSubscription().request(Long.MAX_VALUE); } else { subscription.cancel(); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowKeepLastOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowKeepLastOp.java index cd97be17d..ca257e8c6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowKeepLastOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/overflow/MultiOnOverflowKeepLastOp.java @@ -50,7 +50,7 @@ class MultiOnOverflowLatestProcessor extends MultiOperatorProcessor { @Override public void onSubscribe(Subscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); subscription.request(Long.MAX_VALUE); } else {