Breaking: Rethink atomicity of certain low-level primitives #1813

Closed
elizarov opened this issue Feb 17, 2020 · 18 comments

Comments

@elizarov
Contributor

elizarov commented Feb 17, 2020

There are a number of primitives in the library that provide "atomic cancellation", among them Channel.send, Channel.receive, Mutex.lock, and Semaphore.acquire.

As their documentation states:

Cancellation of suspended invocation is atomic — when this function throws CancellationException it means that the operation was not performed. As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may continue to execute even after it was canceled from the same thread in the case when this operation was already resumed and the continuation was posted for execution to the thread's queue.

This is surprising behavior that needs to be accounted for when writing higher-level primitives like Flow. See #1265 for an example problem and #1730 for a discussion.

It all points to the need to rethink the atomic cancellation behavior of the underlying primitives.

More background on the atomic cancellation

The concept of atomic cancellation was introduced early in the design of kotlinx.coroutines, before the 1.0 release. Originally it was used only in channels, with the goal of enabling safe ownership transfer of closeable resources between different coroutines.

Hereinafter we use the word resource to denote a reference to any object that must be explicitly closed. Usually, but not always, it encapsulates some native resource that is not managed by garbage collection. Losing a reference to such a resource creates a leak.

For example, consider a coroutine that opens a file and gets an InputStream. InputStream is a resource that must be explicitly closed after use. Now, if the coroutine needs to send it to another coroutine for processing via val channel = Channel<InputStream>(), then it has the following code:

val input = file.inputStream() // opens an input stream to a file
channel.send(input) // send for processing

The other coroutine receives the input stream and continues to work with it, closing it afterward:

val input = channel.receive()
input.use { process(it) } // closes at the end

If send and receive were simply cancellable, as most other suspending functions created with the help of suspendCancellableCoroutine are, then the sender would have no way of knowing if the send had actually completed successfully or not. It could be canceled before completing send or it could send and then get canceled. The receiver can get canceled after receiving it, too. There would be no way to ensure safe ownership transfer from one coroutine to another. As a solution to this problem the concept of "atomic cancellation" was born. In kotlinx.coroutines up until now, the above code is given the following guarantees:

  • Atomic send cancellation: if send completes successfully then the reference was put into the channel, otherwise it was not.
  • Atomic receive cancellation: if receive completes successfully then the reference was retrieved from the channel, otherwise it was not.

Together, these two guarantees support leak-free transfer of ownership between coroutines, provided that the code on the send side is modified in a specific way, like this:

val input = file.inputStream() // opens file
try {
    channel.send(input)
} catch (e: Throwable) {
    input.close() // close input on any exception; we know it was not delivered
    throw e // rethrow
}

Public API status of atomic cancellation

The "atomic cancellation" was never widely publicized. It was briefly documented by a single paragraph in both the send and receive documentation, but it was not mentioned in any of the coroutine guides. Moreover, as shown above, atomic cancellation by itself only enables safe ownership transfer. One still has to write code with extreme caution to ensure that resources are transferred safely between coroutines, and the code one has to write was never explained in the documentation.

The atomic cancellation was only employed by a small number of kotlinx.coroutines functions and it was not possible to write user-defined primitives with similar atomic-cancellation behavior. The corresponding low-level mechanisms are internal in the library.

Problems with atomic cancellation

Atomic cancellation creates a major hurdle when using channels in UI code. In UI applications, cancellation is used to manage the lifecycle of UI elements. For example, consider a coroutine that is running in the scope of some UI view:

val data = dataChannel.receive() // receive some data
updateUI(data)

When the data is received from the channel, the line with updateUI(data) is not executed immediately. The corresponding continuation is scheduled for execution onto the main thread and needs to wait until the main thread is available. However, while this continuation waits in the queue, the corresponding UI view might get destroyed. Normally, with all the other cancellable suspending functions that use suspendCancellableCoroutine, when the main thread is finally ready to execute the continuation, the cancellation state of the coroutine is checked and a CancellationException is thrown instead of executing updateUI(data). However, this check for cancellation is not performed for the Channel.receive continuation, because there is an "atomic cancellation" guarantee for receive. The data element was already received and must be delivered, so updateUI(data) gets executed despite the fact that the coroutine is already canceled. On Android, in particular, an attempt to update a UI view that was already destroyed would lead to an exception.

In practice, it means that every time a Channel (or another atomically-cancellable primitive) is used in the main thread, one must not forget to manually add the check for cancellation of the current coroutine:

val data = dataChannel.receive() // receive some data
ensureActive() // check for cancellation manually
updateUI(data)
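The manual workaround above can be packaged in a small helper so that it is harder to forget. This is a minimal sketch assuming kotlinx.coroutines on the classpath; receiveThenEnsureActive and receiveOne are hypothetical names, not library API. Note that when ensureActive() throws, the element that was already taken from the channel is simply dropped, which is acceptable for UI data but not for resources.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.runBlocking

// Hypothetical helper: receive, then re-check cancellation before the caller touches the UI.
// If the coroutine was canceled while its resumption waited in the dispatcher queue,
// ensureActive() throws CancellationException and the received element is dropped.
suspend fun <E> ReceiveChannel<E>.receiveThenEnsureActive(): E {
    val element = receive()
    currentCoroutineContext().ensureActive()
    return element
}

// Hypothetical usage: an active coroutine gets the element back as usual.
fun receiveOne(): String = runBlocking {
    val dataChannel = Channel<String>(1)
    dataChannel.send("data")
    dataChannel.receiveThenEnsureActive()
}
```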

Forgetting to add this check leads to a hard-to-find bug that manifests itself only rarely.

This design creates irregularities in the coroutines API surface. All suspending functions in kotlinx.coroutines are cancellable in a regular fashion, and it is guaranteed that when they resume successfully the corresponding coroutine was not canceled. However, there are a few exceptional functions with "atomic cancellation" behavior that one must remember by heart. There is no consistent naming to make them stand apart. They look like all the other suspending functions, and they are cancellable, too, but they can resume successfully when the coroutine was already canceled. Tricky and error-prone.

Moreover, atomic cancellation does not make resource transfer easy. Even with atomic cancellation, it is still a tricky and error-prone endeavor. In addition to the intricate code dance one has to perform (as shown above), there is a problem with channel cancellation. When the receiver does not plan to continue receiving data and cancels the whole channel to indicate that, all references that were stored in the channel buffer are simply dropped. It means that channel cancellation cannot be used when the channel is used to transfer resources. You can work around it when you use a channel directly (avoid cancellation, manually retrieve and close all the resources from the channel you no longer need), but this makes it impossible to design resource-leak-free high-level primitives like Kotlin Flow that must rely on channel cancellation for their own needs.
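For comparison, the mechanism that eventually replaced atomic cancellation (#1936) lets the channel itself close whatever it cannot deliver. A minimal sketch, assuming kotlinx.coroutines 1.4 or later, where the Channel(...) factory takes an onUndeliveredElement parameter; TrackedResource and cancelWithBufferedResources are hypothetical names used only for illustration. Cancelling the channel hands every still-buffered element to the callback instead of silently dropping the reference.

```kotlin
import java.io.Closeable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for an InputStream: records whether close() was called.
class TrackedResource(val id: Int) : Closeable {
    var closed = false
        private set
    override fun close() { closed = true }
}

// Sends three resources, lets the receiver take one, then cancels the channel.
fun cancelWithBufferedResources(): List<TrackedResource> = runBlocking {
    val resources = List(3) { TrackedResource(it) }
    // Whatever is sent but never delivered is closed by the channel itself.
    val channel = Channel<TrackedResource>(capacity = 3, onUndeliveredElement = { it.close() })
    for (r in resources) channel.send(r)        // all three fit into the buffer
    channel.receive().use { /* process(it) */ } // the receiver owns the first one and closes it
    channel.cancel()                            // the two still buffered go to onUndeliveredElement
    resources
}
```

With this parameter, a consumer can cancel the channel freely: nothing left in the buffer leaks.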

All in all, atomic cancellation fails to deliver on the promise of being a safe solution for resource transfer from one coroutine to another.

Additional details on Mutex and Semaphore

The atomic cancellation of Mutex.lock and Semaphore.acquire brings none of the advantages it has for channels, only the problems. The typical pattern of mutex/semaphore use is:

mutex.lock()
try {
    doSomething()
} finally { 
    mutex.unlock()
}
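kotlinx.coroutines packages exactly this lock/try/finally/unlock pattern as the Mutex.withLock extension (and Semaphore.withPermit for semaphores). A minimal sketch, assuming kotlinx.coroutines on the classpath; countWithMutex is a hypothetical helper:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Several workers on a multi-threaded dispatcher increment a shared counter under a mutex.
fun countWithMutex(workers: Int, incrementsPerWorker: Int): Int = runBlocking {
    val mutex = Mutex()
    var counter = 0
    // coroutineScope waits for every worker before counter is read
    coroutineScope {
        repeat(workers) {
            launch(Dispatchers.Default) {
                repeat(incrementsPerWorker) {
                    // withLock does lock(); try { block } finally { unlock() }
                    mutex.withLock { counter++ }
                }
            }
        }
    }
    counter
}
```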

The correctness of this pattern does not require that lock is atomically cancellable in the same way as send. All it needs is that a canceled lock attempt does not keep the mutex locked. CancellableContinuation already provides resume(value) { doOnCancel() } API to enable a correct lock implementation. See resume.
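To make the resume(value) { doOnCancel() } idea concrete, here is a toy lock built on suspendCancellableCoroutine. It is a sketch, not the library's Mutex implementation: ToyMutex and canceledWaiterDoesNotKeepLock are hypothetical, and it assumes kotlinx.coroutines 1.4 or later, where CancellableContinuation.resume(value, onCancellation) is available (marked experimental in some versions). A waiter canceled while queued removes itself, and a lock handed to a coroutine that is canceled before it gets to run is released again by the onCancellation callback, so a canceled lock() never leaves the mutex locked.

```kotlin
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.coroutines.yield

// Hypothetical toy lock, NOT the library's Mutex: it only illustrates
// resume(value) { doOnCancel() } with regular (non-atomic) cancellation.
@OptIn(ExperimentalCoroutinesApi::class) // resume(value, onCancellation) is experimental in some versions
class ToyMutex {
    private val guard = Any()
    private var locked = false
    private val waiters = ArrayDeque<CancellableContinuation<Unit>>()

    suspend fun lock(): Unit = suspendCancellableCoroutine { cont ->
        val acquiredNow = synchronized(guard) {
            if (!locked) {
                locked = true
                true
            } else {
                waiters.addLast(cont)
                false
            }
        }
        if (acquiredNow) {
            // We own the lock; if the coroutine is canceled before it uses it, give it back.
            cont.resume(Unit) { _: Throwable -> unlock() }
        } else {
            // A waiter canceled while queued must never be handed the lock.
            cont.invokeOnCancellation { synchronized(guard) { waiters.remove(cont) } }
        }
    }

    fun unlock() {
        val next = synchronized(guard) {
            check(locked) { "unlock() on an unlocked ToyMutex" }
            waiters.removeFirstOrNull().also { if (it == null) locked = false }
        }
        // Ownership passes straight to the next waiter; if that waiter is canceled
        // before it runs, onCancellation releases the lock again.
        next?.resume(Unit) { _: Throwable -> unlock() }
    }
}

// Returns true if the lock is still usable after a queued lock() attempt was canceled.
fun canceledWaiterDoesNotKeepLock(): Boolean = runBlocking {
    val mutex = ToyMutex()
    mutex.lock()                                         // held by this coroutine
    val waiter = launch { mutex.lock(); mutex.unlock() }
    yield()                                              // let the waiter suspend inside lock()
    waiter.cancelAndJoin()                               // canceled while queued
    mutex.unlock()
    withTimeoutOrNull(1_000) { mutex.lock(); mutex.unlock(); true } ?: false
}
```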

Proposed solution

The proposal is to completely drop the concept of "atomic cancellation" from kotlinx.coroutines and make all the suspending functions in the library consistently cancellable in the regular way, with a guarantee that a suspending function does not resume normally in a canceled coroutine.

In order to address the rare use-case of actually sending resources between coroutines via channels a different easy-to-use mechanism will be introduced together with this change. See #1936 for a concrete proposal.

The key rationale for this particular and radical way to address the problem is that atomic cancellation was underdocumented and very error-prone to start with. Thus, it is extremely unlikely that there is a significant amount of code in the wild that correctly exploits those atomic cancellation guarantees to perform a leak-free transfer of resources between coroutines. At the same time, changing the behavior to regular cancellation will likely fix a lot of hard-to-find cancellation bugs in the existing Android applications that use coroutines.

Impact of the change

This is a breaking change. Some code might have been relying on the atomic cancellation guarantees. After this change, such code might encounter two kinds of errors:

  • On receiving a CancellationException from channel.send, the code might assume that the item was not sent to the channel and go on to close the corresponding resource. However, without the atomic cancellation guarantee on send, the resource might have actually been delivered to the receiver.
  • A receive call for a resource that was previously successfully sent to the channel might throw a CancellationException without the atomic cancellation guarantee on channel.receive. Thus, the reference is lost and the resource leaks.
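Under the replacement mechanism from #1936, the second failure mode can be handled by the channel. A minimal sketch, assuming kotlinx.coroutines 1.4 or later and its onUndeliveredElement parameter; TrackedResource and canceledReceiverDoesNotLeak are hypothetical. The receiver is canceled after the element was handed to it but before it resumed, which is exactly the case where the reference would otherwise be lost. As we read the Channel documentation, a send that fails because of cancellation is reported to onUndeliveredElement as well, so a sender using this parameter would not also close the resource in a catch block (otherwise it could be closed twice).

```kotlin
import java.io.Closeable
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical stand-in for an InputStream: records whether close() was called.
class TrackedResource : Closeable {
    var closed = false
        private set
    override fun close() { closed = true }
}

// Returns whether the resource ended up closed when its receiver was canceled
// after the element had been handed over but before the receiver resumed.
fun canceledReceiverDoesNotLeak(): Boolean = runBlocking {
    val channel = Channel<TrackedResource>(onUndeliveredElement = { it.close() })
    val resource = TrackedResource()
    val receiver = launch {
        channel.receive().use { /* process(it) */ }
    }
    yield()                // receiver suspends in receive() on the rendezvous channel
    channel.send(resource) // handed to the suspended receiver; its resumption is queued on this thread
    receiver.cancel()      // canceled before it had a chance to run
    receiver.join()
    resource.closed
}
```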

Whether this is an acceptable behavior change for a major release is TBD. The key question is whether there are any widely used libraries that could be seriously impacted by this change. We plan to publish at least one -Mx (milestone) release with the proposed change so that the impact of this change can be evaluated while running actual code with the updated kotlinx.coroutines.

Possible alternatives

We can try to maintain a certain degree of backward compatibility while making this change:

Option 1: Maintain binary backward compatibility. It means that we'll retain the old behavior (with atomic cancellation) for previously compiled code, but any code that gets compiled against the new version of kotlinx.coroutines will get the new behavior (without atomic cancellation).

  • Cons: It will vastly complicate the code and this kind of backward compatibility guarantee will be very hard to test and to support in the future.

Option 2: Maintain source backward compatibility. It means that old send and receive will behave as before (with atomic cancellation) and some new methods to perform regularly cancellable send and receive are introduced. It can be done by either having a dedicated Channel constructor to customize cancellation behavior or dedicated send/receive methods with new names.

  • Cons: It will make the public API surface of kotlinx.coroutines more complex. What's worse, as was shown in the preceding overview, the whole atomic cancellation behavior is hard-to-use and error-prone anyway. It does not deserve to be maintained in the long run, so the corresponding "old" methods that perform atomically-cancellable send/receive will have to be deprecated, meaning a transition to less natural names.
@elizarov elizarov changed the title Rethink atomicity of certains low-level primitives Breaking: Rethink atomicity of certains low-level primitives Apr 15, 2020
@elizarov elizarov changed the title Breaking: Rethink atomicity of certains low-level primitives Breaking: Rethink atomicity of certain low-level primitives Apr 15, 2020
@pakoito

pakoito commented Apr 15, 2020

It could help us understand the issue better if you had some tests to prove the old behavior's pitfalls, and some code proposals or tests that should pass with the fixes applied.

@elizarov
Contributor Author

It could help us understand the issue better if you had some tests to prove the old behavior's pitfalls, and some code proposals or tests that should pass with the fixes applied.

@pakoito The pitfalls of atomic cancellation can be demonstrated with this code:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() {
    // The main scope that is similar to a typical UI application (the single main thread)
    val mainScope = CoroutineScope(newSingleThreadContext("TheMainThread"))
    val channel = Channel<String>(1) // buffered channel to make example simpler
    // consumer coroutine
    val consumer = mainScope.launch {
        val ok = channel.receive()
        check(ok == "OK") // this is fine
        check(isActive) // WTF! The coroutine was cancelled, but we are still running here! FAILS HERE !!!
    }
    // producer coroutine
    mainScope.launch {
        channel.send("OK") // send to the channel
        mainScope.cancel() // now the main scope is cancelled
    }
    // wait for test to run
    runBlocking { consumer.join() }
}

Try it in Kotlin Playground: https://pl.kotl.in/x5Bi-IiAO and see that it fails with:

Exception in thread "TheMainThread" java.lang.IllegalStateException: Check failed.
	at FileKt$main$consumer$1.invokeSuspend(File.kt:12)

That is, the failing line 12 is check(isActive). It is very counterintuitive that a canceled coroutine has resumed normally, and all the more problematic because it affects only channels. If you do something like delay(100) instead, you will not experience such a problem.

With the proposed change the same code completes without errors because line 10 with val ok = channel.receive() would throw a CancellationException, preventing further execution of the canceled consumer coroutine.

@SeekDaSky

Thank you for the detailed proposal.

My humble opinion is: don't pollute the API and simply drop the atomic cancellation feature (Option 1), as you state this is a "confidential" feature. I don't know if you follow semantic versioning, but I would expect such a change to appear in a major release (i.e. 2.0); making this change in a minor release (1.4) could throw off the rare people using it.

I suspect the versioning of kotlinx.coroutines is coupled with the version of Kotlin, making a major release impossible. If this is the case, I don't have any idea how to solve your problem apart from deprecating the feature and removing it as quickly as possible (introduce the change in 1.4.0, remove the atomic cancellation in 1.4.1).

For me the behavior of this feature is so unexpected that I don't expect a lot of people to mind the change if the "main" API stays the same and that you provide an alternative API for the use-case atomic cancellation was designed for.

elizarov added a commit that referenced this issue Apr 21, 2020
This is problematic for Android when the Main dispatcher is cancelled on a destroyed activity.
The atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problems when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed to MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
elizarov added a commit that referenced this issue Apr 22, 2020
elizarov added a commit that referenced this issue Apr 22, 2020
@elizarov
Contributor Author

Update: A different, simpler, and more convenient replacement for safe transfer of resources via channels will be introduced. See #1936.

elizarov added a commit that referenced this issue Apr 22, 2020
@1zaman
Contributor

1zaman commented Jun 7, 2020

While I agree with removing the atomic cancellation behaviour, it does lead to a highly undesirable and non-intuitive situation where an element that was successfully sent to a Channel can simply be dropped due to receiver cancellation, even while there are other active receivers that are eligible to receive the element.

Many Channel users might want to guard against that by using the onElementCancel parameter (proposed in #1936) to launch a coroutine that would try to resend the element to the channel if it is not closed. However, there is no API to close a Channel in a safe manner that ensures that all previously sent items are successfully received before closing. And of course, there's no way to close a Channel for sends, but not resends, since the Channel has no way of differentiating between them.

Another option is to implement the resending functionality in the Channel itself. This would also allow it to be closed safely, by immediately closing it for new sends, but keeping it open for receivers and resends until all elements have been received successfully. It's not clear which scope the retries should be performed in, so the Channel will need to take a retry scope parameter in order to do this. This would also remove ordering guarantees, since receivers could now receive elements out of the order in which they were originally sent.

Even if this is not implemented, this problem and the potential solutions should probably be at least explicitly mentioned in the Channel documentation, since it may not be clear unless someone understands how Channel works at a deep level, and can cause unanticipated issues if not properly understood and handled.

@pacher

pacher commented Jun 7, 2020

Just my 2 cents:
Option 3: change the behavior of send and receive, but add two more methods with the old behavior, something like oldSendWithAtomicCancellation. Mark them obsolete, deprecate them, and delete them in future versions. Describe this properly in the release notes and in a migration guide, if one exists. This way, advanced folks who relied on this behavior could simply change method names in their code. It would still be possible to upgrade immediately and take time to work around the usage of atomic cancellation in the codebase while already using 1.4, instead of being stuck with 1.3.

@1zaman
Contributor

1zaman commented Jun 7, 2020

Following up on my previous comment, I have realized that there's also a way for Channel to implement resending functionality while retaining ordering guarantees, though it can significantly impact receiver latency if many elements are sent in a short time period. This tradeoff might be acceptable in cases where sends are known to be infrequent, but both delivery and ordering of all the elements are important.

This can be done if all receiver dispatches are performed sequentially, by suspending all other receivers while there's any ongoing dispatch to a receiver. Then if the ongoing dispatch is cancelled, the Channel can simply try to resend the element to the next receiver, with the guarantee that this receiver won't have received any other (later) elements during this period. There could also be some optimizations done in specific cases, like preemptive dispatching if multiple contiguous receivers share the same dispatcher.

Since this would allow a single slow dispatcher of a receiver to completely halt all other receivers, there could also be a receiver dispatching timeout, after which the dispatch would be cancelled by the Channel if there are other receivers waiting in the queue, and the element sent to the next receiver.

@elizarov
Contributor Author

elizarov commented Jun 8, 2020

Many Channel users might want to guard against that by using the onElementCancel parameter (proposed in #1936) to launch a coroutine that would try to resend the element to the channel if it is not closed.

@1zaman When you say "many users" do you have a specific use-case in mind? What kind of application might need it? Can you elaborate on this scenario, please.

@elizarov
Contributor Author

elizarov commented Jun 8, 2020

Option 3: change the behavior of send and receive but add two more methods with old behavior. Something like oldSendWithAtomicCancellation. Mark them obsolete, deprecate and delete in future versions. Describe it properly in release notes and migration guide if one exists. This way advanced folks who relied on this behavior could simply change method names in their code.

@pacher First of all, we need to find those "advanced folks" who had relied on this underdocumented behavior to actually implement correct code, and figure out what their use of atomic cancellation was. Do you have any specific examples in mind?

@pacher

pacher commented Jun 8, 2020

@elizarov No, unfortunately not. I am not affected by this change, but I could have been. I was trying to imagine myself in this situation, and my suggestion is what would suit me personally the best: read the release notes, find and replace, deal with it later.
I am the one who catches exceptions from channel.offer (#974) and I will be sad to see it deprecated. I also like the name offer and it is a pity to lose it to trySend. I would prefer to have the old behavior in offerThrowing and change the semantics of offer.

As for finding "advanced folks", we can't expect everybody reading every issue in the repo, but it is reasonable to expect people to read release notes and migration guide for at least "major" version bump. Hence the suggestion.

@1zaman
Contributor

1zaman commented Jun 8, 2020

@elizarov: Let's consider, for example, an Android app which receives push notifications and wants to have them displayed as a dialog inside the app. This can be done by creating a Channel where all these notifications are sent, and having every Activity launch a coroutine in its foreground scope to receive them from it and show them as dialogs to the user.

Now if a new Activity is launched on top of a previous one, then the previous Activity will be moved to the background, and the coroutine that it had launched to receive notifications from the Channel will be cancelled, while the new Activity will launch a new coroutine of its own to start receiving notifications. If any notifications were in transit to the previous Activity when this happened, they will be discarded and lost, and not delivered to the new Activity.

To keep the case simple, I left out a fallback for handling the notifications when the app is in the background, which in most cases would be implemented by having them shown in Android's notification drawer. This could be achieved by adding a coroutine to receive and handle the notifications when the app is in the background, which would encounter the same issues mentioned before. I didn't include this in the main scenario, because there could also be other ways to implement a fallback, such as by using offer, but that would still have issues in the described scenario and might also have other issues.

@LouisCAD
Contributor

LouisCAD commented Jun 9, 2020

@1zaman I don't think Channel is the right fit for the use case you describe. SharedFlow (not available yet) or something custom, or a combination seems more appropriate to do this. You probably want to ensure the Activity or whatever dealt with the notification without being cancelled too soon BTW.

@1zaman
Contributor

1zaman commented Jun 10, 2020

@LouisCAD: Channel seems to be conceptually the right tool for sending events that can have multiple handlers (though in this case only one at a time), but which should only be processed once. The only problem is that after the change proposed here, the events can be lost due to receiver cancellation, which can be overcome by adding the resending functionality. This functionality is possible to implement even with the proposed onElementCancel public API, though it will require a level of indirection in the proposed implementation, since it needs to be provided as a parameter in the Channel constructor, and as such doesn't have a reference to the Channel.

I don't need the sharing/broadcasting feature provided by SharedFlow, nor is its replaying/caching feature the right solution to this problem, even though there seems to have been some discussion in #2034 about using SharedFlow for something like this.

Note that this problem will be present in any scenario where scoped or cancellable coroutines receive from a Channel. My only experience with using a non-global scope has been in UI-bound code in Android, so I came up with an example related to that, but there may be other use-cases that are impacted by this issue as well.

elizarov added a commit that referenced this issue Jun 16, 2020
elizarov added a commit that referenced this issue Sep 16, 2020
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
…otlin#1937)

This is problematic for Android when the Main dispatcher is cancelled on a destroyed activity.
The atomic nature of channels is designed to prevent the loss of elements,
which is really not an issue for a typical application, but creates problems when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed to MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for Kotlin#1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic
* Channel onUndeliveredElement is introduced as a replacement.

Fixes Kotlin#1265
Fixes Kotlin#1813
Fixes Kotlin#1915
Fixes Kotlin#1936

Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
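A minimal sketch of the onUndeliveredElement hook named in this commit message, assuming kotlinx.coroutines 1.4 or later: elements still sitting in a channel's buffer when the channel is cancelled are handed to the callback, so a resource can be closed instead of leaked. `Resource` and `cancelBufferedChannel` are hypothetical names used only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for anything that must be explicitly closed.
class Resource(val id: Int) {
    var closed = false
        private set
    fun close() { closed = true }
}

// Hypothetical helper: fills a buffered channel, then cancels it without receiving anything.
fun cancelBufferedChannel(): List<Resource> = runBlocking {
    val channel = Channel<Resource>(capacity = 3, onUndeliveredElement = { it.close() })
    val sent = (1..3).map { Resource(it) }
    sent.forEach { channel.send(it) } // fits into the buffer, so send() does not suspend
    channel.cancel() // drops the buffer; each dropped element is passed to onUndeliveredElement
    sent
}

fun main() {
    println(cancelBufferedChannel().all { it.closed })
}
```

The callback takes over the role that atomic cancellation used to play for ownership transfer: instead of guaranteeing delivery, the channel guarantees that an element it gives up on is reported.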
@yorickhenning
Contributor

yorickhenning commented May 4, 2021

Sorry, I know this ship has sailed, but I'd like to leave a note.

Atomic cancellation was the most important differentiating feature between Coroutines and other process concurrency libraries I've used. I rather liked it.

Some of my code broke when 1.4.x rolled out to me. I understood atomic cancellation and send/receive under cancellation, and I used them to write in-order responses from sequentially started, concurrently executing hedged requests, without paying join() latency in any coroutine holding the write lock.

This is a toy version of the algorithm using a Mutex barrier (playground with the sample):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

suspend fun hedge(): List<Int> {
  val mutex = Mutex()
  val producers = mutableListOf<Job>()
  val output = mutableListOf<Int>()

  coroutineScope {
    producers += launch(start = CoroutineStart.LAZY) {
      val response: Int = expensiveUserDefinedRequest(1)
      mutex.withLock {
        output += response
      }
    }

    producers += launch(start = CoroutineStart.LAZY) {
      val response: Int = expensiveUserDefinedRequest(2)
      mutex.withLock {
        producers[0].cancel()
        output += response
      }
    }

    for (job in producers) {
      job.start()
    }
  }

  return output
}

private suspend fun expensiveUserDefinedRequest(input: Int): Int {
  delay(1)
  return input
}
```

Under "prompt cancellation", and given that Mutex.lock() suspends, I'd expect the valid return values to be [1, 2] and [2]. I think in earlier versions of Coroutines, those were the only valid return values. Mutex.lock() atomically either:

  1. Check-failed isActive then passed the barrier uncontended
  2. Suspended, cancellable, checking (an effectively volatile?) isActive at resumption

I've tried 1.3.3 and 1.3.9, and in neither does Mutex.lock() have atomic cancellation. My memory and the docs suggest that it did at some point, but I haven't binary-searched to figure out when or how it changed.

The real hedge() uses a Channel.send() barrier, not a Mutex.lock() barrier: first, because the real hedge() is a concurrent Flow, and second, because Mutex didn't cancel as documented, but Channel.send() did (the original RENDEZVOUS Channel did, at least). That is why 1.4 caused trouble.

Anyway, with the breaking change to cancellation at send(), [2, 1] became a valid output, and the algorithm stopped being correct. The rate of out-of-order responses is 1/10,000 or thereabouts in the Playground sample, and I think the rate in the real implementation was similar. So, unit tests didn't find the mistake before rolling out 1.4, or in about a month of CI runs. A flake detector found it later.

My understanding of "prompt cancellation" is still related to happens-before, and Mutex even calls out synchronized to describe itself. Mutex says it has a "prompt cancellation guarantee" but also says it "doesn't check for cancellation when it's not suspended". It doesn't, however, define when the suspend lock() function will or will not suspend. Is lock() guaranteed not to suspend or check cancellation if the Mutex is free when a coroutine calls lock(), or does lock() make a best-effort check for cancellation?

As implemented now, cancellation is racy in a way that suggests that cancellation is a best-effort check at barriers. I've read suspendCancellableCoroutine(), and I think it's clear about how continuations work, but what's not clear is how barrier primitives relate to the creation of cancellable continuations.

My mental model of Mutex was:

```kotlin
lock() // Barrier and suspend point. Checks cancellation on resumption. Critical section begins.
try {
  block()
} finally {
  unlock() // Critical section ends.
}
```

My new mental model is more like:

```kotlin
if (!isActive) {
  throw CancellationException()
}
if (tryLock()) { // Doesn't check isActive.
  try {
    block()
  } finally {
    unlock()
  }
} else {
  lock() // Checks isActive.
  try {
    block()
  } finally {
    unlock()
  }
}
```

That might be faster sometimes, but if I need to trade in relaxed cancellation checking at a hot lock, I can write a fast path. As the default, I found deterministic cancellation the better choice for correctness. It allowed unit tests (using real pool Dispatchers, not the undispatched single-threaded test Dispatcher) to be fairly sure there was no low-probability race unique to cancellation cases. Now, only load tests are likely to find those cancellation corner cases.

With the new cancellation model, if I want to avoid "oops kept running after cancellation" bugs after entering a shared critical section, I can check the isActive bit after passing the barrier to make cancellation deterministic. That's quite like checking Thread.interrupted() after locking a shared Java intrinsic lock.

It'd be good if the coroutines atomicity model didn't change between versions without an API break, whether accidentally (like I think Mutex did at some point?) or intentionally (like Channel did in 1.4). Changes to atomicity introduce subtle, low-probability correctness bugs into existing code.

@elizarov
Contributor Author

elizarov commented May 12, 2021

@yorickhenning Thank you for the feedback. It was a tough decision to make, but maintaining the old atomic cancellation behavior was just becoming too cumbersome. I'm surprised you've found it different from other concurrent primitives because the difference is only apparent in the presence of cancellation, which is a pretty unique feature of Kotlin coroutines itself. Without cancellation, it all just works similarly between different languages and different libraries.

However, I've looked at your original code and I don't see how its behavior is related to atomic cancellation. Since version 1.0, coroutine primitives have checked for cancellation when they suspend. In particular, Mutex suspends only when it has to wait for the lock.

So, it was always possible to have [2, 1], because the second producer might simply be too late to cancel the first one. The cancellation may come at a moment when the 1st producer has just finished expensiveUserDefinedRequest(1) but has not yet started mutex.lock. When mutex.lock then executes, the 1st producer is already canceled, but the lock is no longer held by the 2nd producer, so there is no suspension and no cancellation check.
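The race just described comes down to the fact that an uncontended `Mutex.lock()` returns without suspending, and therefore without checking cancellation. A minimal sketch that makes this deterministic, assuming runBlocking's single-threaded event loop; `uncontendedLockAfterCancel` is a hypothetical name:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical helper. Returns true if the critical section ran even though
// the coroutine had already been cancelled before it called lock().
fun uncontendedLockAfterCancel(): Boolean = runBlocking {
    val mutex = Mutex()
    var enteredWhileCancelled = false
    launch {
        cancel() // the coroutine is now cancelled, but it has not suspended yet
        mutex.withLock {
            // The mutex was free, so lock() returned without suspending,
            // and therefore without a cancellation check.
            enteredWhileCancelled = !isActive
        }
    }.join()
    enteredWhileCancelled
}

fun main() {
    println(uncontendedLockAfterCancel()) // true
}
```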

In order to prevent [2, 1] you'll need to add ensureActive() inside the 1st producer's mutex.withLock section.
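Applied to the toy hedge() from the earlier comment, the suggested fix could look like the sketch below; apart from imports and a main function, only the 1st producer changes. Because the 2nd producer cancels the 1st while holding the lock, the 1st producer either writes before that happens or observes the cancellation as soon as it acquires the lock, so the possible outputs should be [1, 2] and [2]. This illustrates the toy version only, not the real Flow-based implementation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

suspend fun hedge(): List<Int> {
    val mutex = Mutex()
    val producers = mutableListOf<Job>()
    val output = mutableListOf<Int>()

    coroutineScope {
        producers += launch(start = CoroutineStart.LAZY) {
            val response: Int = expensiveUserDefinedRequest(1)
            mutex.withLock {
                // Acquiring a free lock does not check cancellation, so check explicitly:
                // if the 2nd producer has already cancelled us, give up before writing.
                ensureActive()
                output += response
            }
        }
        producers += launch(start = CoroutineStart.LAZY) {
            val response: Int = expensiveUserDefinedRequest(2)
            mutex.withLock {
                producers[0].cancel()
                output += response
            }
        }
        producers.forEach { it.start() }
    }
    return output
}

private suspend fun expensiveUserDefinedRequest(input: Int): Int {
    delay(1)
    return input
}

fun main() = runBlocking {
    println(hedge())
}
```

The check costs one volatile read per critical section, which is the trade-off discussed above: cancellation becomes deterministic at this barrier only because the code asks for it.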

@yorickhenning
Contributor

> Thank you for the feedback. It was a tough decision to make, but maintaining the old atomic cancellation behavior was just becoming too cumbersome

I have sympathy for that. Synchronization points have clear tradeoffs against low-level performance. Atomic cancellation meant synchronization.

> However, I've looked at your original code and I don't see how its behavior is related to atomic cancellation.

I tried to give a simple example of why I might care about deterministic cancellation checkpointing in a concurrent algorithm. I should've stuck to the specific example... But I'm now unconvinced it didn't rely on implementation details. :)

A rendezvous Channel.send()/Channel.receive() (absent UNDISPATCHED) was a shared cancellation checkpoint between the sending and receiving coroutine, and now isn't, and that mattered, I think?
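A minimal sketch of the rendezvous behaviour described above, assuming kotlinx.coroutines 1.4 or later and runBlocking's single-threaded event loop: send() hands the element to a suspended receiver and returns, the receiver is cancelled before its resumption runs, and the element never reaches it. Passing an onUndeliveredElement callback at least makes the loss observable. `sendThenCancelReceiver` is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical helper. Returns (value seen by the receiver, elements reported as undelivered).
fun sendThenCancelReceiver(): Pair<Int?, List<Int>> = runBlocking {
    val undelivered = mutableListOf<Int>()
    val channel = Channel<Int>(Channel.RENDEZVOUS, onUndeliveredElement = { undelivered += it })
    var received: Int? = null
    val receiver = launch { received = channel.receive() }
    yield()           // the receiver runs and suspends in receive()
    channel.send(1)   // a receiver is waiting, so send() completes without suspending
    receiver.cancel() // cancel before the receiver's dispatched resumption runs
    receiver.join()   // the receiver resumes with CancellationException, not with 1
    received to undelivered
}

fun main() {
    val (seen, lost) = sendThenCancelReceiver()
    println("receiver saw $seen, undelivered: $lost")
}
```

From the sender's point of view the rendezvous succeeded, yet the receiving coroutine never observed the element; that is the sense in which send/receive is no longer a shared cancellation checkpoint.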

> Since version 1.0, coroutine primitives have checked for cancellation when they suspend. In particular, Mutex suspends only when it has to wait for the lock.

Thanks for confirming that Mutex never checked cancellation at an uncontended lock(). I really was wondering about that.

I found the prior Mutex documentation ambiguous about suspension timing. The newer lock() docs are significantly clearer, and the suspendCancellableCoroutine {} docs are helpful and cross-linked, which is great.

> I'm surprised you've found it different from other concurrent primitives because the difference is only apparent in the presence of cancellation, which is a pretty unique feature of Kotlin coroutines itself. Without cancellation, it all just works similarly between different languages and different libraries.

The systems I work on care a lot about efficient cancellation, backpressure, and keeping ACID properties under interrupt-style cancellation of process pipelines: interruption all the way to the socket, bidirectional streaming, and lots of user-defined concurrent functions. So the difference is highly apparent in the way I use coroutines, and it contrasts with other languages and libraries I've used to do the same things.

A Java Executor process can't wait for a neighbouring process to finish after cancelling a Future without extra infrastructure. Even with extra work, cheaply joining after cancellation without blocking Threads is hard. Joining coroutines after cancellation is easy, though closing resources when closure is async is still tricky.
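A minimal sketch of joining after cancellation, assuming runBlocking and a recent kotlinx.coroutines (for awaitCancellation): cancelAndJoin() suspends, without blocking a thread, until the cancelled coroutine has finished its finally block, and a suspending cleanup step has to run inside withContext(NonCancellable) because the coroutine is already cancelled. `cancelThenJoin` and the log strings are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper. Returns the order in which events happened.
fun cancelThenJoin(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val worker = launch {
        try {
            log += "working"
            awaitCancellation() // suspends until this coroutine is cancelled
        } finally {
            // The coroutine is already cancelled here, so a suspending cleanup
            // (for example an asynchronous close) must run in a NonCancellable context.
            withContext(NonCancellable) {
                delay(10)
                log += "closed"
            }
        }
    }
    yield()                // let the worker start and suspend
    worker.cancelAndJoin() // resumes only after the worker's finally block has completed
    log += "joined"
    log
}

fun main() {
    println(cancelThenJoin()) // [working, closed, joined]
}
```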

I'm not certain whether Coroutines' cancellation changes make any properties harder to implement, but I sure depended on how cancellation worked. After all, a system's observable behaviour is its public API, isn't it? I should've read the patch notes. :)

I'm not sure I've discovered everything the cancellation changes impacted yet. This new ContinuationInterceptor paragraph is a mystery for the future:

"""
If a custom implementation of ContinuationInterceptor is used in a coroutine’s context that does not extend CoroutineDispatcher class, then there is no prompt cancellation guarantee. A custom continuation interceptor can resume execution of a previously suspended coroutine even if its job was already cancelled.
"""

> In order to prevent [2, 1] you'll need to add ensureActive() inside the 1st producer's mutex.withLock section.

Yep, that's exactly how that section reads, now.
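For contrast with the ContinuationInterceptor paragraph quoted above, the prompt cancellation guarantee can be observed with an ordinary CoroutineDispatcher. In this sketch (assuming runBlocking's single-threaded event loop and kotlinx.coroutines 1.4 or later), a continuation is resumed with a value, but its coroutine is cancelled before the dispatched resumption runs, so suspendCancellableCoroutine throws CancellationException instead of returning the value. `resumeThenCancel` is a hypothetical name.

```kotlin
import kotlin.coroutines.resume
import kotlinx.coroutines.*

// Hypothetical helper. Returns the value the waiter received, or null if it was cancelled instead.
fun resumeThenCancel(): Int? = runBlocking {
    var continuation: CancellableContinuation<Int>? = null
    var observed: Int? = null
    val waiter = launch {
        observed = suspendCancellableCoroutine<Int> { cont -> continuation = cont }
    }
    yield()                   // the waiter runs and suspends, publishing its continuation
    continuation!!.resume(42) // the resumption is dispatched to runBlocking's event loop...
    waiter.cancel()           // ...and the job is cancelled before that dispatch runs
    waiter.join()
    observed                  // the waiter got CancellationException instead of 42
}

fun main() {
    println(resumeThenCancel())
}
```

The value 42 is simply dropped here; when the value is a resource, this is exactly the situation that onUndeliveredElement-style callbacks exist to handle.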

@gmk57

gmk57 commented May 16, 2021

I suppose these cancellation subtleties deserve to be mentioned in the "Shared flows, broadcast channels" article, which suggests using channels to "handle events that must be processed exactly once", because "posted events are never dropped by default". Some of us took this literally and eventually got issues.

@elizarov
Contributor Author

elizarov commented May 18, 2021

@gmk57 I suppose these cancellation subtleties deserve to be mentioned in the "Shared flows, broadcast channels"

Good point, added.

amal added a commit to fluxo-kt/fluxo that referenced this issue Nov 16, 2022
…it handled exactly once. Together with `Store` it can also guarantee delivery of the side effect.

Also see:
[Proposal] Primitive or Channel that guarantees the delivery and processing of items
Kotlin/kotlinx.coroutines#2886

Rethink atomicity of certain low-level primitives
Kotlin/kotlinx.coroutines#1813

LiveData with SnackBar, Navigation and other events (the SingleLiveEvent case)
https://medium.com/androiddevelopers/livedata-with-snackbar-navigation-and-other-events-the-singleliveevent-case-ac2622673150
https://gist.github.com/JoseAlcerreca/5b661f1800e1e654f07cc54fe87441af

Shared flows, broadcast channels
https://elizarov.medium.com/shared-flows-broadcast-channels-899b675e805c
https://gmk57.medium.com/unfortunately-events-may-be-dropped-if-channel-receiveasflow-cfe78ae29004
https://gist.github.com/gmk57/330a7d214f5d710811c6b5ce27ceedaa

Signed-off-by: Artyom Shendrik <artyom.shendrik@gmail.com>