From 0b6716eca759eb6b351f99332ca7efa1f3b812c5 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 5 Oct 2020 14:16:19 +0200 Subject: [PATCH 1/2] 2.x: Fix Flowable.concatMap backpressure w/ scalars --- .../operators/flowable/FlowableConcatMap.java | 16 +++--- .../flowable/FlowableConcatMapTest.java | 56 ++++++++++++++++++- 2 files changed, 62 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java index f3dc0c58b8..64df7cc122 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.flowable; import java.util.concurrent.Callable; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -332,7 +332,7 @@ void drain() { continue; } else { active = true; - inner.setSubscription(new WeakScalarSubscription(vr, inner)); + inner.setSubscription(new SimpleScalarSubscription(vr, inner)); } } else { @@ -349,20 +349,20 @@ void drain() { } } - static final class WeakScalarSubscription implements Subscription { + static final class SimpleScalarSubscription + extends AtomicBoolean + implements Subscription { final Subscriber downstream; final T value; - boolean once; - WeakScalarSubscription(T value, Subscriber downstream) { + SimpleScalarSubscription(T value, Subscriber downstream) { this.value = value; this.downstream = downstream; } @Override public void request(long n) { - if (n > 0 && !once) { - once = true; + if (n > 0 && compareAndSet(false, true)) { Subscriber a = downstream; a.onNext(value); a.onComplete(); @@ -538,7 +538,7 @@ void drain() { continue; } else { active = true; - inner.setSubscription(new WeakScalarSubscription(vr, inner)); + inner.setSubscription(new SimpleScalarSubscription(vr, inner)); } } else { active = true; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java index d9fe79977f..eaa972bce4 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java @@ -24,7 +24,9 @@ import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription; +import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.operators.flowable.FlowableConcatMap.SimpleScalarSubscription; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -33,7 +35,7 @@ public class FlowableConcatMapTest { @Test public void weakSubscriptionRequest() { TestSubscriber ts = new TestSubscriber(0); - WeakScalarSubscription ws = new WeakScalarSubscription(1, ts); + SimpleScalarSubscription ws = new SimpleScalarSubscription(1, ts); ts.onSubscribe(ws); ws.request(0); @@ -105,6 +107,56 @@ public Publisher apply(String v) .assertResult("RxSingleScheduler"); } + @Test + public void innerScalarRequestRace() { + Flowable just = Flowable.just(1); + int n = 1000; + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + PublishProcessor> source = PublishProcessor.create(); + + TestSubscriber ts = source + .concatMap(Functions.>identity(), n + 1) + .test(1L); + + TestHelper.race(() -> { + for (int j = 0; j < n; j++) { + source.onNext(just); + } + }, () -> { + for (int j = 0; j < n; j++) { + ts.request(1); + } + }); + + ts.assertValueCount(n); + } + } + + @Test + public void innerScalarRequestRaceDelayError() { + Flowable just = Flowable.just(1); + int n = 1000; + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + PublishProcessor> source = PublishProcessor.create(); + + TestSubscriber ts = source + .concatMapDelayError(Functions.>identity(), n + 1, true) + .test(1L); + + TestHelper.race(() -> { + for (int j = 0; j < n; j++) { + source.onNext(just); + } + }, () -> { + for (int j = 0; j < n; j++) { + ts.request(1); + } + }); + + ts.assertValueCount(n); + } + } + @Test public void pollThrows() { Flowable.just(1) From 0d563fe93d5557bf0616a5220b7b295f965c4a6f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 5 Oct 2020 14:24:04 +0200 Subject: [PATCH 2/2] Replace Java 8 constructs --- .../flowable/FlowableConcatMapTest.java | 52 ++++++++++++------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java index eaa972bce4..8bd29121a0 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java @@ -109,22 +109,28 @@ public Publisher apply(String v) @Test public void innerScalarRequestRace() { - Flowable just = Flowable.just(1); - int n = 1000; + final Flowable just = Flowable.just(1); + final int n = 1000; for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { - PublishProcessor> source = PublishProcessor.create(); + final PublishProcessor> source = PublishProcessor.create(); - TestSubscriber ts = source + final TestSubscriber ts = source .concatMap(Functions.>identity(), n + 1) .test(1L); - TestHelper.race(() -> { - for (int j = 0; j < n; j++) { - source.onNext(just); + TestHelper.race(new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + source.onNext(just); + } } - }, () -> { - for (int j = 0; j < n; j++) { - ts.request(1); + }, new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + ts.request(1); + } } }); @@ -134,22 +140,28 @@ public void innerScalarRequestRace() { @Test public void innerScalarRequestRaceDelayError() { - Flowable just = Flowable.just(1); - int n = 1000; + final Flowable just = Flowable.just(1); + final int n = 1000; for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { - PublishProcessor> source = PublishProcessor.create(); + final PublishProcessor> source = PublishProcessor.create(); - TestSubscriber ts = source + final TestSubscriber ts = source .concatMapDelayError(Functions.>identity(), n + 1, true) .test(1L); - TestHelper.race(() -> { - for (int j = 0; j < n; j++) { - source.onNext(just); + TestHelper.race(new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + source.onNext(just); + } } - }, () -> { - for (int j = 0; j < n; j++) { - ts.request(1); + }, new Runnable() { + @Override + public void run() { + for (int j = 0; j < n; j++) { + ts.request(1); + } } });