Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Breaking: Get rid of atomic cancellation and provide a replacement #1937

Merged
merged 46 commits into from Oct 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
074ea3d
Refactoring: Improve DispatchedTask.run a bit
elizarov Apr 16, 2020
e68cb3b
Remove atomic cancellation support
elizarov Dec 23, 2019
46238a6
Basic resource transfer support (WIP)
elizarov Apr 11, 2020
5281798
~ Refactor to Channel.onElementCancel lambda
elizarov Apr 22, 2020
3228b92
~ onElementCancel docs
elizarov Apr 22, 2020
340e411
~ Fixup kotlinx-coroutines-rx3 module to internal API changes
elizarov Jun 16, 2020
5e1ecd8
~ Fixup new Semaphore implementation for non-atomic resume
elizarov Jun 16, 2020
25b7727
~ ChannelOnCancellationFailureTest
elizarov Sep 16, 2020
f91919d
~ ChannelOnCancellationFailureTest with more receive fail scenarios
elizarov Sep 16, 2020
356c88c
Update kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
elizarov Sep 17, 2020
073c5f3
~ Added handler failure tests for CancellableContinuation
elizarov Sep 17, 2020
cbc2f8b
~ Document details of prompt cancellation
elizarov Sep 18, 2020
7fac3b7
~ Fixed channel performance
elizarov Sep 24, 2020
d94759b
~ Fixed Mutex.onLock cancellation during dispatch
elizarov Sep 24, 2020
d6986ef
~ More precise KDoc links to Continuation.resume/resumeWithException …
elizarov Sep 29, 2020
a2339f2
~ Clarified suspendCancellableCoroutine behavior on Job completion, a…
elizarov Sep 29, 2020
5d3cee9
~ Docs typos fixed
elizarov Sep 29, 2020
76792fa
~ Inlined MODE_CANCELLABLE_REUSABLE into getOrCreateCancellableContin…
elizarov Sep 29, 2020
c6aba6c
~ Fixed refs to suspendAtomicCancellableCoroutineReusable in docs
elizarov Sep 29, 2020
077b0e0
~ Fixed code style (extra parens around fun type)
elizarov Sep 29, 2020
9cfcb11
~ onElementCancel -> onUndeliveredElement
elizarov Sep 29, 2020
e9c6ee3
~ JvmField AtomicKt.NO_DECISION
elizarov Sep 29, 2020
4674989
~ Mutex.LockWaiter and its subclasses are inner classes
elizarov Sep 29, 2020
9b8778e
~ Further naming & code improvements
elizarov Sep 29, 2020
aa926fa
~ ArrayBroadcastChannel.kt does not support onUndeliveredElement
elizarov Sep 29, 2020
7fe569f
~ Consistent naming: cancelElement() -> undeliveredElement()
elizarov Sep 29, 2020
38ec1d0
~ Better commend on Send.pollResult
elizarov Sep 29, 2020
a2ed60f
~ Rename tests to xxxUndeliveredElementXxx
elizarov Sep 29, 2020
08ce2f8
~ Clarified that Channel.offer does not call onUndeliveredElement
elizarov Sep 29, 2020
c6a07b2
~ Call onUndeliveredElement when channel is closed/cancelled
elizarov Sep 29, 2020
afe8cc3
~ Test for closed channel fixed
elizarov Sep 29, 2020
427117d
~ Fixed remaining bugs to handle underlired elements when channel is …
elizarov Sep 30, 2020
b30d099
Update kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
elizarov Oct 5, 2020
85b7812
~ non-inline helpCloseAndGetSendException
elizarov Oct 5, 2020
6535659
~ undeliveredElementException.cause !== ex optimization
elizarov Oct 5, 2020
500c911
~ Added "since version" for hidden channel constructor
elizarov Oct 5, 2020
2893470
~ Cleaner Semaphore.acquire code in case of immediate elimination wit…
elizarov Oct 5, 2020
4760c22
Update kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
elizarov Oct 5, 2020
6b79de8
~ Fixed cancellation resume mode & CancellableContinuationImpl stack …
elizarov Oct 5, 2020
c88b832
~ Better diagnostics in ChannelUndeliveredElementStressTest
elizarov Oct 5, 2020
4a7eb7a
~ Fixed bug in ChannelUndeliveredElementStressTest
elizarov Oct 5, 2020
5bca7ca
~ Initialize DispatchedContinuation.resumeMode with MODE_UNINITIALIZED
elizarov Oct 6, 2020
12bba52
~ CancellableContinuationImpl.resetState -> resetStateReusable
elizarov Oct 6, 2020
3df38e9
~ More tests on double invokeOnCancellation calls
elizarov Oct 6, 2020
164d3e5
~ Make sure that channel operations allocate memory at the same rate …
elizarov Oct 7, 2020
4361158
~ JvmField Itr.channel
elizarov Oct 12, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
15 changes: 7 additions & 8 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -46,6 +46,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

Expand All @@ -56,6 +57,8 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {

public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
Expand All @@ -75,14 +78,12 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -272,10 +273,6 @@ public final class kotlinx/coroutines/DelayKt {
public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/DispatchedContinuationKt {
public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;)V
}

public final class kotlinx/coroutines/Dispatchers {
public static final field INSTANCE Lkotlinx/coroutines/Dispatchers;
public static final fun getDefault ()Lkotlinx/coroutines/CoroutineDispatcher;
Expand Down Expand Up @@ -613,8 +610,10 @@ public final class kotlinx/coroutines/channels/ChannelIterator$DefaultImpls {
}

public final class kotlinx/coroutines/channels/ChannelKt {
public static final fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final synthetic fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final fun Channel (ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}

public final class kotlinx/coroutines/channels/ChannelsKt {
Expand Down
15 changes: 13 additions & 2 deletions kotlinx-coroutines-core/common/src/Await.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

/**
Expand All @@ -18,6 +19,8 @@ import kotlin.coroutines.*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
Expand All @@ -33,6 +36,8 @@ public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
Expand All @@ -41,17 +46,23 @@ public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }

/**
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }

Expand Down
5 changes: 3 additions & 2 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -129,8 +129,9 @@ private class LazyDeferredCoroutine<T>(
* This function uses dispatcher from the new context, shifting execution of the [block] into the
* different thread if a new dispatcher is specified, and back to the original dispatcher
* when it completes. Note that the result of `withContext` invocation is
* dispatched into the original context in a cancellable way, which means that if the original [coroutineContext],
* in which `withContext` was invoked, is cancelled by the time its dispatcher starts to execute the code,
* dispatched into the original context in a cancellable way with a **prompt cancellation guarantee**,
* which means that if the original [coroutineContext], in which `withContext` was invoked,
* is cancelled by the time its dispatcher starts to execute the code,
* it discards the result of `withContext` and throws [CancellationException].
*/
public suspend fun <T> withContext(
Expand Down