From fe1089b7b9d16cafe9d8a5cc0c0d177ed17bfb15 Mon Sep 17 00:00:00 2001 From: vjgarciag96 Date: Tue, 14 Apr 2020 21:19:50 +0200 Subject: [PATCH] 2.x: Fix Observable.flatMap with maxConcurrency hangs (#6947) --- .../observable/ObservableFlatMap.java | 39 +++++++++++++------ .../flowable/FlowableFlatMapTest.java | 23 +++++++++++ .../observable/ObservableFlatMapTest.java | 23 +++++++++++ 3 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java index a4766f389d..73a5306513 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMap.java @@ -334,6 +334,7 @@ void drainLoop() { if (checkTerminate()) { return; } + int innerCompleted = 0; SimplePlainQueue svq = queue; if (svq != null) { @@ -349,9 +350,18 @@ void drainLoop() { } child.onNext(o); + innerCompleted++; } } + if (innerCompleted != 0) { + if (maxConcurrency != Integer.MAX_VALUE) { + subscribeMore(innerCompleted); + innerCompleted = 0; + } + continue; + } + boolean d = done; svq = queue; InnerObserver[] inner = observers.get(); @@ -376,7 +386,6 @@ void drainLoop() { return; } - int innerCompleted = 0; if (n != 0) { long startId = lastId; int index = lastIndex; @@ -463,20 +472,12 @@ void drainLoop() { if (innerCompleted != 0) { if (maxConcurrency != Integer.MAX_VALUE) { - while (innerCompleted-- != 0) { - ObservableSource p; - synchronized (this) { - p = sources.poll(); - if (p == null) { - wip--; - continue; - } - } - subscribeInner(p); - } + subscribeMore(innerCompleted); + innerCompleted = 0; } continue; } + missed = addAndGet(-missed); if (missed == 0) { break; @@ -484,6 +485,20 @@ void drainLoop() { } } + void subscribeMore(int innerCompleted) { + while (innerCompleted-- != 0) { + ObservableSource p; + synchronized (this) { + p = sources.poll(); + if (p == null) { + wip--; + continue; + } + } + subscribeInner(p); + } + } + boolean checkTerminate() { if (cancelled) { return true; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java index d121eea54e..e232af7bd4 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java @@ -1157,4 +1157,27 @@ public void innerErrorsMainCancelled() { assertFalse("Has subscribers?", pp1.hasSubscribers()); } + + @Test(timeout = 5000) + public void mixedScalarAsync() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + Flowable + .range(0, 20) + .flatMap(new Function>() { + @Override + public Publisher apply(Integer integer) throws Exception { + if (integer % 5 != 0) { + return Flowable + .just(integer); + } + + return Flowable + .just(-integer) + .observeOn(Schedulers.computation()); + } + }, false, 1) + .ignoreElements() + .blockingAwait(); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java index 5e0fa7cf8a..61fbd21c7f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableFlatMapTest.java @@ -1118,4 +1118,27 @@ public void innerErrorsMainCancelled() { assertFalse("Has subscribers?", ps1.hasObservers()); } + + @Test(timeout = 5000) + public void mixedScalarAsync() { + for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) { + Observable + .range(0, 20) + .flatMap(new Function>() { + @Override + public ObservableSource apply(Integer integer) throws Exception { + if (integer % 5 != 0) { + return Observable + .just(integer); + } + + return Observable + .just(-integer) + .observeOn(Schedulers.computation()); + } + }, false, 1) + .ignoreElements() + .blockingAwait(); + } + } }