From 2ba8bb2862255ab26c61d9a14ef17b32d2bfc484 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Ko=C5=82aczkowski?= Date: Thu, 15 Feb 2018 11:15:43 +0100 Subject: [PATCH] Fix a race condition in OperatorMerge.InnerSubscriber#onError (#5851) * Fix a race condition in OperatorMerge.InnerSubscriber#onError Inner subscriber must queue the error first before setting done, so that after emitLoop() removes the subscriber, emitLoop is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, and at the same time the error queue was empty. * Add unit test for OperatorMerge.InnerSubscriber#onError race --- .../rx/internal/operators/OperatorMerge.java | 5 +++- .../internal/operators/OperatorMergeTest.java | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index fa30f46b10..a52eee07e9 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -847,8 +847,11 @@ public void onNext(T t) { } @Override public void onError(Throwable e) { - done = true; + // Need to queue the error first before setting done, so that after emitLoop() removes the subscriber, + // it is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, + // and at the same time the error queue was empty. parent.getOrCreateErrorQueue().offer(e); + done = true; parent.emit(); } @Override diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 7528cb88d8..bc13673f5e 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -1205,6 +1205,34 @@ public void onNext(Integer t) { assertTrue(latch.await(10, TimeUnit.SECONDS)); } + @Test + public void testConcurrentMergeInnerError() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber ts = TestSubscriber.create(); + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + final Exception error = new NullPointerException(); + Action0 action1 = new Action0() { + @Override + public void call() { + ps1.onNext(1); + ps1.onCompleted(); + } + }; + Action0 action2 = new Action0() { + @Override + public void call() { + ps2.onError(error); + } + }; + + Observable.mergeDelayError(ps1, ps2).subscribe(ts); + TestUtil.race(action1, action2); + ts.assertTerminalEvent(); + ts.assertError(error); + } + } + private static Action1 printCount() { return new Action1() { long count;