Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking: Get rid of atomic cancellation and provide a replacement #1937

Merged
merged 46 commits into from Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
074ea3d
Refactoring: Improve DispatchedTask.run a bit
elizarov Apr 16, 2020
e68cb3b
Remove atomic cancellation support
elizarov Dec 23, 2019
46238a6
Basic resource transfer support (WIP)
elizarov Apr 11, 2020
5281798
~ Refactor to Channel.onElementCancel lambda
elizarov Apr 22, 2020
3228b92
~ onElementCancel docs
elizarov Apr 22, 2020
340e411
~ Fixup kotlinx-coroutines-rx3 module to internal API changes
elizarov Jun 16, 2020
5e1ecd8
~ Fixup new Semaphore implementation for non-atomic resume
elizarov Jun 16, 2020
25b7727
~ ChannelOnCancellationFailureTest
elizarov Sep 16, 2020
f91919d
~ ChannelOnCancellationFailureTest with more receive fail scenarios
elizarov Sep 16, 2020
356c88c
Update kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
elizarov Sep 17, 2020
073c5f3
~ Added handler failure tests for CancellableContinuation
elizarov Sep 17, 2020
cbc2f8b
~ Document details of prompt cancellation
elizarov Sep 18, 2020
7fac3b7
~ Fixed channel performance
elizarov Sep 24, 2020
d94759b
~ Fixed Mutex.onLock cancellation during dispatch
elizarov Sep 24, 2020
d6986ef
~ More precise KDoc links to Continuation.resume/resumeWithException …
elizarov Sep 29, 2020
a2339f2
~ Clarified suspendCancellableCoroutine behavior on Job completion, a…
elizarov Sep 29, 2020
5d3cee9
~ Docs typos fixed
elizarov Sep 29, 2020
76792fa
~ Inlined MODE_CANCELLABLE_REUSABLE into getOrCreateCancellableContin…
elizarov Sep 29, 2020
c6aba6c
~ Fixed refs to suspendAtomicCancellableCoroutineReusable in docs
elizarov Sep 29, 2020
077b0e0
~ Fixed code style (extra parens around fun type)
elizarov Sep 29, 2020
9cfcb11
~ onElementCancel -> onUndeliveredElement
elizarov Sep 29, 2020
e9c6ee3
~ JvmField AtomicKt.NO_DECISION
elizarov Sep 29, 2020
4674989
~ Mutex.LockWaiter and its subclasses are inner classes
elizarov Sep 29, 2020
9b8778e
~ Further naming & code improvements
elizarov Sep 29, 2020
aa926fa
~ ArrayBroadcastChannel.kt does not support onUndeliveredElement
elizarov Sep 29, 2020
7fe569f
~ Consistent naming: cancelElement() -> undeliveredElement()
elizarov Sep 29, 2020
38ec1d0
~ Better commend on Send.pollResult
elizarov Sep 29, 2020
a2ed60f
~ Rename tests to xxxUndeliveredElementXxx
elizarov Sep 29, 2020
08ce2f8
~ Clarified that Channel.offer does not call onUndeliveredElement
elizarov Sep 29, 2020
c6a07b2
~ Call onUndeliveredElement when channel is closed/cancelled
elizarov Sep 29, 2020
afe8cc3
~ Test for closed channel fixed
elizarov Sep 29, 2020
427117d
~ Fixed remaining bugs to handle underlired elements when channel is …
elizarov Sep 30, 2020
b30d099
Update kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
elizarov Oct 5, 2020
85b7812
~ non-inline helpCloseAndGetSendException
elizarov Oct 5, 2020
6535659
~ undeliveredElementException.cause !== ex optimization
elizarov Oct 5, 2020
500c911
~ Added "since version" for hidden channel constructor
elizarov Oct 5, 2020
2893470
~ Cleaner Semaphore.acquire code in case of immediate elimination wit…
elizarov Oct 5, 2020
4760c22
Update kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
elizarov Oct 5, 2020
6b79de8
~ Fixed cancellation resume mode & CancellableContinuationImpl stack …
elizarov Oct 5, 2020
c88b832
~ Better diagnostics in ChannelUndeliveredElementStressTest
elizarov Oct 5, 2020
4a7eb7a
~ Fixed bug in ChannelUndeliveredElementStressTest
elizarov Oct 5, 2020
5bca7ca
~ Initialize DispatchedContinuation.resumeMode with MODE_UNINITIALIZED
elizarov Oct 6, 2020
12bba52
~ CancellableContinuationImpl.resetState -> resetStateReusable
elizarov Oct 6, 2020
3df38e9
~ More tests on double invokeOnCancellation calls
elizarov Oct 6, 2020
164d3e5
~ Make sure that channel operations allocate memory at the same rate …
elizarov Oct 7, 2020
4361158
~ JvmField Itr.channel
elizarov Oct 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
15 changes: 7 additions & 8 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -46,6 +46,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

Expand All @@ -56,6 +57,8 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {

public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
Expand All @@ -75,14 +78,12 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -272,10 +273,6 @@ public final class kotlinx/coroutines/DelayKt {
public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/DispatchedContinuationKt {
public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;)V
}

public final class kotlinx/coroutines/Dispatchers {
public static final field INSTANCE Lkotlinx/coroutines/Dispatchers;
public static final fun getDefault ()Lkotlinx/coroutines/CoroutineDispatcher;
Expand Down Expand Up @@ -613,8 +610,10 @@ public final class kotlinx/coroutines/channels/ChannelIterator$DefaultImpls {
}

public final class kotlinx/coroutines/channels/ChannelKt {
public static final fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final synthetic fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final fun Channel (ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}

public final class kotlinx/coroutines/channels/ChannelsKt {
Expand Down
73 changes: 28 additions & 45 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -76,6 +76,14 @@ public interface CancellableContinuation<in T> : Continuation<T> {
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any? = null): Any?

/**
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
* guaranteed can be provided by having a cancellation fallback.
*/
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?

/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
* or `null` otherwise (it was already resumed or cancelled). When a non-null object is returned,
Expand Down Expand Up @@ -110,8 +118,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
public fun cancel(cause: Throwable? = null): Boolean

/**
* Registers a [handler] to be **synchronously** invoked on cancellation (regular or exceptional) of this continuation.
* When the continuation is already cancelled, the handler will be immediately invoked
* Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation.
* When the continuation is already cancelled, the handler is immediately invoked
* with the cancellation exception. Otherwise, the handler will be invoked as soon as this
* continuation is cancelled.
*
Expand All @@ -120,7 +128,12 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* At most one [handler] can be installed on a continuation.
* At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second
* time produces [IllegalStateException].
*
* This handler is also called when this continuation [resumes][resume] normally (with a value) and then
* is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever
* the caller of [suspendCancellableCoroutine] is getting a [CancellationException].
*
* **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe.
* This `handler` can be invoked concurrently with the surrounding code.
Expand Down Expand Up @@ -176,7 +189,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* There is no guarantee on the execution context of its invocation.
*/
@ExperimentalCoroutinesApi // since 1.2.0
public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
}

/**
Expand All @@ -199,40 +212,24 @@ public suspend inline fun <T> suspendCancellableCoroutine(
}

/**
* Suspends the coroutine like [suspendCancellableCoroutine], but with *atomic cancellation*.
*
* When the suspended function throws a [CancellationException], it means that the continuation was not resumed.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when the continuation
* was already resumed and was posted for execution to the thread's queue.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_ATOMIC_DEFAULT)
block(cancellable)
cancellable.getResult()
}

/**
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
* Suspends the coroutine similar to [suspendCancellableCoroutine], but an instance of
* [CancellableContinuationImpl] is reused.
*/
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
internal suspend inline fun <T> suspendCancellableCoroutineReusable(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode = MODE_CANCELLABLE_REUSABLE)
elizarov marked this conversation as resolved.
Show resolved Hide resolved
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
internal fun <T> getOrCreateCancellableContinuation(
delegate: Continuation<T>, resumeMode: Int
): CancellableContinuationImpl<T> {
assert { resumeMode.isReusableMode }
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
return CancellableContinuationImpl(delegate, resumeMode)
}
/*
* Attempt to claim reusable instance.
elizarov marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -248,24 +245,10 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
* thus leaking CC instance for indefinite time.
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
*/
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
?: return CancellableContinuationImpl(delegate, resumeMode)
}

/**
* @suppress **Deprecated**
*/
@Deprecated(
message = "holdCancellability parameter is deprecated and is no longer used",
replaceWith = ReplaceWith("suspendAtomicCancellableCoroutine(block)")
)
@InternalCoroutinesApi
public suspend inline fun <T> suspendAtomicCancellableCoroutine(
holdCancellability: Boolean = false,
crossinline block: (CancellableContinuation<T>) -> Unit
): T =
suspendAtomicCancellableCoroutine(block)

/**
* Removes the specified [node] on cancellation.
*/
Expand Down