Skip to content

Commit

Permalink
Implement optional thread interrupt on coroutine cancellation (Kotlin#57
Browse files Browse the repository at this point in the history
)

See issue Kotlin#57 for details

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
  • Loading branch information
Trol committed Apr 19, 2020
1 parent 5eaf83c commit 3565690
Show file tree
Hide file tree
Showing 6 changed files with 424 additions and 2 deletions.
25 changes: 25 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -4,6 +4,7 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
protected fun afterResume (Ljava/lang/Object;)V
protected fun cancellationExceptionMessage ()Ljava/lang/String;
public final fun completeCoroutine (Ljava/lang/Object;)Ljava/lang/Object;
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
public fun getCoroutineContext ()Lkotlin/coroutines/CoroutineContext;
public fun isActive ()Z
Expand Down Expand Up @@ -187,6 +188,30 @@ public final class kotlinx/coroutines/CoroutineExceptionHandlerKt {
public static final fun handleCoroutineException (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Throwable;)V
}

public abstract class kotlinx/coroutines/CoroutineInterruptController : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/CoroutineInterruptController$Key;
public fun <init> ()V
public abstract fun updateCoroutineCompleteState (Ljava/lang/Object;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/CoroutineInterruptController$Key : kotlin/coroutines/CoroutineContext$Key {
}

public final class kotlinx/coroutines/CoroutineInterruptible : kotlinx/coroutines/CoroutineInterruptController, kotlinx/coroutines/ThreadContextElement {
public static final field INSTANCE Lkotlinx/coroutines/CoroutineInterruptible;
public synthetic fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Object;)V
public fun restoreThreadContext (Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineInterruptible$ThreadState;)V
public fun updateCoroutineCompleteState (Ljava/lang/Object;)Ljava/lang/Object;
public synthetic fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Ljava/lang/Object;
public fun updateThreadContext (Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/CoroutineInterruptible$ThreadState;
}

public final class kotlinx/coroutines/CoroutineInterruptible$ThreadState {
public fun <init> ()V
public final fun clearInterrupt ()V
public final fun initInterrupt (Lkotlinx/coroutines/Job;)V
}

public final class kotlinx/coroutines/CoroutineName : kotlin/coroutines/AbstractCoroutineContextElement {
public static final field Key Lkotlinx/coroutines/CoroutineName$Key;
public fun <init> (Ljava/lang/String;)V
Expand Down
13 changes: 12 additions & 1 deletion kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Expand Up @@ -104,11 +104,22 @@ public abstract class AbstractCoroutine<in T>(
onCompleted(state as T)
}

/**
* Completes execution of this coroutine with the specified state.
*/
public fun completeCoroutine(state: Any?): Any? {
var completeState = state
context[CoroutineInterruptController]?.let {
completeState = it.updateCoroutineCompleteState(completeState)
}
return makeCompletingOnce(completeState)
}

/**
* Completes execution of this with coroutine with the specified result.
*/
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
val state = completeCoroutine(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
Expand Down
55 changes: 55 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineInterruptController.kt
@@ -0,0 +1,55 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

/**
* This [CoroutineContext] element makes a coroutine interruptible.
*
* With this element, the thread executing the coroutine is interrupted when the coroutine is canceled, making
* blocking procedures stop. Exceptions that indicate an interrupted procedure, eg., InterruptedException on JVM
* are transformed into [CancellationException] at the end of the coroutine. Thus, everything else goes as if this
* element is not present. In particular, the parent coroutine won't be canceled by those exceptions.
*
* This is an abstract element and will be implemented by each individual platform (or won't be implemented).
* The JVM implementation is named CoroutineInterruptible.
*
* Example:
* ```
* GlobalScope.launch(Dispatchers.IO + CoroutineInterruptible) {
* async {
* // This block will throw [CancellationException] instead of an exception indicating
* // interruption, such as InterruptedException on JVM.
* withContext(CoroutineName) {
* doSomethingUseful()
*
* // This blocking procedure will be interrupted when this coroutine is canceled
* // by Exception thrown by the below async block.
* doSomethingElseUsefulInterruptible()
* }
* }
*
* async {
* delay(500L)
* throw Exception()
* }
* }
* ```
*/
abstract class CoroutineInterruptController : AbstractCoroutineContextElement(Key) {
/**
* Key for [CoroutineInterruptController] instance in the coroutine context.
*/
@InternalCoroutinesApi
companion object Key : CoroutineContext.Key<CoroutineInterruptController>

/**
* Update the complete state of a coroutine, mainly for exception transformation.
*/
@InternalCoroutinesApi
abstract fun updateCoroutineCompleteState(completeState: Any?): Any?
}
Expand Up @@ -125,7 +125,7 @@ private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
* not a timeout exception.
*/
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED // (1)
val state = makeCompletingOnce(result)
val state = completeCoroutine(result)
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED // (2)
return if (state is CompletedExceptionally) { // (3)
when {
Expand Down
152 changes: 152 additions & 0 deletions kotlinx-coroutines-core/jvm/src/CoroutineInterruptible.kt
@@ -0,0 +1,152 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.atomicfu.AtomicRef
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.loop
import kotlin.coroutines.CoroutineContext

/**
* This is the [CoroutineInterruptController] implementation on JVM. See [CoroutineInterruptController] for detailed
* description and examples.
*/
object CoroutineInterruptible :
CoroutineInterruptController(), ThreadContextElement<CoroutineInterruptible.ThreadState?> {

/**
* Update the complete state of a coroutine on JVM.
* Transforms [InterruptedException] into [CancellationException] for coroutines with this context element.
*/
@InternalCoroutinesApi
override fun updateCoroutineCompleteState(completeState: Any?): Any? =
if (completeState is CompletedExceptionally && completeState.cause is InterruptedException)
CompletedExceptionally(CancellationException())
else
completeState

/**
* Updates context of the current thread.
* This function is invoked before the coroutine in the specified [context] is resumed in the current thread.
* Prepares interruption for this execution, watching the [Job] for cancellation and interrupt this executing
* thread on cancellation.
*/
@InternalCoroutinesApi
override fun updateThreadContext(context: CoroutineContext): ThreadState? {
// Fast path: no Job in this context
val job = context[Job] ?: return null
// Slow path
val threadState = ThreadState()
threadState.initInterrupt(job)
return threadState
}

/**
* Restores context of the current thread.
* This function is invoked after the coroutine in the specified [context] is suspended in the current thread.
* Stops watching the [Job] for cancellation and do clean-up work.
*/
@InternalCoroutinesApi
override fun restoreThreadContext(context: CoroutineContext, oldState: ThreadState?) {
// Fast path: no Job in this context
val threadState = oldState ?: return
// Slow path
threadState.clearInterrupt()
}

/**
* Holds the state of executions for interruption.
*/
@InternalCoroutinesApi
class ThreadState {
fun initInterrupt(job: Job) {
initInvokeOnCancel(job)
initThread()
}

fun clearInterrupt() {
state.loop { s ->
when {
s is Working -> {
if (state.compareAndSet(s, Finish)) {
s.cancelHandle?.let { it.dispose() } // no more watching
return
}
}
s === Interrupting -> Thread.yield() // eases the thread
s === Interrupted -> { Thread.interrupted(); return } // no interrupt leak
s === Init || s === Finish -> throw IllegalStateException("impossible state")
else -> throw IllegalStateException("unknown state")
}
}
}

private fun initInvokeOnCancel(job: Job) {
// watches the job for cancellation
val cancelHandle =
job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = CancelHandler())
// remembers the cancel handle or drops it
state.loop { s ->
when {
s === Init -> if (state.compareAndSet(s, Working(null, cancelHandle))) return
s is Working -> if (state.compareAndSet(s, Working(s.thread, cancelHandle))) return
s === Finish -> { cancelHandle.dispose(); return } // no more watching needed
s === Interrupting || s === Interrupted -> return
else -> throw IllegalStateException("unknown state")
}
}
}

private fun initThread() {
val thread = Thread.currentThread()
state.loop { s ->
when {
s === Init -> if (state.compareAndSet(s, Working(thread, null))) return
s is Working -> if (state.compareAndSet(s, Working(thread, s.cancelHandle))) return
s === Interrupted -> { thread.interrupt(); return } // interrupted before the thread is set
s === Finish || s === Interrupting -> throw IllegalStateException("impossible state")
else -> throw IllegalStateException("unknown state")
}
}
}

private inner class CancelHandler : CompletionHandler {
override fun invoke(cause: Throwable?) {
state.loop { s ->
when {
s === Init || (s is Working && s.thread === null) -> {
if (state.compareAndSet(s, Interrupted))
return
}
s is Working -> {
if (state.compareAndSet(s, Interrupting)) {
s.thread!!.interrupt()
state.value = Interrupted
return
}
}
s === Finish -> return
s === Interrupting || s === Interrupted -> return
else -> throw IllegalStateException("unknown state")
}
}
}
}

private val state: AtomicRef<State> = atomic(Init)

private interface State
// initial state
private object Init : State
// cancellation watching is setup and/or the continuation is running
private data class Working(val thread: Thread?, val cancelHandle: DisposableHandle?) : State
// the continuation done running without interruption
private object Finish : State
// interrupting this thread
private object Interrupting: State
// done interrupting
private object Interrupted: State
}
}

0 comments on commit 3565690

Please sign in to comment.