diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java index f3dc0c58b8..64df7cc122 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -332,7 +332,7 @@ void drain() { continue; } else { active = true; - inner.setSubscription(new WeakScalarSubscription(vr, inner)); + inner.setSubscription(new SimpleScalarSubscription(vr, inner)); } } else { @@ -349,20 +349,20 @@ void drain() { } } - static final class WeakScalarSubscription implements Subscription { + static final class SimpleScalarSubscription + extends AtomicBoolean + implements Subscription { final Subscriber downstream; final T value; - boolean once; - WeakScalarSubscription(T value, Subscriber downstream) { + SimpleScalarSubscription(T value, Subscriber downstream) { this.value = value; this.downstream = downstream; } @Override public void request(long n) { - if (n > 0 && !once) { - once = true; + if (n > 0 && compareAndSet(false, true)) { Subscriber a = downstream; a.onNext(value); a.onComplete(); @@ -538,7 +538,7 @@ void drain() { continue; } else { active = true; - inner.setSubscription(new WeakScalarSubscription(vr, inner)); + inner.setSubscription(new SimpleScalarSubscription(vr, inner)); } } else { active = true; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java index d9fe79977f..8bd29121a0 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java @@ -24,7 +24,9 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableConcatMap.SimpleScalarSubscription; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -33,7 +35,7 @@ public class FlowableConcatMapTest { @Test public void weakSubscriptionRequest() { TestSubscriber ts = new TestSubscriber(0); - WeakScalarSubscription ws = new WeakScalarSubscription(1, ts); + SimpleScalarSubscription ws = new SimpleScalarSubscription(1, ts); ts.onSubscribe(ws); ws.request(0); @@ -105,6 +107,68 @@ public Publisher apply(String v) .assertResult("RxSingleScheduler"); } + @Test + public void innerScalarRequestRace() { + final Flowable just = Flowable.just(1); + final int n = 1000; + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final PublishProcessor> source = PublishProcessor.create(); + + final TestSubscriber ts = source + .concatMap(Functions.>identity(), n + 1) + .test(1L); + + TestHelper.race(new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + source.onNext(just); + } + } + }, new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + ts.request(1); + } + } + }); + + ts.assertValueCount(n); + } + } + + @Test + public void innerScalarRequestRaceDelayError() { + final Flowable just = Flowable.just(1); + final int n = 1000; + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + final PublishProcessor> source = PublishProcessor.create(); + + final TestSubscriber ts = source + .concatMapDelayError(Functions.>identity(), n + 1, true) + .test(1L); + + TestHelper.race(new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + source.onNext(just); + } + } + }, new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + ts.request(1); + } + } + }); + + ts.assertValueCount(n); + } + } + @Test public void pollThrows() { Flowable.just(1)