Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a race condition in OperatorMerge.InnerSubscriber#onError #5851

Merged

Conversation

pkolaczk
Copy link

Inner subscriber must queue the error first before setting done, so that after emitLoop removes the subscriber,emitLoop is guaranteed to notice the error. Otherwise it would be possible that inner subscribers count was 0, and at the same time the error queue was empty.

We observed that sometimes a result of flattening two observables doesn't properly mark the resulting observable as failed, despite the inner observable properly reporting onError. Instead, the error condition was ignored and the result observable was completing normally.

Inner subscriber must queue the error first before setting done,
so that after emitLoop() removes the subscriber,
emitLoop is guaranteed to notice the error.
Otherwise it would be possible that inner subscribers count was 0,
and at the same time the error queue was empty.
@akarnokd akarnokd added this to the 1.4 milestone Feb 14, 2018
parent.getOrCreateErrorQueue().offer(e);
done = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add an unit test that checks this race condition.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do I do this? This looks like a very rare / hard to trigger one. A thread would have to stop for a moment after setting done and before queuing the error at the time when emitLoop was running on another thread.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's what TestUtil.race is for. Have one subject onCompleted() and another onError(), mergeDelayError them. Loop this at least 1000 times, check a TestSubscriber for error termination. Remember to create the subjects and subscriber inside the loop due to the terminal event nature.

@codecov
Copy link

codecov bot commented Feb 14, 2018

Codecov Report

Merging #5851 into 1.x will increase coverage by 0.07%.
The diff coverage is 100%.

Impacted file tree graph

@@             Coverage Diff              @@
##                1.x    #5851      +/-   ##
============================================
+ Coverage     84.18%   84.25%   +0.07%     
- Complexity     2889     2891       +2     
============================================
  Files           290      290              
  Lines         18264    18264              
  Branches       2495     2495              
============================================
+ Hits          15375    15389      +14     
+ Misses         2006     1992      -14     
  Partials        883      883
Impacted Files Coverage Δ Complexity Δ
...main/java/rx/internal/operators/OperatorMerge.java 86.23% <100%> (+0.45%) 7 <0> (ø) ⬇️
...a/rx/internal/operators/BufferUntilSubscriber.java 71.42% <0%> (-6.35%) 11% <0%> (-1%)
...ain/java/rx/internal/operators/OnSubscribeAmb.java 79.13% <0%> (-5.04%) 13% <0%> (ø)
...ain/java/rx/internal/schedulers/SchedulerWhen.java 83.78% <0%> (-4.06%) 4% <0%> (ø)
...n/java/rx/subscriptions/CompositeSubscription.java 76.62% <0%> (-1.3%) 25% <0%> (-1%)
...n/java/rx/internal/operators/CachedObservable.java 81.67% <0%> (-1.05%) 6% <0%> (ø)
...in/java/rx/internal/operators/OperatorPublish.java 77.91% <0%> (-0.84%) 8% <0%> (ø)
...n/java/rx/subjects/SubjectSubscriptionManager.java 80.71% <0%> (-0.72%) 15% <0%> (-1%)
src/main/java/rx/Completable.java 83.29% <0%> (-0.13%) 102% <0%> (-1%)
...ain/java/rx/internal/operators/OperatorReplay.java 82.77% <0%> (+0.39%) 15% <0%> (ø) ⬇️
... and 9 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update a49c49f...9b8460d. Read the comment docs.

@pkolaczk
Copy link
Author

Wow! I wrote a test and I'm surprised it reproduced the issue so easily. When I reverted the patch, the test failed with:

java.lang.AssertionError: No errors (1 completion)

	at rx.observers.TestSubscriber.assertionError(TestSubscriber.java:667)
	at rx.observers.TestSubscriber.assertError(TestSubscriber.java:557)
	at rx.internal.operators.OperatorMergeTest.testConcurrentMergeInnerError(OperatorMergeTest.java:1232)
	...

@pkolaczk
Copy link
Author

@akarnokd Thanks, can you shed some light when it will be released to maven?

@akarnokd
Copy link
Member

Your other PR still incomplete without verifying the fix via a similar unit test. I usually announce the next version 3-5 days before release, however, since this affects a critical operator merge, it could be cut sort and release as soon both your PRs clear.

@pkolaczk
Copy link
Author

I know, working on it. Sorry, different time zone maybe. I've just started the day. ;)

@akarnokd akarnokd merged commit 2ba8bb2 into ReactiveX:1.x Feb 15, 2018
@akarnokd
Copy link
Member

Thanks for the fixes. These were long-standing bugs you stumbled upon very close to the end-of-life of RxJava 1.x (March 31, 2018), after which no further development and bugfixes will be accepted. Are you in the process of moving to RxJava 2.x by any chance?

@pkolaczk
Copy link
Author

We'd like to, however the limiting factor at the moment is lack of RxScala for 2.x. A large part of our project is already on RxJava 2.x, so using just one version everywhere would be much better for everyone.

The options are either going to Scala 2.12 and using RxJava 2.x with no wrapper at all, thanks to much better Scala-Java lambda/SAM interop in Scala 2.12, or switching to something totally else. Monix maybe.

@pkolaczk pkolaczk deleted the fix-race-OperatorMerge-InnerSubscriber-onError branch February 15, 2018 11:53
@davidmoten
Copy link
Collaborator

LGTM, really appreciate this fix too @pkolaczk!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants