Skip to content

Commit

Permalink
Revert "JobSupport: non-atomic adding to a Job's list of listeners (#…
Browse files Browse the repository at this point in the history
…2096)" (#3034)

This reverts commit 2b1e276.
  • Loading branch information
qwwdfsad committed Nov 19, 2021
1 parent 9d1be50 commit a4ae389
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 136 deletions.
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Await.kt
Expand Up @@ -107,8 +107,8 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
var disposer: DisposeHandlersOnCancel?
get() = _disposer.value
set(value) { _disposer.value = value }

override fun invokeOnce(cause: Throwable?) {
override fun invoke(cause: Throwable?) {
if (cause != null) {
val token = continuation.tryResumeWithException(cause)
if (token != null) {
Expand Down
Expand Up @@ -44,6 +44,4 @@ internal expect val CancelHandlerBase.asHandler: CompletionHandler
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
internal expect fun CompletionHandler.invokeIt(cause: Throwable?)

// :KLUDGE: We have to use `isHandlerOf` extension, because performing this type check directly in the code
// causes Incompatible types error during Kotlin/JS compilation
internal inline fun <reified T> CompletionHandler.isHandlerOf(): Boolean = this is T
163 changes: 46 additions & 117 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -456,86 +456,51 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// Create node upfront -- for common cases it just initializes JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val node: JobNode = makeNode(handler, onCancelling)
/*
* LinkedList algorithm does not support removing and re-adding the same node,
* so here we check whether the node is already added to the list to avoid adding the node twice.
*/
var isNodeAdded = false
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
if (_state.compareAndSet(state, node)) return node
} else {
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode)
return@loopOnState // retry
}
// ...else {, but without nesting
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
if (onCancelling && state is Finishing) {
synchronized(state) {
// check if we are installing cancellation handler on job that is being cancelled
rootCause = state.rootCause // != null if cancelling job
// We add node to the list in two cases --- either the job is not being cancelled
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!isNodeAdded) list.addLast(node)
if (this.state !== state) {
/*
* Here we have an additional check for ChildCompletion. Invoking the handler once is not enough
* for this particular kind of node -- the caller makes a decision based on whether the node was added
* or not and that decision should be made **once**.
* To be more precise, the caller of ChildCompletion, in case when it's the last child,
* should make a decision whether to start transition to the final state, based on
* whether the ChildCompletion was added to the list or not. If not -- the JobNode.invoke will do that.
* See comment to JobNode.invoke, we cannot differentiate the situation when external state updater
* invoked or skipped the node, thus we additionally synchronize on 'markInvoked'.
*/
if (node is ChildCompletion && !node.markInvoked()) return node
// The state can only change to Complete here, so the node can stay in the list, just retry
return@loopOnState
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
if (onCancelling && state is Finishing) {
synchronized(state) {
// check if we are installing cancellation handler on job that is being cancelled
rootCause = state.rootCause // != null if cancelling job
// We add node to the list in two cases --- either the job is not being cancelled
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
handle = node
}
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
handle = node
}
}
}
if (rootCause != null) {
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) {
node.invoke(rootCause)
}
return handle
} else {
if (!isNodeAdded) list.addLast(node)
if (this.state !== state) {
// Here again, we try to prevent concurrent finalization of the parent,
// if the parent fails to add ChildCompletion because the child changed it's state to Completed.
if (node is ChildCompletion && !node.markInvoked()) return node
// If the state has changed to Complete, the node can stay in the list, just retry.
if (this.state !is Incomplete) return@loopOnState
// If the state is Incomplete, set the flag that the node is already added to the list instead of removing it.
isNodeAdded = true
if (rootCause != null) {
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
} else {
return node
if (addLastAtomic(state, list, node)) return node
}
}
}
else -> { // is complete
if (invokeImmediately) {
node.invoke((state as? CompletedExceptionally)?.cause)
}
// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
return NonDisposableHandle
}
}
Expand All @@ -555,6 +520,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return node
}

private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
list.addLastIf(node) { this.state === expect }

private fun promoteEmptyToNodeList(state: Empty) {
// try to promote it to LIST state with the corresponding state
val list = NodeList()
Expand Down Expand Up @@ -628,13 +596,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
is Incomplete -> { // may have a list of completion handlers
// remove node from the list if there is a list
if (state.list != null) {
if (!node.remove() && node.isRemoved) {
// Note: .remove() returns 'false' if the node wasn't added at all, e.g.
// because it was an optimized "single element list"
node.helpRemove()
}
}
if (state.list != null) node.remove()
return
}
else -> return // it is complete and does not have any completion handlers
Expand Down Expand Up @@ -852,7 +814,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
}
}
}
}

/**
* Completes this job. Used by [AbstractCoroutine.resume].
Expand Down Expand Up @@ -1189,11 +1151,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val child: ChildHandleNode,
private val proposedUpdate: Any?
) : JobNode() {
override fun invokeOnce(cause: Throwable?) {
override fun invoke(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
override fun toString(): String =
"ChildCompletion[$child, $proposedUpdate]"
}

private class AwaitContinuation<T>(
Expand Down Expand Up @@ -1395,31 +1355,6 @@ internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Inc
override val isActive: Boolean get() = true
override val list: NodeList? get() = null
override fun dispose() = job.removeNode(this)
private val isInvoked = atomic(false)

/*
* This method is guaranteed to be invoked once per JobNode lifecycle.
*/
protected abstract fun invokeOnce(cause: Throwable?)

/*
* This method can be invoked more than once thus it's protected
* with atomic flag.
*
* It can be invoked twice via [invokeOnCompletion(invokeImmediately = true)]
* when addLastIf fails due to the race with Job state update.
* In that case, we cannot distinguish the situation when the node was added
* and the external state updater actually invoked it from the situation
* when the state updater already invoked all elements from a list and then
* we added a new node to already abandoned list.
* So, when addLastIf fails, we invoke handler in-place, using
* markInvoked as protection against the former case.
*/
final override fun invoke(cause: Throwable?) {
if (!markInvoked()) return
invokeOnce(cause)
}
fun markInvoked() = isInvoked.compareAndSet(false, true)
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}

Expand Down Expand Up @@ -1452,22 +1387,20 @@ internal class InactiveNodeList(

private class InvokeOnCompletion(
private val handler: CompletionHandler
) : JobNode() {
override fun invokeOnce(cause: Throwable?) = handler.invoke(cause)
override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
) : JobNode() {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
}

private class ResumeOnCompletion(
private val continuation: Continuation<Unit>
) : JobNode() {
override fun invokeOnce(cause: Throwable?) = continuation.resume(Unit)
override fun toString() = "ResumeOnCompletion[$continuation]"
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
}

private class ResumeAwaitOnCompletion<T>(
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override fun invokeOnce(cause: Throwable?) {
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
Expand All @@ -1479,36 +1412,32 @@ private class ResumeAwaitOnCompletion<T>(
continuation.resume(state.unboxState() as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

internal class DisposeOnCompletion(
private val handle: DisposableHandle
) : JobNode() {
override fun invokeOnce(cause: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCompletion[${handle}]"
override fun invoke(cause: Throwable?) = handle.dispose()
}

private class SelectJoinOnCompletion<R>(
private val select: SelectInstance<R>,
private val block: suspend () -> R
) : JobNode() {
override fun invokeOnce(cause: Throwable?) {
override fun invoke(cause: Throwable?) {
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
}

private class SelectAwaitOnCompletion<T, R>(
private val select: SelectInstance<R>,
private val block: suspend (T) -> R
) : JobNode() {
override fun invokeOnce(cause: Throwable?) {
override fun invoke(cause: Throwable?) {
if (select.trySelect())
job.selectAwaitCompletion(select, block)
}
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
}

// -------- invokeOnCancellation nodes
Expand All @@ -1521,28 +1450,28 @@ internal abstract class JobCancellingNode : JobNode()

private class InvokeOnCancelling(
private val handler: CompletionHandler
) : JobCancellingNode() {
) : JobCancellingNode() {
// delegate handler shall be invoked at most once, so here is an additional flag
override fun invokeOnce(cause: Throwable?) = handler.invoke(cause)
override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
override fun invoke(cause: Throwable?) {
if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
}
}

internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invokeOnce(cause: Throwable?) = childJob.parentCancelled(job)
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
override fun toString(): String = "ChildHandle[$childJob]"
}

// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
@JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode() {
override fun invokeOnce(cause: Throwable?) =
override fun invoke(cause: Throwable?) {
child.parentCancelled(child.getContinuationCancellationCause(job))
override fun toString(): String =
"ChildContinuation[$child]"
}
}

Expand Up @@ -15,6 +15,7 @@ public expect open class LockFreeLinkedListNode() {
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean
public inline fun addLastIfPrev(
node: LockFreeLinkedListNode,
predicate: (LockFreeLinkedListNode) -> Boolean
Expand Down
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -336,10 +336,9 @@ internal class SelectBuilderImpl<in R>(

private inner class SelectOnCancelling : JobCancellingNode() {
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
override fun invokeOnce(cause: Throwable?) {
if (trySelect()) {
override fun invoke(cause: Throwable?) {
if (trySelect())
resumeSelectWithException(job.getCancellationException())
}
}
}

Expand Down
Expand Up @@ -86,6 +86,12 @@ public actual open class LockFreeLinkedListNode {
}
}

@PublishedApi
internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp =
object : CondAddOp(node) {
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
}

public actual open val isRemoved: Boolean get() = next is Removed

// LINEARIZABLE. Returns Node | Removed
Expand Down Expand Up @@ -141,6 +147,20 @@ public actual open class LockFreeLinkedListNode {

public fun <T : Node> describeAddLast(node: T): AddLastDesc<T> = AddLastDesc(this, node)

/**
* Adds last item to this list atomically if the [condition] is true.
*/
public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
FAILURE -> return false
}
}
}

public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
Expand All @@ -154,9 +174,7 @@ public actual open class LockFreeLinkedListNode {
predicate: (Node) -> Boolean, // prev node predicate
crossinline condition: () -> Boolean // atomically checked condition
): Boolean {
val condAdd = object : CondAddOp(node) {
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
}
val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
if (!predicate(prev)) return false
Expand Down
Expand Up @@ -32,6 +32,20 @@ class LockFreeLinkedListTest {
assertContents(list)
}

@Test
fun testCondOps() {
val list = LockFreeLinkedListHead()
assertContents(list)
assertTrue(list.addLastIf(IntNode(1)) { true })
assertContents(list, 1)
assertFalse(list.addLastIf(IntNode(2)) { false })
assertContents(list, 1)
assertTrue(list.addLastIf(IntNode(3)) { true })
assertContents(list, 1, 3)
assertFalse(list.addLastIf(IntNode(4)) { false })
assertContents(list, 1, 3)
}

@Test
fun testAtomicOpsSingle() {
val list = LockFreeLinkedListHead()
Expand Down

0 comments on commit a4ae389

Please sign in to comment.