Skip to content

Commit

Permalink
Ensure that flowOn does not resume downstream after cancellation
Browse files Browse the repository at this point in the history
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomicCancellation` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control atomicity of this receive and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
  • Loading branch information
elizarov committed Dec 23, 2019
1 parent 12a0318 commit cd5c58e
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ public abstract interface class kotlinx/coroutines/channels/ActorScope : kotlinx

public final class kotlinx/coroutines/channels/ActorScope$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ActorScope;)V
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ActorScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/BroadcastChannel : kotlinx/coroutines/channels/SendChannel {
Expand Down Expand Up @@ -572,6 +573,7 @@ public abstract interface class kotlinx/coroutines/channels/Channel : kotlinx/co

public final class kotlinx/coroutines/channels/Channel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/Channel;)V
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/Channel$Factory {
Expand Down Expand Up @@ -768,14 +770,17 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract synthetic fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (ZLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ReceiveChannel$DefaultImpls {
public static synthetic fun cancel (Lkotlinx/coroutines/channels/ReceiveChannel;)V
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/lang/Throwable;ILjava/lang/Object;)Z
public static synthetic fun cancel$default (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/concurrent/CancellationException;ILjava/lang/Object;)V
public static synthetic fun receiveOrClosed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun receiveOrClosed$default (Lkotlinx/coroutines/channels/ReceiveChannel;ZLkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/channels/SendChannel {
Expand Down
13 changes: 8 additions & 5 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Original file line number Diff line number Diff line change
Expand Up @@ -227,17 +227,20 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
* Suspends coroutine similar to [suspendAtomicCancellableCoroutine], but an instance of [CancellableContinuationImpl] is reused if possible.
*/
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
resumeMode: Int = MODE_ATOMIC_DEFAULT,
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted(), resumeMode)
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
internal fun <T> getOrCreateCancellableContinuation(
delegate: Continuation<T>, resumeMode: Int
): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
if (delegate !is DispatchedContinuation<T>) {
return CancellableContinuationImpl(delegate, resumeMode = MODE_ATOMIC_DEFAULT)
return CancellableContinuationImpl(delegate, resumeMode)
}
/*
* Attempt to claim reusable instance.
Expand All @@ -253,8 +256,8 @@ internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>):
* thus leaking CC instance for indefinite time.
* 2) Continuation was cancelled. Then we should prevent any further reuse and bail out.
*/
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState() }
?: return CancellableContinuationImpl(delegate, MODE_ATOMIC_DEFAULT)
return delegate.claimReusableCancellableContinuation()?.takeIf { it.resetState(resumeMode) }
?: return CancellableContinuationImpl(delegate, resumeMode)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ internal open class CancellableContinuationImpl<in T>(
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
internal fun resetState(): Boolean {
internal fun resetState(resumeMode: Int): Boolean {
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
Expand All @@ -101,6 +101,7 @@ internal open class CancellableContinuationImpl<in T>(
}
_decision.value = UNDECIDED
_state.value = Active
this.resumeMode = resumeMode
return true
}

Expand Down Expand Up @@ -128,7 +129,7 @@ internal open class CancellableContinuationImpl<in T>(

private fun checkCompleted(): Boolean {
val completed = isCompleted
if (resumeMode != MODE_ATOMIC_DEFAULT) return completed // Do not check postponed cancellation for non-reusable continuations
if (!resumeMode.isReusableMode) return completed // Do not check postponed cancellation for non-reusable continuations
val dispatched = delegate as? DispatchedContinuation<*> ?: return completed
val cause = dispatched.checkPostponedCancellation(this) ?: return completed
if (!completed) {
Expand Down Expand Up @@ -157,7 +158,7 @@ internal open class CancellableContinuationImpl<in T>(
* Attempt to postpone cancellation for reusable cancellable continuation
*/
private fun cancelLater(cause: Throwable): Boolean {
if (resumeMode != MODE_ATOMIC_DEFAULT) return false
if (!resumeMode.isReusableMode) return false
val dispatched = (delegate as? DispatchedContinuation<*>) ?: return false
return dispatched.postponeCancellation(cause)
}
Expand Down
12 changes: 7 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -556,11 +556,13 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
return receiveSuspend(RECEIVE_THROWS_ON_CLOSE, MODE_ATOMIC_DEFAULT)
}

@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendAtomicCancellableCoroutineReusable sc@ { cont ->
private suspend fun <R> receiveSuspend(
receiveMode: Int, resumeMode: Int
): R = suspendAtomicCancellableCoroutineReusable(resumeMode) sc@ { cont ->
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, receiveMode)
while (true) {
if (enqueueReceive(receive)) {
Expand Down Expand Up @@ -594,7 +596,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
if (result !== POLL_FAILED && result !is Closed<*>) return result as E
// slow-path does suspend
return receiveSuspend(RECEIVE_NULL_ON_CLOSE)
return receiveSuspend(RECEIVE_NULL_ON_CLOSE, MODE_ATOMIC_DEFAULT)
}

@Suppress("UNCHECKED_CAST")
Expand All @@ -607,12 +609,12 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
public final override suspend fun receiveOrClosed(atomicCancellation: Boolean): ValueOrClosed<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
// slow-path does suspend
return receiveSuspend(RECEIVE_RESULT)
return receiveSuspend(RECEIVE_RESULT, if (atomicCancellation) MODE_ATOMIC_DEFAULT else MODE_CANCELLABLE_REUSABLE)
}

@Suppress("UNCHECKED_CAST")
Expand Down
9 changes: 6 additions & 3 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ public interface ReceiveChannel<out E> {
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
*
* *Cancellation of suspended `receive` is atomic*: when this function
* throws a [CancellationException], it means that the element was not retrieved from this channel.
* When [atomicCancellation] is set to `true` (by default) then *cancellation of suspended `receive` is atomic*:
* when this function throws a [CancellationException], it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
Expand All @@ -267,7 +267,10 @@ public interface ReceiveChannel<out E> {
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
public suspend fun receiveOrClosed(): ValueOrClosed<E>
public suspend fun receiveOrClosed(atomicCancellation: Boolean = true): ValueOrClosed<E>

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Binary compatibility") // Since version 1.4.0
public suspend fun receiveOrClosed(): ValueOrClosed<E> = receiveOrClosed(atomicCancellation = true)

/**
* Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
Expand Down
9 changes: 6 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* the channel afterwards. If you need to iterate over the channel without consuming it,
* a regular `for` loop should be used instead.
*
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
* See [consumeEach][ReceiveChannel.consumeEach].
* Note, that emitting values from a channel into a flow is not atomic. A value that was received from the
* channel many not reach the flow collector if it was cancelled and will be lost.
*
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }` modulo
* atomicity. See [consumeEach][ReceiveChannel.consumeEach].
*/
@ExperimentalCoroutinesApi
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
Expand All @@ -42,7 +45,7 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
// L$1 <- channel
// L$2 <- cause
// L$3 <- this$run (actually equal to this)
val result = run { channel.receiveOrClosed() }
val result = run { channel.receiveOrClosed(atomicCancellation = false) }
if (result.isClosed) {
result.closeCause?.let { throw it }
break // returns normally when result.closeCause == null
Expand Down
6 changes: 5 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Context.kt
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,17 @@ public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
*
* For more explanation of context preservation please refer to [Flow] documentation.
*
* This operators retains a _sequential_ nature of flow if changing the context does not call for changing
* This operator retains a _sequential_ nature of flow if changing the context does not call for changing
* the [dispatcher][CoroutineDispatcher]. Otherwise, if changing dispatcher is required, it collects
* flow emissions in one coroutine that is run using a specified [context] and emits them from another coroutines
* with the original collector's context using a channel with a [default][Channel.BUFFERED] buffer size
* between two coroutines similarly to [buffer] operator, unless [buffer] operator is explicitly called
* before or after `flowOn`, which requests buffering behavior and specifies channel size.
*
* Note, that flows operating across different dispatchers might lose some in-flight elements when cancelled.
* In particular, this operator ensures that downstream flow does not resume on cancellation even if the element
* was already emitted by the upstream flow.
*
* ### Operator fusion
*
* Adjacent applications of [channelFlow], [flowOn], [buffer], [produceIn], and [broadcastIn] are
Expand Down
17 changes: 11 additions & 6 deletions kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,17 @@ import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*

@PublishedApi internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
@PublishedApi internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine
@PublishedApi internal const val MODE_UNDISPATCHED = 2 // when the thread is right, but need to mark it with current coroutine
@PublishedApi
internal const val MODE_ATOMIC_DEFAULT = 0 // schedule non-cancellable dispatch for suspendCoroutine
@PublishedApi
internal const val MODE_CANCELLABLE = 1 // schedule cancellable dispatch for suspendCancellableCoroutine

internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE
internal val Int.isDispatchedMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE
internal const val MODE_CANCELLABLE_REUSABLE = 2 // same as MODE_CANCELLABLE but supports reused
internal const val MODE_UNDISPATCHED = 3 // when the thread is right, but need to mark it with current coroutine

internal val Int.isCancellableMode get() = this == MODE_CANCELLABLE || this == MODE_CANCELLABLE_REUSABLE
internal val Int.isDispatchedMode get() = this != MODE_UNDISPATCHED
internal val Int.isReusableMode get() = this == MODE_ATOMIC_DEFAULT || this == MODE_CANCELLABLE_REUSABLE

internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
Expand Down Expand Up @@ -120,7 +125,7 @@ internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: In
val result = if (exception != null) Result.failure(exception) else Result.success(state as T)
when (useMode) {
MODE_ATOMIC_DEFAULT -> delegate.resumeWith(result)
MODE_CANCELLABLE -> delegate.resumeCancellableWith(result)
MODE_CANCELLABLE, MODE_CANCELLABLE_REUSABLE -> delegate.resumeCancellableWith(result)
MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(result)
else -> error("Invalid mode $useMode")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private class ChannelViaBroadcast<E>(

override suspend fun receive(): E = sub.receive()
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
override suspend fun receiveOrClosed(atomicCancellation: Boolean): ValueOrClosed<E> = sub.receiveOrClosed(atomicCancellation)
override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,14 @@ class CatchTest : TestBase() {
// flowOn with a different dispatcher introduces asynchrony so that all exceptions in the
// upstream flows are handled before they go downstream
.onEach { value ->
expect(8)
assertEquals("OK", value)
expectUnreached() // already cancelled
}
.catch { e ->
expect(9)
expect(8)
assertTrue(e is TestException)
assertSame(d0, kotlin.coroutines.coroutineContext[ContinuationInterceptor] as CoroutineContext)
}
.collect()
finish(10)
finish(9)
}
}
15 changes: 15 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/FlowOnTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -314,4 +314,19 @@ class FlowOnTest : TestBase() {
assertEquals(expected, value)
}
}

@Test
fun testCancelledFlowOn() = runTest {
assertFailsWith<CancellationException> {
coroutineScope {
flow {
cancel()
emit(Unit)
}.flowOn(Dispatchers.Default).collect {
// should not be reached, because cancelled before emit
expectUnreached()
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ReusableCancellableContinuationTest : TestBase() {

@Test
fun testReusable() = runTest {
testContinuationsCount(10, 1, ::suspendAtomicCancellableCoroutineReusable)
testContinuationsCount(10, 1) { suspendAtomicCancellableCoroutineReusable(block = it) }
}

@Test
Expand Down

0 comments on commit cd5c58e

Please sign in to comment.