Skip to content

Commit

Permalink
Remove requirement that job of the pre-created JobCancelNode have to … (
Browse files Browse the repository at this point in the history
#2427)

Remove the requirement that the job of the pre-created JobCancelNode have to be equal to the outer job

Job is supposed to be "sealed" interface, but we also have non-sealed Deferred that can be successfully implemented via delegation and such delegation may break code (if assertions are enabled!) in very subtle ways.

This assertion is our internal invariant that we're preserving anyway, so it's worth to lift it to simplify the life of our users in the future

Fixes #2423

Co-authored-by: Roman Elizarov <elizarov@gmail.com>
  • Loading branch information
qwwdfsad and elizarov committed Dec 7, 2020
1 parent 556f07a commit 1176267
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 64 deletions.
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/src/Await.kt
Expand Up @@ -5,7 +5,6 @@
package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.channels.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -75,7 +74,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
val nodes = Array(deferreds.size) { i ->
val deferred = deferreds[i]
deferred.start() // To properly await lazily started deferreds
AwaitAllNode(cont, deferred).apply {
AwaitAllNode(cont).apply {
handle = deferred.invokeOnCompletion(asHandler)
}
}
Expand All @@ -101,7 +100,7 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
}

private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>) : JobNode() {
lateinit var handle: DisposableHandle

private val _disposer = atomic<DisposeHandlersOnCancel?>(null)
Expand Down
Expand Up @@ -129,7 +129,7 @@ internal open class CancellableContinuationImpl<in T>(
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
val handle = parent.invokeOnCompletion(
onCancelling = true,
handler = ChildContinuation(parent, this).asHandler
handler = ChildContinuation(this).asHandler
)
parentHandle = handle
// now check our state _after_ registering (could have completed while we were registering)
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Job.kt
Expand Up @@ -490,7 +490,7 @@ public interface ChildHandle : DisposableHandle {
* ```
*/
internal fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
invokeOnCompletion(handler = DisposeOnCompletion(this, handle).asHandler)
invokeOnCompletion(handler = DisposeOnCompletion(handle).asHandler)

/**
* Cancels the job and suspends the invoking coroutine until the cancelled job is complete.
Expand Down
102 changes: 49 additions & 53 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -287,7 +287,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// fast-path method to finalize normally completed coroutines without children
// returns true if complete, and afterCompletion(update) shall be called
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
assert { state is Empty || state is JobNode<*> } // only simple state without lists where children can concurrently add
assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
assert { update !is CompletedExceptionally } // only for normal completion
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
onCancelling(null) // simple state is not a failure
Expand All @@ -313,7 +313,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode<*>) { // SINGLE/SINGLE+ state -- one completion handler (common case)
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
try {
state.invoke(cause)
} catch (ex: Throwable) {
Expand All @@ -327,7 +327,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun notifyCancelling(list: NodeList, cause: Throwable) {
// first cancel our own children
onCancelling(cause)
notifyHandlers<JobCancellingNode<*>>(list, cause)
notifyHandlers<JobCancellingNode>(list, cause)
// then cancel parent
cancelParent(cause) // tentative cancellation -- does not matter if there is no parent
}
Expand Down Expand Up @@ -359,9 +359,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}

private fun NodeList.notifyCompletion(cause: Throwable?) =
notifyHandlers<JobNode<*>>(this, cause)
notifyHandlers<JobNode>(this, cause)

private inline fun <reified T: JobNode<*>> notifyHandlers(list: NodeList, cause: Throwable?) {
private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
var exception: Throwable? = null
list.forEach<T> { node ->
try {
Expand Down Expand Up @@ -453,21 +453,22 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
invokeImmediately: Boolean,
handler: CompletionHandler
): DisposableHandle {
var nodeCache: JobNode<*>? = null
// Create node upfront -- for common cases it just initializes JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val node: JobNode = makeNode(handler, onCancelling)
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (_state.compareAndSet(state, node)) return node
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode<*>)
promoteSingleToNodeList(state as JobNode)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
Expand All @@ -479,7 +480,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
Expand All @@ -493,7 +493,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
} else {
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
if (addLastAtomic(state, list, node)) return node
}
}
Expand All @@ -508,16 +507,20 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
}

private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode<*> {
return if (onCancelling)
(handler as? JobCancellingNode<*>)?.also { assert { it.job === this } }
?: InvokeOnCancelling(this, handler)
else
(handler as? JobNode<*>)?.also { assert { it.job === this && it !is JobCancellingNode } }
?: InvokeOnCompletion(this, handler)
private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode {
val node = if (onCancelling) {
(handler as? JobCancellingNode)
?: InvokeOnCancelling(handler)
} else {
(handler as? JobNode)
?.also { assert { it !is JobCancellingNode } }
?: InvokeOnCompletion(handler)
}
node.job = this
return node
}

private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode<*>) =
private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
list.addLastIf(node) { this.state === expect }

private fun promoteEmptyToNodeList(state: Empty) {
Expand All @@ -527,7 +530,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
_state.compareAndSet(state, update)
}

private fun promoteSingleToNodeList(state: JobNode<*>) {
private fun promoteSingleToNodeList(state: JobNode) {
// try to promote it to list (SINGLE+ state)
state.addOneIfEmpty(NodeList())
// it must be in SINGLE+ state or state has changed (node could have need removed from state)
Expand All @@ -553,7 +556,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren

private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
// We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(this, cont).asHandler))
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler))
}

public final override val onJoin: SelectClause0
Expand All @@ -573,7 +576,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(this, select, block).asHandler))
select.disposeOnSelect(invokeOnCompletion(handler = SelectJoinOnCompletion(select, block).asHandler))
return
}
}
Expand All @@ -582,11 +585,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun removeNode(node: JobNode<*>) {
internal fun removeNode(node: JobNode) {
// remove logic depends on the state of the job
loopOnState { state ->
when (state) {
is JobNode<*> -> { // SINGE/SINGLE+ state -- one completion handler
is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
if (state !== node) return // a different job node --> we were already removed
// try remove and revert back to empty state
if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
Expand Down Expand Up @@ -770,7 +773,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? = state.list ?:
when (state) {
is Empty -> NodeList() // we can allocate new empty list that'll get integrated into Cancelling state
is JobNode<*> -> {
is JobNode -> {
// SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise we cannot
// correctly capture a reference to it
promoteSingleToNodeList(state)
Expand Down Expand Up @@ -849,7 +852,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
* which may miss unhandled exception.
*/
if ((state is Empty || state is JobNode<*>) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
if (tryFinalizeSimpleState(state, proposedUpdate)) {
// Completed successfully on fast path -- return updated state
return proposedUpdate
Expand Down Expand Up @@ -964,7 +967,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* If child is attached when the job is already being cancelled, such child will receive immediate notification on
* cancellation, but parent *will* wait for that child before completion and will handle its exception.
*/
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle
return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle
}

/**
Expand Down Expand Up @@ -1147,7 +1150,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val state: Finishing,
private val child: ChildHandleNode,
private val proposedUpdate: Any?
) : JobNode<Job>(child.childJob) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
Expand Down Expand Up @@ -1225,7 +1228,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler))
cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler))
cont.getResult()
}

Expand All @@ -1252,7 +1255,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
if (startInternal(state) == 0) {
// slow-path -- register waiter for completion
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(this, select, block).asHandler))
select.disposeOnSelect(invokeOnCompletion(handler = SelectAwaitOnCompletion(select, block).asHandler))
return
}
}
Expand Down Expand Up @@ -1342,12 +1345,14 @@ internal interface Incomplete {
val list: NodeList? // is null only for Empty and JobNode incomplete state objects
}

internal abstract class JobNode<out J : Job>(
@JvmField val job: J
) : CompletionHandlerBase(), DisposableHandle, Incomplete {
internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete {
/**
* Initialized by [JobSupport.makeNode].
*/
lateinit var job: JobSupport
override val isActive: Boolean get() = true
override val list: NodeList? get() = null
override fun dispose() = (job as JobSupport).removeNode(this)
override fun dispose() = job.removeNode(this)
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}

Expand All @@ -1360,7 +1365,7 @@ internal class NodeList : LockFreeLinkedListHead(), Incomplete {
append(state)
append("}[")
var first = true
this@NodeList.forEach<JobNode<*>> { node ->
this@NodeList.forEach<JobNode> { node ->
if (first) first = false else append(", ")
append(node)
}
Expand All @@ -1379,23 +1384,20 @@ internal class InactiveNodeList(
}

private class InvokeOnCompletion(
job: Job,
private val handler: CompletionHandler
) : JobNode<Job>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
}

private class ResumeOnCompletion(
job: Job,
private val continuation: Continuation<Unit>
) : JobNode<Job>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
}

private class ResumeAwaitOnCompletion<T>(
job: JobSupport,
private val continuation: CancellableContinuationImpl<T>
) : JobNode<JobSupport>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
Expand All @@ -1411,28 +1413,25 @@ private class ResumeAwaitOnCompletion<T>(
}

internal class DisposeOnCompletion(
job: Job,
private val handle: DisposableHandle
) : JobNode<Job>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) = handle.dispose()
}

private class SelectJoinOnCompletion<R>(
job: JobSupport,
private val select: SelectInstance<R>,
private val block: suspend () -> R
) : JobNode<JobSupport>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
}

private class SelectAwaitOnCompletion<T, R>(
job: JobSupport,
private val select: SelectInstance<R>,
private val block: suspend (T) -> R
) : JobNode<JobSupport>(job) {
) : JobNode() {
override fun invoke(cause: Throwable?) {
if (select.trySelect())
job.selectAwaitCompletion(select, block)
Expand All @@ -1445,12 +1444,11 @@ private class SelectAwaitOnCompletion<T, R>(
* Marker for node that shall be invoked on in _cancelling_ state.
* **Note: may be invoked multiple times.**
*/
internal abstract class JobCancellingNode<out J : Job>(job: J) : JobNode<J>(job)
internal abstract class JobCancellingNode : JobNode()

private class InvokeOnCancelling(
job: Job,
private val handler: CompletionHandler
) : JobCancellingNode<Job>(job) {
) : JobCancellingNode() {
// delegate handler shall be invoked at most once, so here is an additional flag
private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
override fun invoke(cause: Throwable?) {
Expand All @@ -1459,18 +1457,16 @@ private class InvokeOnCancelling(
}

internal class ChildHandleNode(
parent: JobSupport,
@JvmField val childJob: ChildJob
) : JobCancellingNode<JobSupport>(parent), ChildHandle {
) : JobCancellingNode(), ChildHandle {
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
parent: Job,
@JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode<Job>(parent) {
) : JobCancellingNode() {
override fun invoke(cause: Throwable?) {
child.parentCancelled(child.getContinuationCancellationCause(job))
}
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -327,13 +327,13 @@ internal class SelectBuilderImpl<in R>(
private fun initCancellability() {
val parent = context[Job] ?: return
val newRegistration = parent.invokeOnCompletion(
onCancelling = true, handler = SelectOnCancelling(parent).asHandler)
onCancelling = true, handler = SelectOnCancelling().asHandler)
parentHandle = newRegistration
// now check our state _after_ registering
if (isSelected) newRegistration.dispose()
}

private inner class SelectOnCancelling(job: Job) : JobCancellingNode<Job>(job) {
private inner class SelectOnCancelling : JobCancellingNode() {
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
override fun invoke(cause: Throwable?) {
if (trySelect())
Expand Down Expand Up @@ -552,7 +552,7 @@ internal class SelectBuilderImpl<in R>(
return decision
}

override val atomicOp: AtomicOp<*>?
override val atomicOp: AtomicOp<*>
get() = otherOp.atomicOp
}

Expand Down

0 comments on commit 1176267

Please sign in to comment.