diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java index 352c68be8..97d501d6a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java @@ -13,16 +13,21 @@ public abstract class MultiOperatorProcessor implements MultiSubscriber, Subscription { - // Cannot be final, the TCK checks it gets released. + /* + * We used to have an interpretation of the RS TCK that it had to be null on cancellation to release the subscriber. + * It's actually not necessary (and NPE-prone) since operators are instantiated per-subscription, so the *publisher* + * does not actually keep references on cancelled subscribers. + */ protected volatile MultiSubscriber downstream; + protected volatile Subscription upstream = null; - private volatile int hasDownstreamCancelled = 0; + private volatile int cancellationRequested = 0; private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater .newUpdater(MultiOperatorProcessor.class, Subscription.class, "upstream"); - private static final AtomicIntegerFieldUpdater DOWNSTREAM_CANCELLED_UPDATER = AtomicIntegerFieldUpdater - .newUpdater(MultiOperatorProcessor.class, "hasDownstreamCancelled"); + private static final AtomicIntegerFieldUpdater CANCELLATION_REQUESTED_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(MultiOperatorProcessor.class, "cancellationRequested"); public MultiOperatorProcessor(MultiSubscriber downstream) { this.downstream = ParameterValidation.nonNull(downstream, "downstream"); @@ -53,7 +58,7 @@ protected boolean isDone() { } protected boolean isCancelled() { - return hasDownstreamCancelled == 1; + return cancellationRequested == 1; } @Override @@ -107,25 +112,20 @@ public void request(long numberOfItems) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { cancelUpstream(); - cleanup(); } } - protected final boolean atomicallyFlipDownstreamHasCancelled() { - return DOWNSTREAM_CANCELLED_UPDATER.compareAndSet(this, 0, 1); + protected final boolean compareAndSwapDownstreamCancellationRequest() { + return CANCELLATION_REQUESTED_UPDATER.compareAndSet(this, 0, 1); } protected void cancelUpstream() { + this.cancellationRequested = 1; Subscription actual = UPSTREAM_UPDATER.getAndSet(this, CANCELLED); if (actual != null && actual != CANCELLED) { actual.cancel(); } } - - protected void cleanup() { - downstream = null; - } - } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java index d5623d7d0..a02be4257 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java @@ -162,7 +162,7 @@ public void request(long n) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { if (count.decrementAndGet() == 0) { getUpstreamSubscription().cancel(); } @@ -276,7 +276,7 @@ public void request(long n) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { if (count.decrementAndGet() == 0) { getUpstreamSubscription().cancel(); } @@ -467,7 +467,7 @@ public void request(long n) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { run(); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java index 51f011502..d6c10493b 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java @@ -1,18 +1,28 @@ package io.smallrye.mutiny; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.smallrye.mutiny.infrastructure.Infrastructure; -public class BugReproducersTest { +class BugReproducersTest { @RepeatedTest(100) - public void reproducer_689() { + void reproducer_689() { // Adapted from https://github.com/smallrye/smallrye-mutiny/issues/689 AtomicLong src = new AtomicLong(); @@ -26,4 +36,43 @@ public void reproducer_689() { sub.awaitCompletion(); assertThat(sub.getItems()).hasSize(9_999); } + + @Test + void reproducer_705() { + // Adapted from https://github.com/smallrye/smallrye-mutiny/issues/705 + // The issue was an over-interpretation of one of the RS TCK rule regarding releasing subscriber references. + AssertSubscriber> sub = AssertSubscriber.create(); + AtomicInteger counter = new AtomicInteger(); + AtomicReference threadFailure = new AtomicReference<>(); + + ExecutorService threadPool = Executors.newFixedThreadPool(4, new ThreadFactory() { + @Override + public Thread newThread(Runnable task) { + Thread thread = Executors.defaultThreadFactory().newThread(task); + thread.setUncaughtExceptionHandler((t, e) -> { + e.printStackTrace(); + threadFailure.set(e); + }); + return thread; + } + }); + + Multi.createFrom().range(0, 1000) + .emitOn(threadPool) + .group().intoLists().of(100) + .onItem().invoke(() -> { + if (counter.incrementAndGet() == 3) { + sub.cancel(); + } + }) + .runSubscriptionOn(threadPool) + .subscribe().withSubscriber(sub); + + sub.request(Long.MAX_VALUE); + await().atMost(5, TimeUnit.SECONDS).untilAtomic(counter, greaterThanOrEqualTo(3)); + + assertThat(threadFailure.get()).isNull(); + sub.assertNotTerminated(); + threadPool.shutdownNow(); + } }