Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

No need to release downstream subscriber references #712

Merged
merged 1 commit into from Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -13,16 +13,21 @@

public abstract class MultiOperatorProcessor<I, O> implements MultiSubscriber<I>, Subscription {

// Cannot be final, the TCK checks it gets released.
/*
* We used to have an interpretation of the RS TCK that it had to be null on cancellation to release the subscriber.
* It's actually not necessary (and NPE-prone) since operators are instantiated per-subscription, so the *publisher*
* does not actually keep references on cancelled subscribers.
*/
protected volatile MultiSubscriber<? super O> downstream;

protected volatile Subscription upstream = null;
private volatile int hasDownstreamCancelled = 0;
private volatile int cancellationRequested = 0;

private static final AtomicReferenceFieldUpdater<MultiOperatorProcessor, Subscription> UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(MultiOperatorProcessor.class, Subscription.class, "upstream");

private static final AtomicIntegerFieldUpdater<MultiOperatorProcessor> DOWNSTREAM_CANCELLED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MultiOperatorProcessor.class, "hasDownstreamCancelled");
private static final AtomicIntegerFieldUpdater<MultiOperatorProcessor> CANCELLATION_REQUESTED_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(MultiOperatorProcessor.class, "cancellationRequested");

public MultiOperatorProcessor(MultiSubscriber<? super O> downstream) {
this.downstream = ParameterValidation.nonNull(downstream, "downstream");
Expand Down Expand Up @@ -53,7 +58,7 @@ protected boolean isDone() {
}

protected boolean isCancelled() {
return hasDownstreamCancelled == 1;
return cancellationRequested == 1;
}

@Override
Expand Down Expand Up @@ -107,25 +112,20 @@ public void request(long numberOfItems) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
cancelUpstream();
cleanup();
}
}

protected final boolean atomicallyFlipDownstreamHasCancelled() {
return DOWNSTREAM_CANCELLED_UPDATER.compareAndSet(this, 0, 1);
protected final boolean compareAndSwapDownstreamCancellationRequest() {
return CANCELLATION_REQUESTED_UPDATER.compareAndSet(this, 0, 1);
}

protected void cancelUpstream() {
this.cancellationRequested = 1;
Subscription actual = UPSTREAM_UPDATER.getAndSet(this, CANCELLED);
if (actual != null && actual != CANCELLED) {
actual.cancel();
}
}

protected void cleanup() {
downstream = null;
}

}
Expand Up @@ -162,7 +162,7 @@ public void request(long n) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
if (count.decrementAndGet() == 0) {
getUpstreamSubscription().cancel();
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void request(long n) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
if (count.decrementAndGet() == 0) {
getUpstreamSubscription().cancel();
}
Expand Down Expand Up @@ -467,7 +467,7 @@ public void request(long n) {

@Override
public void cancel() {
if (atomicallyFlipDownstreamHasCancelled()) {
if (compareAndSwapDownstreamCancellationRequest()) {
run();
}
}
Expand Down
@@ -1,18 +1,28 @@
package io.smallrye.mutiny;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class BugReproducersTest {
class BugReproducersTest {

@RepeatedTest(100)
public void reproducer_689() {
void reproducer_689() {
// Adapted from https://github.com/smallrye/smallrye-mutiny/issues/689
AtomicLong src = new AtomicLong();

Expand All @@ -26,4 +36,43 @@ public void reproducer_689() {
sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(9_999);
}

@Test
void reproducer_705() {
// Adapted from https://github.com/smallrye/smallrye-mutiny/issues/705
// The issue was an over-interpretation of one of the RS TCK rule regarding releasing subscriber references.
AssertSubscriber<List<Integer>> sub = AssertSubscriber.create();
AtomicInteger counter = new AtomicInteger();
AtomicReference<Throwable> threadFailure = new AtomicReference<>();

ExecutorService threadPool = Executors.newFixedThreadPool(4, new ThreadFactory() {
@Override
public Thread newThread(Runnable task) {
Thread thread = Executors.defaultThreadFactory().newThread(task);
thread.setUncaughtExceptionHandler((t, e) -> {
e.printStackTrace();
threadFailure.set(e);
});
return thread;
}
});

Multi.createFrom().range(0, 1000)
.emitOn(threadPool)
.group().intoLists().of(100)
.onItem().invoke(() -> {
if (counter.incrementAndGet() == 3) {
sub.cancel();
}
})
.runSubscriptionOn(threadPool)
.subscribe().withSubscriber(sub);

sub.request(Long.MAX_VALUE);
await().atMost(5, TimeUnit.SECONDS).untilAtomic(counter, greaterThanOrEqualTo(3));

assertThat(threadFailure.get()).isNull();
sub.assertNotTerminated();
threadPool.shutdownNow();
}
}