Skip to content

Commit

Permalink
Merge pull request #712 from smallrye/fix/705
Browse files Browse the repository at this point in the history
No need to release downstream subscriber references
  • Loading branch information
jponge committed Oct 7, 2021
2 parents 45615fd + 408d0b7 commit 8aa8f05
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 19 deletions.
Expand Up @@ -13,16 +13,21 @@

public abstract class MultiOperatorProcessor<I, O> implements MultiSubscriber<I>, Subscription {

// Cannot be final, the TCK checks it gets released.
/*
* We used to have an interpretation of the RS TCK that it had to be null on cancellation to release the subscriber.
* It's actually not necessary (and NPE-prone) since operators are instantiated per-subscription, so the *publisher*
* does not actually keep references on cancelled subscribers.
*/
protected volatile MultiSubscriber<? super O> downstream;

protected volatile Subscription upstream = null;
private volatile int hasDownstreamCancelled = 0;
private volatile int cancellationRequested = 0;

private static final AtomicReferenceFieldUpdater<MultiOperatorProcessor, Subscription> UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(MultiOperatorProcessor.class, Subscription.class, "upstream");

private static final AtomicIntegerFieldUpdater<MultiOperatorProcessor> DOWNSTREAM_CANCELLED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MultiOperatorProcessor.class, "hasDownstreamCancelled");
private static final AtomicIntegerFieldUpdater<MultiOperatorProcessor> CANCELLATION_REQUESTED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MultiOperatorProcessor.class, "cancellationRequested");

public MultiOperatorProcessor(MultiSubscriber<? super O> downstream) {
this.downstream = ParameterValidation.nonNull(downstream, "downstream");
Expand Down Expand Up @@ -53,7 +58,7 @@ protected boolean isDone() {
}

protected boolean isCancelled() {
return hasDownstreamCancelled == 1;
return cancellationRequested == 1;
}

@Override
Expand Down Expand Up @@ -107,25 +112,20 @@ public void request(long numberOfItems) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
cancelUpstream();
cleanup();
}
}

protected final boolean atomicallyFlipDownstreamHasCancelled() {
return DOWNSTREAM_CANCELLED_UPDATER.compareAndSet(this, 0, 1);
protected final boolean compareAndSwapDownstreamCancellationRequest() {
return CANCELLATION_REQUESTED_UPDATER.compareAndSet(this, 0, 1);
}

protected void cancelUpstream() {
this.cancellationRequested = 1;
Subscription actual = UPSTREAM_UPDATER.getAndSet(this, CANCELLED);
if (actual != null && actual != CANCELLED) {
actual.cancel();
}
}

protected void cleanup() {
downstream = null;
}

}
Expand Up @@ -162,7 +162,7 @@ public void request(long n) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
if (count.decrementAndGet() == 0) {
getUpstreamSubscription().cancel();
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void request(long n) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
if (count.decrementAndGet() == 0) {
getUpstreamSubscription().cancel();
}
Expand Down Expand Up @@ -467,7 +467,7 @@ public void request(long n) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
run();
}
}
Expand Down
@@ -1,18 +1,28 @@
package io.smallrye.mutiny;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class BugReproducersTest {
class BugReproducersTest {

@RepeatedTest(100)
public void reproducer_689() {
void reproducer_689() {
// Adapted from https://github.com/smallrye/smallrye-mutiny/issues/689
AtomicLong src = new AtomicLong();

Expand All @@ -26,4 +36,43 @@ public void reproducer_689() {
sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(9_999);
}

@Test
void reproducer_705() {
// Adapted from https://github.com/smallrye/smallrye-mutiny/issues/705
// The issue was an over-interpretation of one of the RS TCK rule regarding releasing subscriber references.
AssertSubscriber<List<Integer>> sub = AssertSubscriber.create();
AtomicInteger counter = new AtomicInteger();
AtomicReference<Throwable> threadFailure = new AtomicReference<>();

ExecutorService threadPool = Executors.newFixedThreadPool(4, new ThreadFactory() {
@Override
public Thread newThread(Runnable task) {
Thread thread = Executors.defaultThreadFactory().newThread(task);
thread.setUncaughtExceptionHandler((t, e) -> {
e.printStackTrace();
threadFailure.set(e);
});
return thread;
}
});

Multi.createFrom().range(0, 1000)
.emitOn(threadPool)
.group().intoLists().of(100)
.onItem().invoke(() -> {
if (counter.incrementAndGet() == 3) {
sub.cancel();
}
})
.runSubscriptionOn(threadPool)
.subscribe().withSubscriber(sub);

sub.request(Long.MAX_VALUE);
await().atMost(5, TimeUnit.SECONDS).untilAtomic(counter, greaterThanOrEqualTo(3));

assertThat(threadFailure.get()).isNull();
sub.assertNotTerminated();
threadPool.shutdownNow();
}
}

0 comments on commit 8aa8f05

Please sign in to comment.