Skip to content

Commit

Permalink
Atomic field updater refactoring of the upstream subscription in Mult…
Browse files Browse the repository at this point in the history
…iOperatorProcessor
  • Loading branch information
jponge committed Sep 20, 2021
1 parent bd343af commit c494649
Show file tree
Hide file tree
Showing 23 changed files with 104 additions and 76 deletions.
Expand Up @@ -84,7 +84,7 @@ static final class BufferExactProcessor<T> extends MultiOperatorProcessor<T, Lis

@Override
public void request(long n) {
Subscription subscription = upstream.get();
Subscription subscription = getUpstreamSubscription();
if (subscription != CANCELLED) {
subscription.request(Subscriptions.multiply(n, size));
}
Expand All @@ -110,7 +110,7 @@ public void onItem(T t) {

@Override
public void onCompletion() {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
List<T> buffer = current;
if (buffer != null && !buffer.isEmpty()) {
Expand Down Expand Up @@ -179,7 +179,7 @@ public void onItem(T item) {

@Override
public void onFailure(Throwable t) {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
current = null;
downstream.onFailure(t);
Expand All @@ -190,7 +190,7 @@ public void onFailure(Throwable t) {

@Override
public void onCompletion() {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
List<T> buffer = current;
current = null;
Expand Down Expand Up @@ -277,7 +277,7 @@ public void onItem(T item) {

@Override
public void onFailure(Throwable t) {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
downstream.onFailure(t);
} else {
Expand All @@ -287,7 +287,7 @@ public void onFailure(Throwable t) {

@Override
public void onCompletion() {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
long p = produced;
if (p != 0L) {
Expand Down
Expand Up @@ -245,7 +245,7 @@ public void onFailure(Throwable throwable) {

@Override
public void onSubscribe(Subscription subscription) {
if (upstream.compareAndSet(null, subscription)) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
doOnSubscribe();
downstream.onSubscribe(this);
} else {
Expand Down
Expand Up @@ -72,7 +72,7 @@ static class CollectorProcessor<T, A, R> extends MultiOperatorProcessor<T, R> {

@Override
public void onItem(T item) {
if (upstream.get() != CANCELLED) {
if (getUpstreamSubscription() != CANCELLED) {
try {
accumulator.accept(intermediate, item);
} catch (Throwable ex) {
Expand All @@ -83,7 +83,7 @@ public void onItem(T item) {

@Override
public void onCompletion() {
Subscription subscription = upstream.getAndSet(Subscriptions.CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(Subscriptions.CANCELLED);
if (subscription != Subscriptions.CANCELLED) {
R result;

Expand Down
Expand Up @@ -84,7 +84,7 @@ static final class MultiEmitOnProcessor<T> extends MultiOperatorProcessor<T, T>

@Override
public void onSubscribe(Subscription subscription) {
if (upstream.compareAndSet(null, subscription)) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
downstream.onSubscribe(this);
subscription.request(16);
} else {
Expand All @@ -102,7 +102,7 @@ public void onItem(T t) {
if (!queue.offer(t)) {
// queue full, this is a failure.
// onError will schedule.
Subscriptions.cancel(upstream); // cancel upstream
cancelUpstream(); // cancel upstream
onFailure(new BackPressureFailure("Queue is full, the upstream didn't enforce the requests"));
done = true;
} else {
Expand Down Expand Up @@ -145,7 +145,7 @@ public void cancel() {
return;
}
cancelled = true;
Subscriptions.cancel(upstream);
cancelUpstream();
if (wip.getAndIncrement() == 0) {
// nothing was currently dispatched, clearing the queue.
queue.clear();
Expand All @@ -161,10 +161,10 @@ void schedule() {
try {
executor.execute(this);
} catch (RejectedExecutionException rejected) {
Subscription s = upstream.getAndSet(CANCELLED);
if (s != CANCELLED) {
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
done = true;
Subscriptions.cancel(upstream);
cancelUpstream();
queue.clear();
downstream.onFailure(rejected);
super.cancel();
Expand Down
Expand Up @@ -77,7 +77,7 @@ public MultiGroupByProcessor(MultiSubscriber<? super GroupedMulti<K, V>> downstr

@Override
public void onSubscribe(Subscription subscription) {
if (upstream.compareAndSet(null, subscription)) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
// Propagate subscription to downstream.
downstream.onSubscribe(this);
subscription.request(128);
Expand Down Expand Up @@ -136,7 +136,7 @@ public void onItem(T item) {

@Override
public void onFailure(Throwable throwable) {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
done = true;
groups.values().forEach(group -> group.onFailure(throwable));
Expand All @@ -151,7 +151,7 @@ public void onFailure(Throwable throwable) {

@Override
public void onCompletion() {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
done = true;
groups.values().forEach(GroupedUnicast::onComplete);
Expand All @@ -175,7 +175,7 @@ public void cancel() {
// but running groups still require new values
if (cancelled.compareAndSet(false, true)) {
if (groupCount.decrementAndGet() == 0) {
Subscriptions.cancel(upstream);
cancelUpstream();
}
}
}
Expand All @@ -184,7 +184,7 @@ public void cancel(K key) {
Object mapKey = key != null ? key : NO_KEY;
groups.remove(mapKey);
if (groupCount.decrementAndGet() == 0) {
Subscriptions.cancel(upstream);
cancelUpstream();

if (wip.getAndIncrement() == 0) {
queue.clear();
Expand Down Expand Up @@ -410,7 +410,7 @@ void drain() {
if (r != Long.MAX_VALUE) {
requested.addAndGet(-e);
}
parent.upstream.get().request(e);
parent.getUpstreamSubscription().request(e);
}
}

Expand All @@ -431,7 +431,7 @@ boolean hasCompleted(boolean isDone, boolean isEmpty, long emitted) {
emitted++;
}
if (emitted != 0) {
parent.upstream.get().request(emitted);
parent.getUpstreamSubscription().request(emitted);
}
return true;
}
Expand Down
Expand Up @@ -25,7 +25,7 @@ public MultiIgnoreProcessor(MultiSubscriber<? super Void> downstream) {

@Override
public void onSubscribe(Subscription subscription) {
if (upstream.compareAndSet(null, subscription)) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
// Propagate subscription to downstream.
downstream.onSubscribe(this);
subscription.request(Long.MAX_VALUE);
Expand Down
Expand Up @@ -28,7 +28,7 @@ static final class MultiLastItemProcessor<T> extends MultiOperatorProcessor<T, T

@Override
public void onSubscribe(Subscription subscription) {
if (upstream.compareAndSet(null, subscription)) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
downstream.onSubscribe(this);
subscription.request(Long.MAX_VALUE);
} else {
Expand All @@ -54,7 +54,7 @@ public void onFailure(Throwable failure) {

@Override
public void onCompletion() {
Subscription subscription = upstream.getAndSet(CANCELLED);
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
T item = last;
MultiSubscriber<? super T> subscriber = downstream;
Expand Down
@@ -1,6 +1,7 @@
package io.smallrye.mutiny.operators.multi;

import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;

import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -37,7 +38,7 @@ public MultiOnFailureInvokeProcessor(MultiSubscriber<? super T> downstream) {

@Override
public void onFailure(Throwable failure) {
Subscription up = upstream.getAndSet(Subscriptions.CANCELLED);
Subscription up = getAndSetUpstreamSubscription(CANCELLED);
MultiSubscriber<? super T> subscriber = downstream;
if (up != Subscriptions.CANCELLED) {
try {
Expand Down
Expand Up @@ -30,7 +30,7 @@ public MultiOnItemInvokeProcessor(MultiSubscriber<? super T> downstream) {

@Override
public void onItem(T item) {
if (upstream.get() != Subscriptions.CANCELLED) {
if (getUpstreamSubscription() != Subscriptions.CANCELLED) {
MultiSubscriber<? super T> subscriber = this.downstream;
try {
callback.accept(item);
Expand Down
@@ -1,5 +1,7 @@
package io.smallrye.mutiny.operators.multi;

import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;

import java.util.Objects;
import java.util.function.Function;

Expand Down Expand Up @@ -44,19 +46,19 @@ private final class OnSubscribeSubscriber extends MultiOperatorProcessor<T, T> {

@Override
public void onSubscribe(Subscription s) {
if (upstream.compareAndSet(null, s)) {
if (compareAndSetUpstreamSubscription(null, s)) {
try {
Uni<?> uni = Objects.requireNonNull(onSubscribe.apply(s), "The produced Uni must not be `null`");
uni
.subscribe().with(
ignored -> downstream.onSubscribe(this),
failure -> {
Subscriptions.fail(downstream, failure);
upstream.getAndSet(Subscriptions.CANCELLED).cancel();
getAndSetUpstreamSubscription(CANCELLED).cancel();
});
} catch (Throwable e) {
Subscriptions.fail(downstream, e);
upstream.getAndSet(Subscriptions.CANCELLED).cancel();
getAndSetUpstreamSubscription(CANCELLED).cancel();
}
} else {
s.cancel();
Expand Down
@@ -1,5 +1,7 @@
package io.smallrye.mutiny.operators.multi;

import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;

import java.util.Objects;
import java.util.function.Consumer;

Expand Down Expand Up @@ -42,12 +44,12 @@ private final class OnSubscribeSubscriber extends MultiOperatorProcessor<T, T> {

@Override
public void onSubscribe(Subscription s) {
if (upstream.compareAndSet(null, s)) {
if (compareAndSetUpstreamSubscription(null, s)) {
try {
onSubscribe.accept(s);
} catch (Throwable e) {
Subscriptions.fail(downstream, e);
upstream.getAndSet(Subscriptions.CANCELLED).cancel();
getAndSetUpstreamSubscription(CANCELLED).cancel();
return;
}
downstream.onSubscribe(this);
Expand Down

0 comments on commit c494649

Please sign in to comment.