Skip to content

Commit

Permalink
Breaking: Get rid of atomic cancellation and provide a replacement (K…
Browse files Browse the repository at this point in the history
…otlin#1937)

This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for Kotlin#1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic
* Channel onUnderliveredElement is introduced as a replacement.

Fixes Kotlin#1265
Fixes Kotlin#1813
Fixes Kotlin#1915
Fixes Kotlin#1936

Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
  • Loading branch information
3 people authored and recheej committed Dec 28, 2020
1 parent 79feaa0 commit 338c209
Show file tree
Hide file tree
Showing 72 changed files with 2,098 additions and 679 deletions.
Expand Up @@ -70,12 +70,12 @@ class NonCancellableChannel : SimpleChannel() {
}

class CancellableChannel : SimpleChannel() {
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutine {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutine {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutine<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutine<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand All @@ -84,13 +84,13 @@ class CancellableChannel : SimpleChannel() {

class CancellableReusableChannel : SimpleChannel() {
@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendReceive(): Int = suspendAtomicCancellableCoroutineReusable {
override suspend fun suspendReceive(): Int = suspendCancellableCoroutineReusable {
consumer = it.intercepted()
COROUTINE_SUSPENDED
}

@Suppress("INVISIBLE_MEMBER")
override suspend fun suspendSend(element: Int) = suspendAtomicCancellableCoroutineReusable<Unit> {
override suspend fun suspendSend(element: Int) = suspendCancellableCoroutineReusable<Unit> {
enqueuedValue = element
producer = it.intercepted()
COROUTINE_SUSPENDED
Expand Down
15 changes: 7 additions & 8 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -46,6 +46,7 @@ public abstract interface class kotlinx/coroutines/CancellableContinuation : kot
public abstract fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public abstract fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public abstract fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

Expand All @@ -56,6 +57,8 @@ public final class kotlinx/coroutines/CancellableContinuation$DefaultImpls {

public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/jvm/internal/CoroutineStackFrame, kotlinx/coroutines/CancellableContinuation {
public fun <init> (Lkotlin/coroutines/Continuation;I)V
public final fun callCancelHandler (Lkotlinx/coroutines/CancelHandler;Ljava/lang/Throwable;)V
public final fun callOnCancellation (Lkotlin/jvm/functions/Function1;Ljava/lang/Throwable;)V
public fun cancel (Ljava/lang/Throwable;)Z
public fun completeResume (Ljava/lang/Object;)V
public fun getCallerFrame ()Lkotlin/coroutines/jvm/internal/CoroutineStackFrame;
Expand All @@ -75,14 +78,12 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun resumeWith (Ljava/lang/Object;)V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/jvm/functions/Function1;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CancellableContinuationKt {
public static final fun disposeOnCancellation (Lkotlinx/coroutines/CancellableContinuation;Lkotlinx/coroutines/DisposableHandle;)V
public static final fun suspendAtomicCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun suspendAtomicCancellableCoroutine (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun suspendAtomicCancellableCoroutine$default (ZLkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun suspendCancellableCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -272,10 +273,6 @@ public final class kotlinx/coroutines/DelayKt {
public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/DispatchedContinuationKt {
public static final fun resumeCancellableWith (Lkotlin/coroutines/Continuation;Ljava/lang/Object;)V
}

public final class kotlinx/coroutines/Dispatchers {
public static final field INSTANCE Lkotlinx/coroutines/Dispatchers;
public static final fun getDefault ()Lkotlinx/coroutines/CoroutineDispatcher;
Expand Down Expand Up @@ -613,8 +610,10 @@ public final class kotlinx/coroutines/channels/ChannelIterator$DefaultImpls {
}

public final class kotlinx/coroutines/channels/ChannelKt {
public static final fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final synthetic fun Channel (I)Lkotlinx/coroutines/channels/Channel;
public static final fun Channel (ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (IILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
public static synthetic fun Channel$default (ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}

public final class kotlinx/coroutines/channels/ChannelsKt {
Expand Down
15 changes: 13 additions & 2 deletions kotlinx-coroutines-core/common/src/Await.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

/**
Expand All @@ -18,6 +19,8 @@ import kotlin.coroutines.*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
Expand All @@ -33,6 +36,8 @@ public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
Expand All @@ -41,17 +46,23 @@ public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }

/**
* Suspends current coroutine until all given jobs are complete.
* This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
* this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
*/
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }

Expand Down
5 changes: 3 additions & 2 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -129,8 +129,9 @@ private class LazyDeferredCoroutine<T>(
* This function uses dispatcher from the new context, shifting execution of the [block] into the
* different thread if a new dispatcher is specified, and back to the original dispatcher
* when it completes. Note that the result of `withContext` invocation is
* dispatched into the original context in a cancellable way, which means that if the original [coroutineContext],
* in which `withContext` was invoked, is cancelled by the time its dispatcher starts to execute the code,
* dispatched into the original context in a cancellable way with a **prompt cancellation guarantee**,
* which means that if the original [coroutineContext], in which `withContext` was invoked,
* is cancelled by the time its dispatcher starts to execute the code,
* it discards the result of `withContext` and throws [CancellationException].
*/
public suspend fun <T> withContext(
Expand Down

0 comments on commit 338c209

Please sign in to comment.