Await comply with reactive streams Subscriber rule 2.7 #3360

Merged
merged 6 commits into from Jul 8, 2022

EgorKulbachka
Contributor

Rule 2.7 requires Subscription methods to be executed serially.

There is a possibility of a race between the Subscription.request and Subscription.cancel methods, since the cancellation handler could be executed in a separate thread.

In particular, this bug leads to database connection leaks when used with a jOOQ subscription: cancel can happen while request is still being initialized, so the connection is never closed.

There is a possibility of race of Subscription.request and
Subscription.cancel methods since cancellation handler could be executed
in a separate thread.
Rule
[2.7](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#2-subscriber-code)
requires Subscription methods to be executed serially.
@dkhalanskyjb
Collaborator

Thank you! This is indeed a problem, and I don't think your solution is enough to fix it. Looks like it is possible that cont.cancel also happens in parallel with a cont.cancel in the onNext implementation. I think one can't get around having to use locking here.

@EgorKulbachka
Contributor Author

Thank you for the quick feedback!

I'm not sure that parallel cancel executions are a problem: as Subscription rule 3.5 specifies, the cancel method itself must be thread-safe (i.e. without requiring external synchronization to ensure program correctness).

I can try to introduce a lock here myself, but I feel it might be excessive and might also have performance implications that are hard for me to estimate.

@dkhalanskyjb
Collaborator

Good point about 3.5, but still, I think we should comply with 2.7 to the letter. For example, see its proposed interpretation:

The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a happens-before relation between each of the calls is established.

I don't think there's room for ambiguity here: the calls must have a happens-before link between them.

I can try to introduce a lock here myself, but I feel it might be excessive and might also have performance implications that are hard for me to estimate.

I would estimate the implications as insignificant. @qwwdfsad has more in-depth knowledge of this and can correct me if I'm wrong, but as I understand it, the performance impact of synchronized is twofold:

  1. To establish happens-before relations, it must synchronize the state to the main memory,
  2. Some heavy machinery is used to orchestrate threads in case of contention.

1 is insignificant because using coroutines already causes the state to synchronize to the main memory on dispatches. 2 is insignificant because, when threads don't fight for the right to enter a synchronized block, the orchestration is trivial and fast; and if they do, well, that's what we want to guard against.
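To make the discussion concrete, here is a minimal sketch of serializing Subscription calls with a monitor. It is not the code from this pull request: it uses the JDK's `java.util.concurrent.Flow.Subscription` as a stand-in for `org.reactivestreams.Subscription`, and `SerializedSubscription`, `RecordingSubscription`, and `SerializedSubscriptionDemo` are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical wrapper (not the PR's code): every call takes the same monitor,
// so request and cancel never overlap and are ordered by happens-before.
final class SerializedSubscription implements Flow.Subscription {
    private final Flow.Subscription delegate;

    SerializedSubscription(Flow.Subscription delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void request(long n) {
        delegate.request(n);
    }

    @Override
    public synchronized void cancel() {
        delegate.cancel();
    }
}

// Hypothetical delegate that records the calls it receives.
final class RecordingSubscription implements Flow.Subscription {
    final List<String> calls = new ArrayList<>();

    @Override
    public void request(long n) {
        calls.add("request(" + n + ")");
    }

    @Override
    public void cancel() {
        calls.add("cancel");
    }
}

final class SerializedSubscriptionDemo {
    public static void main(String[] args) throws InterruptedException {
        RecordingSubscription recorder = new RecordingSubscription();
        Flow.Subscription sub = new SerializedSubscription(recorder);
        Thread canceller = new Thread(sub::cancel); // cancellation from another thread
        canceller.start();
        sub.request(1);
        canceller.join();
        // The order of the two entries may vary, but the calls never overlap.
        System.out.println(recorder.calls);
    }
}
```

Java monitors are reentrant, so a cancel issued from inside request on the same thread (for example, from onNext) does not deadlock in this sketch.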

@EgorKulbachka
Contributor Author

Makes sense to me.

I've added synchronization to the subscription methods. Unfortunately, I couldn't figure out how to avoid the delay in the test with this solution.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Jul 7, 2022

Great, thanks! If, at any point, you feel like you don't want to continue working on this pull request, please say so, and we will polish it ourselves, because we want it to land in the next release, which should happen soon. You will be credited in any case.

Regarding the test: yes, I think we need to fix it, but more radically.

Currently, the test is structured in such a way that it will pass most of the time even if the bug is not fixed: I removed the @Synchronized annotation, but the test still passes successfully. So, it clearly doesn't test that there is no bug.

What do we want to test here? Judging by the name of the pull request, we want to test that rule 2.7 is upheld even in the presence of cancellation and multithreading, that is, that the operations on a subscriber are not entered in parallel. This feels like a good job for a stress test. We have plenty of those (see PublisherRequestStressTest.kt for an example), and they are a type of test where you spin some code in a loop for many iterations and check that what you want to uphold was not violated in any of them.

So, in this case, a good test, I think, would be something like this: create a subscriber that sets some variable to 1 when request or cancel is entered, and to 0 when the execution leaves the operation. Crash if, on trying to call request or cancel, that variable is already 1. Then, in parallel do awaitOne on such a subscriber and cancellation of the scope in which the awaitOne happens. Do all of the above many times.
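The "set to 1 on entry, 0 on exit" idea can be sketched roughly as follows. This is an illustration under assumptions, not the test from the pull request: it uses the JDK's `Flow.Subscription` in place of the Reactive Streams interface, and `EntryCheckingSubscription` with its `onRequest` hook is a hypothetical name.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical subscription that fails fast when request or cancel is entered
// while another call is still running.
final class EntryCheckingSubscription implements Flow.Subscription {
    private final AtomicBoolean inside = new AtomicBoolean(false); // the "0/1" variable
    private final Runnable onRequest; // work performed inside request (assumption)

    EntryCheckingSubscription(Runnable onRequest) {
        this.onRequest = onRequest;
    }

    private void enter(String op) {
        // Atomically flip false -> true; a failed flip means a call is already in progress.
        if (!inside.compareAndSet(false, true)) {
            throw new IllegalStateException(op + " entered while another call is running");
        }
    }

    @Override
    public void request(long n) {
        enter("request");
        try {
            onRequest.run();
        } finally {
            inside.set(false);
        }
    }

    @Override
    public void cancel() {
        enter("cancel");
        inside.set(false);
    }
}

final class EntryCheckingDemo {
    public static void main(String[] args) {
        EntryCheckingSubscription sub = new EntryCheckingSubscription(() -> { });
        sub.request(1); // sequential calls pass the check
        sub.cancel();
        System.out.println("sequential calls passed the check");
    }
}
```

Note that a plain flag also treats a nested call on the same thread (cancel invoked from inside request) as a violation, which matters once the publisher actually emits values.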

If you do write this test, please also check that, when the bug is present, the test finds it consistently.

@EgorKulbachka
Contributor Author

Thanks for the suggestions!

I've added the suggested test; it fails for me locally only when I have a sleep in the request method. Please let me know if this is the direction you had in mind.

@dkhalanskyjb
Collaborator

dkhalanskyjb commented Jul 8, 2022

testPublisher is almost exactly what I had in mind.

  • sleep is a sign that something is heavily dependent on the execution time, which is not good. More on that below.
  • This publisher never sends any values, so it can't check that there is no race between cancellation and calls to sub.cancel in onNext.

Now, the test procedure itself could be improved significantly. Let's look at what happens currently.

  • First, jobsToRun jobs are created and launched. On my machine, this process takes about 40 ms, give or take. These jobs don't wait for the moment cancellation is allowed to start to happen, they just start working immediately, so the first job has about 40 ms to run before job.cancel happens.
  • Those 40 ms are more than enough for the request in the first job to finish, so this is why you need sleep: otherwise, there is no race and, by the time cancellation happens, the publisher is just waiting to be cancelled.
  • While cancellation of the first n publishers happens, the subsequent publishers are running as well, so every publisher suffers from this issue. Increasing their number doesn't help, it just ensures that each publisher has even more time to execute and thus avoid the race.

The fact that publishers are running in parallel also obscures the results: there aren't that many cores in the typical CPU, so, despite thousands of coroutines launching, fewer than 20 of them will typically execute in parallel.

What can be done here to improve the situation: instead of launching everything and only then cancelling everything, you can perform many self-contained iterations that don't influence one another in any way. PublisherCompletionStressTest, for example, follows this general pattern. In each iteration, launch a single coroutine and immediately try to cancel it in parallel. This way, cancellations should have a good chance to race with calls to request, even without sleep.
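The "many self-contained iterations" shape might look like the sketch below. It is written with plain Java threads rather than coroutines, so it only illustrates the structure; `RaceStress`, `Probe`, and the `serialize` switch are hypothetical and do not come from PublisherCompletionStressTest or the pull request.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RaceStress {
    // Counts overlapping entries instead of throwing, so failures on the worker thread are not lost.
    static final class Probe implements Flow.Subscription {
        private final AtomicBoolean inside = new AtomicBoolean(false);
        final AtomicInteger overlaps = new AtomicInteger();

        private void enterAndLeave() {
            if (!inside.compareAndSet(false, true)) {
                overlaps.incrementAndGet();
                return;
            }
            try {
                Thread.onSpinWait(); // stand-in for the real work of the call
            } finally {
                inside.set(false);
            }
        }

        @Override public void request(long n) { enterAndLeave(); }
        @Override public void cancel() { enterAndLeave(); }
    }

    // Runs independent rounds: one request on a worker thread, one immediate cancel on the caller.
    static int stress(int iterations, boolean serialize) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        int overlaps = 0;
        try {
            for (int i = 0; i < iterations; i++) {
                Probe probe = new Probe(); // fresh state, so rounds cannot influence each other
                Object lock = new Object();
                Future<?> requester = pool.submit(() -> {
                    if (serialize) {
                        synchronized (lock) { probe.request(1); }
                    } else {
                        probe.request(1);
                    }
                });
                if (serialize) {
                    synchronized (lock) { probe.cancel(); }
                } else {
                    probe.cancel();
                }
                requester.get(); // finish this round before starting the next one
                overlaps += probe.overlaps.get();
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
        return overlaps;
    }

    public static void main(String[] args) {
        System.out.println("overlaps with serialization: " + stress(10_000, true));
        System.out.println("overlaps without serialization: " + stress(10_000, false));
    }
}
```

With serialization enabled the probe can never observe an overlap. Without it, the count depends on timing and may well be zero on a given run, which is why a stress test of this kind should be checked against the unfixed code before it is trusted.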

@EgorKulbachka
Contributor Author

I've rewritten the test addressing your comments, please let me know what you think.

When I added onNext to the test, it reproduced the issue better, but I faced another problem: technically, we still call cancel from within request (when the Subscription calls onNext inside request), so the counter checks no longer work for the test. I've changed the test to use a Lock instead, to make sure no two threads execute the methods in parallel.
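One way such a lock-based check could be written is sketched below. It is an assumption about the general approach, not the test code from this pull request: `LockCheckingSubscription` and its `onRequest` hook are hypothetical, and the JDK's `Flow.Subscription` stands in for the Reactive Streams interface. `ReentrantLock.tryLock()` succeeds when the current thread already holds the lock, so a nested cancel on the same thread passes, while a call from another thread during request is reported.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical checker: tolerates reentrant calls on one thread, rejects calls from other threads.
final class LockCheckingSubscription implements Flow.Subscription {
    private final ReentrantLock lock = new ReentrantLock();
    private final Runnable onRequest; // work performed inside request (assumption)

    LockCheckingSubscription(Runnable onRequest) {
        this.onRequest = onRequest;
    }

    private void guarded(String op, Runnable body) {
        // tryLock never blocks: it fails immediately if another thread holds the lock.
        if (!lock.tryLock()) {
            throw new IllegalStateException(op + " entered concurrently from another thread");
        }
        try {
            body.run();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public void request(long n) {
        guarded("request", onRequest);
    }

    @Override
    public void cancel() {
        guarded("cancel", () -> { });
    }
}

final class LockCheckingDemo {
    public static void main(String[] args) {
        LockCheckingSubscription[] holder = new LockCheckingSubscription[1];
        // cancel is called from inside request on the same thread, as onNext might do.
        holder[0] = new LockCheckingSubscription(() -> holder[0].cancel());
        holder[0].request(1);
        System.out.println("nested cancel on the same thread was accepted");
    }
}
```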

Do you think this is good enough to consider cancel and request to be executed serially?
I find that a lot of Subscriber implementations follow the same pattern (for example, Project Reactor), but I guess it's not really serial execution, since we run the cancel method before the request method returns. Curious to hear your opinion.

Thanks for the patience and explanations btw.

@dkhalanskyjb dkhalanskyjb merged commit 8b6473d into Kotlin:develop Jul 8, 2022
@dkhalanskyjb
Collaborator

This test doesn't actually check the synchronization in onNext, because request is itself already synchronized and cancellation can't happen in parallel with it. This is unfortunate, but, looking at this, I only see ways to test this that involve fairly advanced tricks, which may just not be worth it.

So, I think this is good to go. Good work, thank you!

@EgorKulbachka EgorKulbachka deleted the cancellation-race branch July 8, 2022 12:01
@sadensmol

Really nice fix, we have been struggling with this for a long time! Thank you @EgorKulbachka

@dkhalanskyjb
Collaborator

@sadensmol we take such issues seriously, so if you experience something else of the sort, please feel free to report it.

pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this pull request Sep 14, 2022