Skip to content

Commit

Permalink
Protect UniToMultiPublisher from re-subscriptions when concurrent req…
Browse files Browse the repository at this point in the history
…uests happen

Fixes #689
  • Loading branch information
jponge committed Sep 21, 2021
1 parent 5a631a2 commit d4923a7
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 2 deletions.
Expand Up @@ -2,6 +2,7 @@

import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -32,6 +33,7 @@ private static class UniToMultiSubscription<T> implements UniSubscription, Subsc
private final Subscriber<? super T> downstream;

private final AtomicReference<UniSubscription> upstream = new AtomicReference<>();
private final AtomicBoolean uniSubscriptionRequested = new AtomicBoolean(false);

private UniToMultiSubscription(Uni<T> uni, Subscriber<? super T> downstream) {
this.uni = uni;
Expand All @@ -55,12 +57,15 @@ public void request(long n) {
if (upstream.get() == CANCELLED) {
return;
}
AbstractUni.subscribe(uni, this);
if (uniSubscriptionRequested.compareAndSet(false, true)) {
AbstractUni.subscribe(uni, this);
}
}

@Override
public void onSubscribe(UniSubscription subscription) {
if (!upstream.compareAndSet(null, subscription)) {
subscription.cancel();
downstream.onError(new IllegalStateException(
"Invalid subscription state - already have a subscription for upstream"));
}
Expand Down
Expand Up @@ -149,12 +149,12 @@ public final void request(long n) {
unbounded = true;
}
}
Subscription actual = currentUpstream.get();

if (wip.decrementAndGet() != 0) {
drainLoop();
}

Subscription actual = currentUpstream.get();
if (actual != null) {
actual.request(n);
}
Expand Down
@@ -0,0 +1,29 @@
package io.smallrye.mutiny;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.RepeatedTest;

import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import io.smallrye.mutiny.infrastructure.Infrastructure;

public class BugReproducersTest {

@RepeatedTest(100)
public void reproducer_689() {
// Adapted from https://github.com/smallrye/smallrye-mutiny/issues/689
AtomicLong src = new AtomicLong();

AssertSubscriber<Long> sub = Multi.createBy().repeating()
.supplier(src::incrementAndGet)
.until(l -> l.equals(10_000L))
.flatMap(l -> Multi.createFrom().item(l * 2))
.emitOn(Infrastructure.getDefaultExecutor())
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

sub.awaitCompletion();
assertThat(sub.getItems()).hasSize(9_999);
}
}
Expand Up @@ -4,10 +4,12 @@
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
Expand All @@ -16,6 +18,7 @@
import io.reactivex.subscribers.TestSubscriber;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;

public class UniToPublisherTest {

Expand Down Expand Up @@ -220,4 +223,31 @@ public void testUniOfVoid() {
Multi<Void> publisher = uni.toMulti();
assertThat(publisher.collect().asList().await().indefinitely()).isEmpty();
}

@RepeatedTest(1000)
public void multipleConcurrentRequests() throws InterruptedException {
final int n = 8;
CountDownLatch start = new CountDownLatch(n);

Multi<Integer> multi = Uni.createFrom()
.completionStage(() -> CompletableFuture.supplyAsync(() -> 63))
.toMulti();
AssertSubscriber<Integer> subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create());

for (int i = 0; i < n; i++) {
ForkJoinPool.commonPool().execute(() -> {
try {
start.await();
subscriber.request(10L);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
});
start.countDown();
}

subscriber.awaitCompletion();
assertThat(subscriber.getItems()).hasSize(1).containsExactly(63);
}
}

0 comments on commit d4923a7

Please sign in to comment.