diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index fa30f46b10..a52eee07e9 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -847,8 +847,11 @@ public void onNext(T t) { } @Override public void onError(Throwable e) { - done = true; + // Need to queue the error first before setting done, so that after emitLoop() removes the subscriber, + // it is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, + // and at the same time the error queue was empty. parent.getOrCreateErrorQueue().offer(e); + done = true; parent.emit(); } @Override diff --git a/src/test/java/rx/internal/operators/OperatorMergeTest.java b/src/test/java/rx/internal/operators/OperatorMergeTest.java index 7528cb88d8..bc13673f5e 100644 --- a/src/test/java/rx/internal/operators/OperatorMergeTest.java +++ b/src/test/java/rx/internal/operators/OperatorMergeTest.java @@ -1205,6 +1205,34 @@ public void onNext(Integer t) { assertTrue(latch.await(10, TimeUnit.SECONDS)); } + @Test + public void testConcurrentMergeInnerError() { + for (int i = 0; i < 1000; i++) { + final TestSubscriber ts = TestSubscriber.create(); + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + final Exception error = new NullPointerException(); + Action0 action1 = new Action0() { + @Override + public void call() { + ps1.onNext(1); + ps1.onCompleted(); + } + }; + Action0 action2 = new Action0() { + @Override + public void call() { + ps2.onError(error); + } + }; + + Observable.mergeDelayError(ps1, ps2).subscribe(ts); + TestUtil.race(action1, action2); + ts.assertTerminalEvent(); + ts.assertError(error); + } + } + private static Action1 printCount() { return new Action1() { long count;