From 408d0b7c76251806725c5f0d27e31c1d1f7aaa68 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Thu, 7 Oct 2021 18:52:33 +0200 Subject: [PATCH] No need to release downstream subscriber references We used to have an interpretation of the RS TCK that it had to be null on cancellation to release the subscriber. It's actually not necessary (and NPE-prone) since operators are instantiated per-subscription, so the *publisher* does not actually keep references on cancelled subscribers. Fixes #705 --- .../multi/MultiOperatorProcessor.java | 28 +++++----- .../mutiny/operators/multi/MultiWindowOp.java | 6 +-- .../smallrye/mutiny/BugReproducersTest.java | 53 ++++++++++++++++++- 3 files changed, 68 insertions(+), 19 deletions(-) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java index 352c68be8..97d501d6a 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiOperatorProcessor.java @@ -13,16 +13,21 @@ public abstract class MultiOperatorProcessor implements MultiSubscriber, Subscription { - // Cannot be final, the TCK checks it gets released. + /* + * We used to have an interpretation of the RS TCK that it had to be null on cancellation to release the subscriber. + * It's actually not necessary (and NPE-prone) since operators are instantiated per-subscription, so the *publisher* + * does not actually keep references on cancelled subscribers. + */ protected volatile MultiSubscriber downstream; + protected volatile Subscription upstream = null; - private volatile int hasDownstreamCancelled = 0; + private volatile int cancellationRequested = 0; private static final AtomicReferenceFieldUpdater UPSTREAM_UPDATER = AtomicReferenceFieldUpdater .newUpdater(MultiOperatorProcessor.class, Subscription.class, "upstream"); - private static final AtomicIntegerFieldUpdater DOWNSTREAM_CANCELLED_UPDATER = AtomicIntegerFieldUpdater - .newUpdater(MultiOperatorProcessor.class, "hasDownstreamCancelled"); + private static final AtomicIntegerFieldUpdater CANCELLATION_REQUESTED_UPDATER = AtomicIntegerFieldUpdater + .newUpdater(MultiOperatorProcessor.class, "cancellationRequested"); public MultiOperatorProcessor(MultiSubscriber downstream) { this.downstream = ParameterValidation.nonNull(downstream, "downstream"); @@ -53,7 +58,7 @@ protected boolean isDone() { } protected boolean isCancelled() { - return hasDownstreamCancelled == 1; + return cancellationRequested == 1; } @Override @@ -107,25 +112,20 @@ public void request(long numberOfItems) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { cancelUpstream(); - cleanup(); } } - protected final boolean atomicallyFlipDownstreamHasCancelled() { - return DOWNSTREAM_CANCELLED_UPDATER.compareAndSet(this, 0, 1); + protected final boolean compareAndSwapDownstreamCancellationRequest() { + return CANCELLATION_REQUESTED_UPDATER.compareAndSet(this, 0, 1); } protected void cancelUpstream() { + this.cancellationRequested = 1; Subscription actual = UPSTREAM_UPDATER.getAndSet(this, CANCELLED); if (actual != null && actual != CANCELLED) { actual.cancel(); } } - - protected void cleanup() { - downstream = null; - } - } diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java index d5623d7d0..a02be4257 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiWindowOp.java @@ -162,7 +162,7 @@ public void request(long n) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { if (count.decrementAndGet() == 0) { getUpstreamSubscription().cancel(); } @@ -276,7 +276,7 @@ public void request(long n) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { if (count.decrementAndGet() == 0) { getUpstreamSubscription().cancel(); } @@ -467,7 +467,7 @@ public void request(long n) { @Override public void cancel() { - if (atomicallyFlipDownstreamHasCancelled()) { + if (compareAndSwapDownstreamCancellationRequest()) { run(); } } diff --git a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java index 51f011502..d6c10493b 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java @@ -1,18 +1,28 @@ package io.smallrye.mutiny; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; import io.smallrye.mutiny.helpers.test.AssertSubscriber; import io.smallrye.mutiny.infrastructure.Infrastructure; -public class BugReproducersTest { +class BugReproducersTest { @RepeatedTest(100) - public void reproducer_689() { + void reproducer_689() { // Adapted from https://github.com/smallrye/smallrye-mutiny/issues/689 AtomicLong src = new AtomicLong(); @@ -26,4 +36,43 @@ public void reproducer_689() { sub.awaitCompletion(); assertThat(sub.getItems()).hasSize(9_999); } + + @Test + void reproducer_705() { + // Adapted from https://github.com/smallrye/smallrye-mutiny/issues/705 + // The issue was an over-interpretation of one of the RS TCK rule regarding releasing subscriber references. + AssertSubscriber> sub = AssertSubscriber.create(); + AtomicInteger counter = new AtomicInteger(); + AtomicReference threadFailure = new AtomicReference<>(); + + ExecutorService threadPool = Executors.newFixedThreadPool(4, new ThreadFactory() { + @Override + public Thread newThread(Runnable task) { + Thread thread = Executors.defaultThreadFactory().newThread(task); + thread.setUncaughtExceptionHandler((t, e) -> { + e.printStackTrace(); + threadFailure.set(e); + }); + return thread; + } + }); + + Multi.createFrom().range(0, 1000) + .emitOn(threadPool) + .group().intoLists().of(100) + .onItem().invoke(() -> { + if (counter.incrementAndGet() == 3) { + sub.cancel(); + } + }) + .runSubscriptionOn(threadPool) + .subscribe().withSubscriber(sub); + + sub.request(Long.MAX_VALUE); + await().atMost(5, TimeUnit.SECONDS).untilAtomic(counter, greaterThanOrEqualTo(3)); + + assertThat(threadFailure.get()).isNull(); + sub.assertNotTerminated(); + threadPool.shutdownNow(); + } }