Skip to content

Commit

Permalink
Introduce new CancellableContinuation.resume(value, onCancellation)
Browse files Browse the repository at this point in the history
… with `onCancellation` that takes value, cause, and the coroutine context as parameters
  • Loading branch information
ndkoval committed Dec 16, 2021
1 parent 41b88af commit 6e62065
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 68 deletions.
51 changes: 49 additions & 2 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -87,7 +87,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
public fun tryResume(value: T, idempotent: Any?, onCancellation: OnCancellation<@UnsafeVariance T>?): Any?

/**
* Tries to resume this continuation with the specified [exception] and returns a non-null object token if successful,
Expand Down Expand Up @@ -201,8 +201,55 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@Deprecated(level = DeprecationLevel.WARNING,
message = "Replaced with resume(value: T, onCancellation: OnCancellation<T>?)")
@ExperimentalCoroutinesApi // since 1.2.0
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?)
public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) {
if (onCancellation == null) {
resume(value)
return
}
val onCancellationNew = OnCancellation<T> { _, cause, _ ->
onCancellation(cause ?: CancellationException("Cancelled"))
}
resume(value, onCancellationNew)
}

/**
* Resumes this continuation with the specified `value` and calls the specified `onCancellation`
* handler when either resumed too late (when continuation was already cancelled) or, although resumed
* successfully (before cancellation), the coroutine's job was cancelled before it had a
* chance to run in its dispatcher, so that the suspended function threw an exception
* instead of returning this value.
*
* The installed [onCancellation] handler should not throw any exceptions.
* If it does, they will get caught, wrapped into a [CompletionHandlerException] and
* processed as an uncaught exception in the context of the current coroutine
* (see [CoroutineExceptionHandler]).
*
* This function shall be used when resuming with a resource that must be closed by
* code that called the corresponding suspending function, for example:
*
* ```
* continuation.resume(resource) { r, cause ->
* r.close(cause)
* }
* ```
*
* A more complete example and further details are given in
* the documentation for the [suspendCancellableCoroutine] function.
*
* **Note**: The [onCancellation] handler must be fast, non-blocking, and thread-safe.
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@ExperimentalCoroutinesApi
@InternalCoroutinesApi
public fun resume(value: T, onCancellation: OnCancellation<@UnsafeVariance T>?)
}

public fun interface OnCancellation<in T> {
public fun invoke(value: T, cause: Throwable?, context: CoroutineContext)
}

/**
Expand Down
49 changes: 26 additions & 23 deletions kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
Expand Up @@ -119,7 +119,7 @@ internal open class CancellableContinuationImpl<in T>(
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
if (state is CompletedContinuation && state.idempotentResume != null) {
if (state is CompletedContinuation<*> && state.idempotentResume != null) {
// Cannot reuse continuation that was resumed with idempotent marker
detachChild()
return false
Expand All @@ -142,18 +142,19 @@ internal open class CancellableContinuationImpl<in T>(
when (state) {
is NotCompleted -> error("Not completed")
is CompletedExceptionally -> return // already completed exception or cancelled, nothing to do
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
check(!state.cancelled) { "Must be called at most once" }
val update = state.copy(cancelCause = cause)
if (_state.compareAndSet(state, update)) {
state.invokeHandlers(this, cause)
state as CompletedContinuation<T>
state.invokeHandlers(this, state.result, cause)
return // done
}
}
else -> {
// completed normally without marker class, promote to CompletedContinuation in case
// if invokeOnCancellation if called later
if (_state.compareAndSet(state, CompletedContinuation(state, cancelCause = cause))) {
if (_state.compareAndSet(state, CompletedContinuation<T>(state as T, cancelCause = cause))) {
return // done
}
}
Expand Down Expand Up @@ -214,9 +215,9 @@ internal open class CancellableContinuationImpl<in T>(
fun callCancelHandler(handler: CancelHandler, cause: Throwable?) =
callCancelHandlerSafely { handler.invoke(cause) }

fun callOnCancellation(onCancellation: (cause: Throwable) -> Unit, cause: Throwable) {
fun callOnCancellation(onCancellation: OnCancellation<@UnsafeVariance T>, value: T, cause: Throwable) {
try {
onCancellation.invoke(cause)
onCancellation.invoke(value, cause, context)
} catch (ex: Throwable) {
// Handler should never fail, if it does -- it is an unhandled exception
handleCoroutineException(
Expand Down Expand Up @@ -327,8 +328,9 @@ internal open class CancellableContinuationImpl<in T>(
override fun resumeWith(result: Result<T>) =
resumeImpl(result.toState(this), resumeMode)

override fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) =
override fun resume(value: T, onCancellation: OnCancellation<@UnsafeVariance T>?) {
resumeImpl(value, resumeMode, onCancellation)
}

public override fun invokeOnCancellation(handler: CompletionHandler) {
val cancelHandler = makeCancelHandler(handler)
Expand All @@ -355,7 +357,7 @@ internal open class CancellableContinuationImpl<in T>(
}
return
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
/*
* Continuation was already completed, and might already have cancel handler.
*/
Expand All @@ -377,7 +379,7 @@ internal open class CancellableContinuationImpl<in T>(
* does not need to be called in this case.
*/
if (cancelHandler is BeforeResumeCancelHandler) return
val update = CompletedContinuation(state, cancelHandler = cancelHandler)
val update = CompletedContinuation<T>(state as T, cancelHandler = cancelHandler)
if (_state.compareAndSet(state, update)) return // quit on cas success
}
}
Expand All @@ -401,7 +403,7 @@ internal open class CancellableContinuationImpl<in T>(
state: NotCompleted,
proposedUpdate: Any?,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)?,
onCancellation: OnCancellation<T>?,
idempotent: Any?
): Any? = when {
proposedUpdate is CompletedExceptionally -> {
Expand All @@ -413,14 +415,14 @@ internal open class CancellableContinuationImpl<in T>(
onCancellation != null || (state is CancelHandler && state !is BeforeResumeCancelHandler) || idempotent != null ->
// mark as CompletedContinuation if special cases are present:
// Cancellation handlers that shall be called after resume or idempotent resume
CompletedContinuation(proposedUpdate, state as? CancelHandler, onCancellation, idempotent)
CompletedContinuation(proposedUpdate as T, state as? CancelHandler, onCancellation, idempotent)
else -> proposedUpdate // simple case -- use the value directly
}

private fun resumeImpl(
proposedUpdate: Any?,
resumeMode: Int,
onCancellation: ((cause: Throwable) -> Unit)? = null
onCancellation: OnCancellation<T>? = null
) {
_state.loop { state ->
when (state) {
Expand All @@ -439,7 +441,8 @@ internal open class CancellableContinuationImpl<in T>(
*/
if (state.makeResumed()) { // check if trying to resume one (otherwise error)
// call onCancellation
onCancellation?.let { callOnCancellation(it, state.cause) }
proposedUpdate as T
onCancellation?.let { callOnCancellation(it, proposedUpdate, state.cause) }
return // done
}
}
Expand All @@ -455,7 +458,7 @@ internal open class CancellableContinuationImpl<in T>(
private fun tryResumeImpl(
proposedUpdate: Any?,
idempotent: Any?,
onCancellation: ((cause: Throwable) -> Unit)?
onCancellation: OnCancellation<T>?
): Symbol? {
_state.loop { state ->
when (state) {
Expand All @@ -465,7 +468,7 @@ internal open class CancellableContinuationImpl<in T>(
detachChildIfNonResuable()
return RESUME_TOKEN
}
is CompletedContinuation -> {
is CompletedContinuation<*> -> {
return if (idempotent != null && state.idempotentResume === idempotent) {
assert { state.result == proposedUpdate } // "Non-idempotent resume"
RESUME_TOKEN // resumed with the same token -- ok
Expand Down Expand Up @@ -501,7 +504,7 @@ internal open class CancellableContinuationImpl<in T>(
override fun tryResume(value: T, idempotent: Any?): Any? =
tryResumeImpl(value, idempotent, onCancellation = null)

override fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? =
override fun tryResume(value: T, idempotent: Any?, onCancellation: OnCancellation<@UnsafeVariance T>?): Any? =
tryResumeImpl(value, idempotent, onCancellation)

override fun tryResumeWithException(exception: Throwable): Any? =
Expand All @@ -526,7 +529,7 @@ internal open class CancellableContinuationImpl<in T>(
@Suppress("UNCHECKED_CAST")
override fun <T> getSuccessfulResult(state: Any?): T =
when (state) {
is CompletedContinuation -> state.result as T
is CompletedContinuation<*> -> state.result as T
else -> state as T
}

Expand All @@ -536,7 +539,7 @@ internal open class CancellableContinuationImpl<in T>(
super.getExceptionalResult(state)?.let { recoverStackTrace(it, delegate) }

// For nicer debugging
public override fun toString(): String =
override fun toString(): String =
"${nameString()}(${delegate.toDebugString()}){$stateDebugRepresentation}@$hexAddress"

protected open fun nameString(): String =
Expand Down Expand Up @@ -576,17 +579,17 @@ private class InvokeOnCancel( // Clashes with InvokeOnCancellation
}

// Completed with additional metadata
private data class CompletedContinuation(
@JvmField val result: Any?,
private data class CompletedContinuation<T>(
@JvmField val result: T,
@JvmField val cancelHandler: CancelHandler? = null, // installed via invokeOnCancellation
@JvmField val onCancellation: ((cause: Throwable) -> Unit)? = null, // installed via resume block
@JvmField val onCancellation: OnCancellation<T>? = null, // installed via resume block
@JvmField val idempotentResume: Any? = null,
@JvmField val cancelCause: Throwable? = null
) {
val cancelled: Boolean get() = cancelCause != null

fun invokeHandlers(cont: CancellableContinuationImpl<*>, cause: Throwable) {
fun invokeHandlers(cont: CancellableContinuationImpl<T>, resumeValue: T, cause: Throwable) {
cancelHandler?.let { cont.callCancelHandler(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, cause) }
onCancellation?.let { cont.callOnCancellation(it, resumeValue, cause) }
}
}

0 comments on commit 6e62065

Please sign in to comment.