diff --git a/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java b/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java index 3735cb812..7ce69738c 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java +++ b/implementation/src/main/java/io/smallrye/mutiny/converters/uni/UniToMultiPublisher.java @@ -2,6 +2,7 @@ import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.Publisher; @@ -32,6 +33,7 @@ private static class UniToMultiSubscription implements UniSubscription, Subsc private final Subscriber downstream; private final AtomicReference upstream = new AtomicReference<>(); + private final AtomicBoolean uniSubscriptionRequested = new AtomicBoolean(false); private UniToMultiSubscription(Uni uni, Subscriber downstream) { this.uni = uni; @@ -55,12 +57,15 @@ public void request(long n) { if (upstream.get() == CANCELLED) { return; } - AbstractUni.subscribe(uni, this); + if (uniSubscriptionRequested.compareAndSet(false, true)) { + AbstractUni.subscribe(uni, this); + } } @Override public void onSubscribe(UniSubscription subscription) { if (!upstream.compareAndSet(null, subscription)) { + subscription.cancel(); downstream.onError(new IllegalStateException( "Invalid subscription state - already have a subscription for upstream")); } diff --git a/implementation/src/main/java/io/smallrye/mutiny/subscription/SwitchableSubscriptionSubscriber.java b/implementation/src/main/java/io/smallrye/mutiny/subscription/SwitchableSubscriptionSubscriber.java index edb039dd9..04d047a5f 100644 --- a/implementation/src/main/java/io/smallrye/mutiny/subscription/SwitchableSubscriptionSubscriber.java +++ b/implementation/src/main/java/io/smallrye/mutiny/subscription/SwitchableSubscriptionSubscriber.java @@ -149,12 +149,12 @@ public final void request(long n) { unbounded = true; } } - Subscription actual = currentUpstream.get(); if (wip.decrementAndGet() != 0) { drainLoop(); } + Subscription actual = currentUpstream.get(); if (actual != null) { actual.request(n); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java new file mode 100644 index 000000000..51f011502 --- /dev/null +++ b/implementation/src/test/java/io/smallrye/mutiny/BugReproducersTest.java @@ -0,0 +1,29 @@ +package io.smallrye.mutiny; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.jupiter.api.RepeatedTest; + +import io.smallrye.mutiny.helpers.test.AssertSubscriber; +import io.smallrye.mutiny.infrastructure.Infrastructure; + +public class BugReproducersTest { + + @RepeatedTest(100) + public void reproducer_689() { + // Adapted from https://github.com/smallrye/smallrye-mutiny/issues/689 + AtomicLong src = new AtomicLong(); + + AssertSubscriber sub = Multi.createBy().repeating() + .supplier(src::incrementAndGet) + .until(l -> l.equals(10_000L)) + .flatMap(l -> Multi.createFrom().item(l * 2)) + .emitOn(Infrastructure.getDefaultExecutor()) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + sub.awaitCompletion(); + assertThat(sub.getItems()).hasSize(9_999); + } +} diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java index fe511bbc2..21fd7cb81 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/UniToPublisherTest.java @@ -4,10 +4,12 @@ import static org.assertj.core.api.Assertions.fail; import java.io.IOException; +import java.util.concurrent.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.*; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.reactivestreams.Publisher; @@ -16,6 +18,7 @@ import io.reactivex.subscribers.TestSubscriber; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.helpers.test.AssertSubscriber; public class UniToPublisherTest { @@ -220,4 +223,31 @@ public void testUniOfVoid() { Multi publisher = uni.toMulti(); assertThat(publisher.collect().asList().await().indefinitely()).isEmpty(); } + + @RepeatedTest(1000) + public void multipleConcurrentRequests() throws InterruptedException { + final int n = 8; + CountDownLatch start = new CountDownLatch(n); + + Multi multi = Uni.createFrom() + .completionStage(() -> CompletableFuture.supplyAsync(() -> 63)) + .toMulti(); + AssertSubscriber subscriber = multi.subscribe().withSubscriber(AssertSubscriber.create()); + + for (int i = 0; i < n; i++) { + ForkJoinPool.commonPool().execute(() -> { + try { + start.await(); + subscriber.request(10L); + } catch (InterruptedException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + }); + start.countDown(); + } + + subscriber.awaitCompletion(); + assertThat(subscriber.getItems()).hasSize(1).containsExactly(63); + } }