Skip to content

Commit

Permalink
Remove atomic cancellation support
Browse files Browse the repository at this point in the history
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
  • Loading branch information
elizarov committed Sep 16, 2020
1 parent 074ea3d commit e68cb3b
Show file tree
Hide file tree
Showing 31 changed files with 491 additions and 371 deletions.
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
3 changes: 0 additions & 3 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -80,9 +80,6 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
71 changes: 27 additions & 44 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -76,6 +76,14 @@ public interface CancellableContinuation<in T> : Continuation<T> {
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any? = null): Any?

/**
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
* guaranteed can be provided by having a cancellation fallback.
*/
@InternalCoroutinesApi
public fun tryResumeAtomic(value: T, idempotent: Any?, onCancellation: (cause: Throwable) -> Unit): Any?

/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
Expand Down Expand Up @@ -110,8 +118,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun cancel(cause: Throwable? = null): Boolean

/**
* Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
* When the continuation is already cancelled, the handler will be immediately invoked
* Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
* When the continuation is already cancelled, the handler is immediately invoked
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
* continuation is cancelled.
*
Expand All @@ -120,7 +128,12 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* At most one [handler] can be installed on a continuation.
* At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
* time produces [IllegalStateException].
*
* This handler is also called when this continuation [resumes][resume] normally (with a value) and then
* is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
* the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
*
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This `handler` can be invoked concurrently with the surrounding code.
Expand Down Expand Up @@ -199,40 +212,24 @@ public suspend inline fun <T> suspendCancellableCoroutine(
}

/**
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
*
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
block(cancellable)
cancellable.getResult()
}

/**
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
* [CancellableContinuationImpl] is reused.
*/
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode = MODE_CANCELLABLE_REUSABLE)
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
internal fun <T> getOrCreateCancellableContinuation(
delegate: Continuation<T>, resumeMode: Int
): CancellableContinuationImpl<T> {
assert { resumeMode.isReusableMode }
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
return CancellableContinuationImpl(delegate, resumeMode)
}
/*
* Attempt to claim reusable instance.
Expand All @@ -248,24 +245,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
* thus leaking CC instance for indefinite time.
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
*/
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
?: return CancellableContinuationImpl(delegate, resumeMode)
}

/**
* @suppress **Deprecated**
*/
@Deprecated(
message = "holdCancellability parameter is deprecated and is no longer used",
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
)
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendAtomicCancellableCoroutine(block)

/**
* Removes the specified [node] on cancellation.
*/
Expand Down

0 comments on commit e68cb3b

Please sign in to comment.