From 2c2028168caeb1d60d8fadfd6d6f52c52447531d Mon Sep 17 00:00:00 2001 From: Sanne Grinovero Date: Tue, 24 Aug 2021 15:55:50 +0100 Subject: [PATCH 1/4] Avoid allocating excessive amounts of AtomicReference instances --- .../operators/uni/UniOperatorProcessor.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java index c707c2131..bca436724 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java @@ -3,7 +3,7 @@ import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED; import static io.smallrye.mutiny.helpers.EmptyUniSubscription.DONE; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import io.smallrye.mutiny.helpers.ParameterValidation; import io.smallrye.mutiny.infrastructure.Infrastructure; @@ -13,7 +13,10 @@ public abstract class UniOperatorProcessor implements UniSubscriber, UniSubscription { protected final UniSubscriber downstream; - protected final AtomicReference upstream = new AtomicReference<>(); + + private static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(UniOperatorProcessor.class, UniSubscription.class, "upstream"); + + protected volatile UniSubscription upstream; public UniOperatorProcessor(UniSubscriber downstream) { this.downstream = ParameterValidation.nonNull(downstream, "downstream"); @@ -21,7 +24,7 @@ public UniOperatorProcessor(UniSubscriber downstream) { @Override public void onSubscribe(UniSubscription subscription) { - if (upstream.compareAndSet(null, subscription)) { + if (updater.compareAndSet(this, null, subscription)) { downstream.onSubscribe(this); } else { subscription.cancel(); @@ -31,7 +34,7 @@ public void onSubscribe(UniSubscription subscription) { @Override @SuppressWarnings("unchecked") public void onItem(I item) { - UniSubscription subscription = upstream.getAndSet(CANCELLED); + UniSubscription subscription = updater.getAndSet(this, CANCELLED); if (subscription != CANCELLED) { downstream.onItem((O) item); } @@ -39,7 +42,7 @@ public void onItem(I item) { @Override public void onFailure(Throwable failure) { - UniSubscription subscription = upstream.getAndSet(CANCELLED); + UniSubscription subscription = updater.getAndSet(this, CANCELLED); if (subscription != CANCELLED) { downstream.onFailure(failure); } else { @@ -49,13 +52,13 @@ public void onFailure(Throwable failure) { @Override public void cancel() { - UniSubscription subscription = upstream.getAndSet(CANCELLED); + UniSubscription subscription = updater.getAndSet(this, CANCELLED); if (subscription != null && subscription != CANCELLED && subscription != DONE) { subscription.cancel(); } } public boolean isCancelled() { - return upstream.get() == CANCELLED; + return upstream == CANCELLED; } } From f77575bfc3330c0e1a99792f1aa5f9dbf6cf853f Mon Sep 17 00:00:00 2001 From: Sanne Grinovero Date: Tue, 24 Aug 2021 16:32:31 +0100 Subject: [PATCH 2/4] Improve abstraction over internal state of UniOperatorProcessor --- .../mutiny/operators/uni/UniFailOnTimeout.java | 6 +++--- .../operators/uni/UniOnCancellation.java | 2 +- .../operators/uni/UniOnCancellationCall.java | 2 +- .../operators/uni/UniOnFailureFlatMap.java | 2 +- .../uni/UniOnItemOrFailureFlatMap.java | 2 +- .../operators/uni/UniOnItemTransformToUni.java | 2 +- .../operators/uni/UniOperatorProcessor.java | 18 ++++++++++++++++-- .../mutiny/operators/uni/UniRetryAtMost.java | 4 ++-- 8 files changed, 26 insertions(+), 12 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniFailOnTimeout.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniFailOnTimeout.java index 36344ea65..a70381020 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniFailOnTimeout.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniFailOnTimeout.java @@ -50,7 +50,7 @@ public void onSubscribe(UniSubscription subscription) { timeoutFuture = executor.schedule(this::doTimeout, timeout.toMillis(), TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { // Executor out of service. - upstream.set(CANCELLED); + getAndSetUpstreamSubscription(CANCELLED); subscription.cancel(); downstream.onSubscribe(EmptyUniSubscription.DONE); downstream.onFailure(e); @@ -61,7 +61,7 @@ public void onSubscribe(UniSubscription subscription) { @Override public void onItem(I item) { - UniSubscription sub = upstream.getAndSet(CANCELLED); + UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED); if (sub != CANCELLED) { if (timeoutFuture != null) { timeoutFuture.cancel(false); @@ -72,7 +72,7 @@ public void onItem(I item) { @Override public void onFailure(Throwable failure) { - UniSubscription sub = upstream.getAndSet(CANCELLED); + UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED); if (sub != CANCELLED) { if (timeoutFuture != null) { timeoutFuture.cancel(false); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java index 4fa4b0cf8..6be51554b 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java @@ -54,7 +54,7 @@ public void onFailure(Throwable failure) { @Override public void cancel() { if (state.compareAndSet(State.INIT, State.CANCELLED)) { - UniSubscription sub = upstream.getAndSet(CANCELLED); + UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED); callback.run(); if (sub != null) { sub.cancel(); diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java index a983cc7a2..9f417eeaf 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java @@ -58,7 +58,7 @@ public void onFailure(Throwable failure) { @Override public void cancel() { if (state.compareAndSet(State.INIT, State.CANCELLED)) { - UniSubscription sub = upstream.getAndSet(CANCELLED); + UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED); execute().subscribe().with( ignoredItem -> { if (sub != null) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnFailureFlatMap.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnFailureFlatMap.java index c249290e2..45d5e48c6 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnFailureFlatMap.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnFailureFlatMap.java @@ -41,7 +41,7 @@ public UniOnFailureFlatMapProcessor(UniSubscriber downstream) { @Override public void onSubscribe(UniSubscription subscription) { - if (upstream.get() == null) { + if (getCurrentUpstreamSubscription() == null) { super.onSubscribe(subscription); } else if (innerSubscription == null) { this.innerSubscription = subscription; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemOrFailureFlatMap.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemOrFailureFlatMap.java index 30ed5eaef..55759bedd 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemOrFailureFlatMap.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemOrFailureFlatMap.java @@ -37,7 +37,7 @@ public UniOnItemOrFailureFlatMapProcessor(UniSubscriber downstream) { @Override public void onSubscribe(UniSubscription subscription) { - if (upstream.get() == null) { + if (getCurrentUpstreamSubscription() == null) { super.onSubscribe(subscription); } else if (innerSubscription == null) { this.innerSubscription = subscription; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemTransformToUni.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemTransformToUni.java index 8c9881be3..dfaa66e48 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemTransformToUni.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnItemTransformToUni.java @@ -37,7 +37,7 @@ public UniOnItemTransformToUniProcessor(UniSubscriber downstream) { @Override public void onSubscribe(UniSubscription subscription) { - if (upstream.get() == null) { + if (getCurrentUpstreamSubscription() == null) { super.onSubscribe(subscription); } else if (innerSubscription == null) { this.innerSubscription = subscription; diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java index bca436724..4ac8bfd86 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java @@ -14,9 +14,10 @@ public abstract class UniOperatorProcessor implements UniSubscriber, Un protected final UniSubscriber downstream; - private static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater.newUpdater(UniOperatorProcessor.class, UniSubscription.class, "upstream"); + private static final AtomicReferenceFieldUpdater updater = AtomicReferenceFieldUpdater + .newUpdater(UniOperatorProcessor.class, UniSubscription.class, "upstream"); - protected volatile UniSubscription upstream; + private volatile UniSubscription upstream; public UniOperatorProcessor(UniSubscriber downstream) { this.downstream = ParameterValidation.nonNull(downstream, "downstream"); @@ -61,4 +62,17 @@ public void cancel() { public boolean isCancelled() { return upstream == CANCELLED; } + + protected final UniSubscription getCurrentUpstreamSubscription() { + return upstream; + } + + protected final UniSubscription getAndSetUpstreamSubscription(UniSubscription newValue) { + return updater.getAndSet(this, newValue); + } + + protected final boolean compareAndSetUpstreamSubscription(UniSubscription expect, UniSubscription update) { + return updater.compareAndSet(this, expect, update); + } + } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java index 946cfbb41..473f4b0af 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java @@ -39,7 +39,7 @@ public UniRetryAtMostProcessor(UniSubscriber downstream) { @Override public void onSubscribe(UniSubscription subscription) { int count = counter.incrementAndGet(); - if (upstream.compareAndSet(null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { if (count == 1) { downstream.onSubscribe(this); } @@ -61,7 +61,7 @@ public void onFailure(Throwable failure) { downstream.onFailure(failure); return; } - UniSubscription previousSubscription = upstream.getAndSet(null); + UniSubscription previousSubscription = getAndSetUpstreamSubscription(null); if (previousSubscription != null) { previousSubscription.cancel(); } From 4e3f5fa4f9bc73fe0a17c5fe4d174f745ef0aa66 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 30 Aug 2021 16:13:15 +0200 Subject: [PATCH 3/4] Use further atomic field updaters in selected places (Uni) --- .../operators/uni/UniOnCancellation.java | 24 ++++++++++++------- .../operators/uni/UniOnCancellationCall.java | 23 +++++++++++------- .../mutiny/operators/uni/UniRetryAtMost.java | 23 +++++++++++------- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java index 6be51554b..73b6d43f4 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellation.java @@ -2,7 +2,7 @@ import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.operators.AbstractUni; @@ -11,6 +11,7 @@ import io.smallrye.mutiny.subscription.UniSubscription; public class UniOnCancellation extends UniOperator { + private final Runnable callback; public UniOnCancellation(Uni upstream, Runnable callback) { @@ -20,7 +21,7 @@ public UniOnCancellation(Uni upstream, Runnable callback) { @Override public void subscribe(UniSubscriber subscriber) { - AbstractUni.subscribe(upstream(), new UniOnCancellationProcessor(subscriber)); + AbstractUni.subscribe(upstream(), new UniOnCancellationProcessor(callback, subscriber)); } private enum State { @@ -29,31 +30,36 @@ private enum State { CANCELLED } - private class UniOnCancellationProcessor extends UniOperatorProcessor { + private static class UniOnCancellationProcessor extends UniOperatorProcessor { + + private final Runnable callback; - public UniOnCancellationProcessor(UniSubscriber downstream) { + private volatile State state = State.INIT; + private static final AtomicReferenceFieldUpdater stateUpdater = AtomicReferenceFieldUpdater + .newUpdater(UniOnCancellationProcessor.class, State.class, "state"); + + public UniOnCancellationProcessor(Runnable callback, UniSubscriber downstream) { super(downstream); + this.callback = callback; } - private final AtomicReference state = new AtomicReference<>(State.INIT); - @Override public void onItem(T item) { - if (state.compareAndSet(State.INIT, State.DONE)) { + if (stateUpdater.compareAndSet(this, State.INIT, State.DONE)) { downstream.onItem(item); } } @Override public void onFailure(Throwable failure) { - if (state.compareAndSet(State.INIT, State.DONE)) { + if (stateUpdater.compareAndSet(this, State.INIT, State.DONE)) { downstream.onFailure(failure); } } @Override public void cancel() { - if (state.compareAndSet(State.INIT, State.CANCELLED)) { + if (stateUpdater.compareAndSet(this, State.INIT, State.CANCELLED)) { UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED); callback.run(); if (sub != null) { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java index 9f417eeaf..b2fa35ea5 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOnCancellationCall.java @@ -3,7 +3,7 @@ import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED; import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.function.Supplier; import io.smallrye.mutiny.Uni; @@ -24,7 +24,7 @@ public UniOnCancellationCall(Uni upstream, Supplier> supplie @Override public void subscribe(UniSubscriber subscriber) { - AbstractUni.subscribe(upstream(), new UniOnCancellationCallProcessor(subscriber)); + AbstractUni.subscribe(upstream(), new UniOnCancellationCallProcessor(supplier, subscriber)); } private enum State { @@ -33,31 +33,36 @@ private enum State { CANCELLED } - private class UniOnCancellationCallProcessor extends UniOperatorProcessor { + private static class UniOnCancellationCallProcessor extends UniOperatorProcessor { - public UniOnCancellationCallProcessor(UniSubscriber downstream) { + private final Supplier> supplier; + + private volatile State state = State.INIT; + private static final AtomicReferenceFieldUpdater stateUpdater = AtomicReferenceFieldUpdater + .newUpdater(UniOnCancellationCallProcessor.class, State.class, "state"); + + public UniOnCancellationCallProcessor(Supplier> supplier, UniSubscriber downstream) { super(downstream); + this.supplier = supplier; } - private final AtomicReference state = new AtomicReference<>(State.INIT); - @Override public void onItem(I item) { - if (state.compareAndSet(State.INIT, State.DONE)) { + if (stateUpdater.compareAndSet(this, State.INIT, State.DONE)) { downstream.onItem(item); } } @Override public void onFailure(Throwable failure) { - if (state.compareAndSet(State.INIT, State.DONE)) { + if (stateUpdater.compareAndSet(this, State.INIT, State.DONE)) { downstream.onFailure(failure); } } @Override public void cancel() { - if (state.compareAndSet(State.INIT, State.CANCELLED)) { + if (stateUpdater.compareAndSet(this, State.INIT, State.CANCELLED)) { UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED); execute().subscribe().with( ignoredItem -> { diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java index 473f4b0af..cf3a567fb 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniRetryAtMost.java @@ -3,7 +3,7 @@ import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull; import static io.smallrye.mutiny.helpers.ParameterValidation.positive; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Predicate; import io.smallrye.mutiny.Uni; @@ -25,20 +25,25 @@ public UniRetryAtMost(Uni upstream, Predicate predicate, l @Override public void subscribe(UniSubscriber subscriber) { - AbstractUni.subscribe(upstream(), new UniRetryAtMostProcessor(subscriber)); + AbstractUni.subscribe(upstream(), new UniRetryAtMostProcessor(this, subscriber)); } - private class UniRetryAtMostProcessor extends UniOperatorProcessor { + private static class UniRetryAtMostProcessor extends UniOperatorProcessor { - private final AtomicInteger counter = new AtomicInteger(0); + private final UniRetryAtMost uniRetryAtMost; - public UniRetryAtMostProcessor(UniSubscriber downstream) { + private volatile int counter = 0; + private static final AtomicIntegerFieldUpdater counterUpdater = AtomicIntegerFieldUpdater + .newUpdater(UniRetryAtMostProcessor.class, "counter"); + + public UniRetryAtMostProcessor(UniRetryAtMost uniRetryAtMost, UniSubscriber downstream) { super(downstream); + this.uniRetryAtMost = uniRetryAtMost; } @Override public void onSubscribe(UniSubscription subscription) { - int count = counter.incrementAndGet(); + int count = counterUpdater.incrementAndGet(this); if (compareAndSetUpstreamSubscription(null, subscription)) { if (count == 1) { downstream.onSubscribe(this); @@ -57,7 +62,7 @@ public void onFailure(Throwable failure) { if (!testPredicate(failure)) { return; } - if (counter.get() > maxAttempts) { + if (counter > uniRetryAtMost.maxAttempts) { downstream.onFailure(failure); return; } @@ -65,13 +70,13 @@ public void onFailure(Throwable failure) { if (previousSubscription != null) { previousSubscription.cancel(); } - AbstractUni.subscribe(upstream(), this); + AbstractUni.subscribe(uniRetryAtMost.upstream(), this); } private boolean testPredicate(Throwable failure) { boolean passes; try { - passes = predicate.test(failure); + passes = uniRetryAtMost.predicate.test(failure); } catch (Throwable e) { downstream.onFailure(e); return false; From e06ce7d22325ab64c6340770dbeae7aa95fed263 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 31 Aug 2021 12:12:09 +0200 Subject: [PATCH 4/4] Use CaS methods in UniOperatorProcessor --- .../mutiny/operators/uni/UniOperatorProcessor.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java index 4ac8bfd86..d44332921 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/uni/UniOperatorProcessor.java @@ -25,7 +25,7 @@ public UniOperatorProcessor(UniSubscriber downstream) { @Override public void onSubscribe(UniSubscription subscription) { - if (updater.compareAndSet(this, null, subscription)) { + if (compareAndSetUpstreamSubscription(null, subscription)) { downstream.onSubscribe(this); } else { subscription.cancel(); @@ -35,7 +35,7 @@ public void onSubscribe(UniSubscription subscription) { @Override @SuppressWarnings("unchecked") public void onItem(I item) { - UniSubscription subscription = updater.getAndSet(this, CANCELLED); + UniSubscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { downstream.onItem((O) item); } @@ -43,7 +43,7 @@ public void onItem(I item) { @Override public void onFailure(Throwable failure) { - UniSubscription subscription = updater.getAndSet(this, CANCELLED); + UniSubscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != CANCELLED) { downstream.onFailure(failure); } else { @@ -53,7 +53,7 @@ public void onFailure(Throwable failure) { @Override public void cancel() { - UniSubscription subscription = updater.getAndSet(this, CANCELLED); + UniSubscription subscription = getAndSetUpstreamSubscription(CANCELLED); if (subscription != null && subscription != CANCELLED && subscription != DONE) { subscription.cancel(); }