From a4ae389503cc9b3c940e5d5539fe359b5e4a4950 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Fri, 19 Nov 2021 18:03:59 +0300 Subject: [PATCH] Revert "JobSupport: non-atomic adding to a Job's list of listeners (#2096)" (#3034) This reverts commit 2b1e2765f61fcf830f144651c50479f64d4a8558. --- kotlinx-coroutines-core/common/src/Await.kt | 4 +- .../common/src/CompletionHandler.common.kt | 2 - .../common/src/JobSupport.kt | 163 +++++------------- .../src/internal/LockFreeLinkedList.common.kt | 1 + .../common/src/selects/Select.kt | 5 +- .../src/internal/LockFreeLinkedList.kt | 24 ++- .../test/internal/LockFreeLinkedListTest.kt | 14 ++ .../js/src/internal/LinkedList.kt | 6 + kotlinx-coroutines-core/jvm/src/Future.kt | 2 +- .../LockFreeLinkedListShortStressTest.kt | 25 ++- 10 files changed, 110 insertions(+), 136 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/Await.kt b/kotlinx-coroutines-core/common/src/Await.kt index 275db60f40..e06ed33025 100644 --- a/kotlinx-coroutines-core/common/src/Await.kt +++ b/kotlinx-coroutines-core/common/src/Await.kt @@ -107,8 +107,8 @@ private class AwaitAll(private val deferreds: Array>) { var disposer: DisposeHandlersOnCancel? get() = _disposer.value set(value) { _disposer.value = value } - - override fun invokeOnce(cause: Throwable?) { + + override fun invoke(cause: Throwable?) { if (cause != null) { val token = continuation.tryResumeWithException(cause) if (token != null) { diff --git a/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt b/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt index 8bc2596a76..e712ff1fc1 100644 --- a/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt +++ b/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt @@ -44,6 +44,4 @@ internal expect val CancelHandlerBase.asHandler: CompletionHandler // because we play type tricks on Kotlin/JS and handler is not necessarily a function there internal expect fun CompletionHandler.invokeIt(cause: Throwable?) -// :KLUDGE: We have to use `isHandlerOf` extension, because performing this type check directly in the code -// causes Incompatible types error during Kotlin/JS compilation internal inline fun CompletionHandler.isHandlerOf(): Boolean = this is T diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index e388f8e6a5..0a3dd23472 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -456,86 +456,51 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // Create node upfront -- for common cases it just initializes JobNode.job field, // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. val node: JobNode = makeNode(handler, onCancelling) - /* - * LinkedList algorithm does not support removing and re-adding the same node, - * so here we check whether the node is already added to the list to avoid adding the node twice. - */ - var isNodeAdded = false loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state if (_state.compareAndSet(state, node)) return node - } else { + } else promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine - } } is Incomplete -> { val list = state.list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode) - return@loopOnState // retry - } - // ...else {, but without nesting - var rootCause: Throwable? = null - var handle: DisposableHandle = NonDisposableHandle - if (onCancelling && state is Finishing) { - synchronized(state) { - // check if we are installing cancellation handler on job that is being cancelled - rootCause = state.rootCause // != null if cancelling job - // We add node to the list in two cases --- either the job is not being cancelled - // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { - // Note: add node the list while holding lock on state (make sure it cannot change) - if (!isNodeAdded) list.addLast(node) - if (this.state !== state) { - /* - * Here we have an additional check for ChildCompletion. Invoking the handler once is not enough - * for this particular kind of node -- the caller makes a decision based on whether the node was added - * or not and that decision should be made **once**. - * To be more precise, the caller of ChildCompletion, in case when it's the last child, - * should make a decision whether to start transition to the final state, based on - * whether the ChildCompletion was added to the list or not. If not -- the JobNode.invoke will do that. - * See comment to JobNode.invoke, we cannot differentiate the situation when external state updater - * invoked or skipped the node, thus we additionally synchronize on 'markInvoked'. - */ - if (node is ChildCompletion && !node.markInvoked()) return node - // The state can only change to Complete here, so the node can stay in the list, just retry - return@loopOnState + } else { + var rootCause: Throwable? = null + var handle: DisposableHandle = NonDisposableHandle + if (onCancelling && state is Finishing) { + synchronized(state) { + // check if we are installing cancellation handler on job that is being cancelled + rootCause = state.rootCause // != null if cancelling job + // We add node to the list in two cases --- either the job is not being cancelled + // or we are adding a child to a coroutine that is not completing yet + if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { + // Note: add node the list while holding lock on state (make sure it cannot change) + if (!addLastAtomic(state, list, node)) return@loopOnState // retry + // just return node if we don't have to invoke handler (not cancelling yet) + if (rootCause == null) return node + // otherwise handler is invoked immediately out of the synchronized section & handle returned + handle = node } - // just return node if we don't have to invoke handler (not cancelling yet) - if (rootCause == null) return node - // otherwise handler is invoked immediately out of the synchronized section & handle returned - handle = node } } - } - if (rootCause != null) { - // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job - if (invokeImmediately) { - node.invoke(rootCause) - } - return handle - } else { - if (!isNodeAdded) list.addLast(node) - if (this.state !== state) { - // Here again, we try to prevent concurrent finalization of the parent, - // if the parent fails to add ChildCompletion because the child changed it's state to Completed. - if (node is ChildCompletion && !node.markInvoked()) return node - // If the state has changed to Complete, the node can stay in the list, just retry. - if (this.state !is Incomplete) return@loopOnState - // If the state is Incomplete, set the flag that the node is already added to the list instead of removing it. - isNodeAdded = true + if (rootCause != null) { + // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job + if (invokeImmediately) handler.invokeIt(rootCause) + return handle } else { - return node + if (addLastAtomic(state, list, node)) return node } } } else -> { // is complete - if (invokeImmediately) { - node.invoke((state as? CompletedExceptionally)?.cause) - } + // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, + // because we play type tricks on Kotlin/JS and handler is not necessarily a function there + if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) return NonDisposableHandle } } @@ -555,6 +520,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return node } + private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) = + list.addLastIf(node) { this.state === expect } + private fun promoteEmptyToNodeList(state: Empty) { // try to promote it to LIST state with the corresponding state val list = NodeList() @@ -628,13 +596,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } is Incomplete -> { // may have a list of completion handlers // remove node from the list if there is a list - if (state.list != null) { - if (!node.remove() && node.isRemoved) { - // Note: .remove() returns 'false' if the node wasn't added at all, e.g. - // because it was an optimized "single element list" - node.helpRemove() - } - } + if (state.list != null) node.remove() return } else -> return // it is complete and does not have any completion handlers @@ -852,7 +814,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } } } - } + } /** * Completes this job. Used by [AbstractCoroutine.resume]. @@ -1189,11 +1151,9 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val child: ChildHandleNode, private val proposedUpdate: Any? ) : JobNode() { - override fun invokeOnce(cause: Throwable?) { + override fun invoke(cause: Throwable?) { parent.continueCompleting(state, child, proposedUpdate) } - override fun toString(): String = - "ChildCompletion[$child, $proposedUpdate]" } private class AwaitContinuation( @@ -1395,31 +1355,6 @@ internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Inc override val isActive: Boolean get() = true override val list: NodeList? get() = null override fun dispose() = job.removeNode(this) - private val isInvoked = atomic(false) - - /* - * This method is guaranteed to be invoked once per JobNode lifecycle. - */ - protected abstract fun invokeOnce(cause: Throwable?) - - /* - * This method can be invoked more than once thus it's protected - * with atomic flag. - * - * It can be invoked twice via [invokeOnCompletion(invokeImmediately = true)] - * when addLastIf fails due to the race with Job state update. - * In that case, we cannot distinguish the situation when the node was added - * and the external state updater actually invoked it from the situation - * when the state updater already invoked all elements from a list and then - * we added a new node to already abandoned list. - * So, when addLastIf fails, we invoke handler in-place, using - * markInvoked as protection against the former case. - */ - final override fun invoke(cause: Throwable?) { - if (!markInvoked()) return - invokeOnce(cause) - } - fun markInvoked() = isInvoked.compareAndSet(false, true) override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]" } @@ -1452,22 +1387,20 @@ internal class InactiveNodeList( private class InvokeOnCompletion( private val handler: CompletionHandler -) : JobNode() { - override fun invokeOnce(cause: Throwable?) = handler.invoke(cause) - override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]" +) : JobNode() { + override fun invoke(cause: Throwable?) = handler.invoke(cause) } private class ResumeOnCompletion( private val continuation: Continuation ) : JobNode() { - override fun invokeOnce(cause: Throwable?) = continuation.resume(Unit) - override fun toString() = "ResumeOnCompletion[$continuation]" + override fun invoke(cause: Throwable?) = continuation.resume(Unit) } private class ResumeAwaitOnCompletion( private val continuation: CancellableContinuationImpl ) : JobNode() { - override fun invokeOnce(cause: Throwable?) { + override fun invoke(cause: Throwable?) { val state = job.state assert { state !is Incomplete } if (state is CompletedExceptionally) { @@ -1479,36 +1412,32 @@ private class ResumeAwaitOnCompletion( continuation.resume(state.unboxState() as T) } } - override fun toString() = "ResumeAwaitOnCompletion[$continuation]" } internal class DisposeOnCompletion( private val handle: DisposableHandle ) : JobNode() { - override fun invokeOnce(cause: Throwable?) = handle.dispose() - override fun toString(): String = "DisposeOnCompletion[${handle}]" + override fun invoke(cause: Throwable?) = handle.dispose() } private class SelectJoinOnCompletion( private val select: SelectInstance, private val block: suspend () -> R ) : JobNode() { - override fun invokeOnce(cause: Throwable?) { + override fun invoke(cause: Throwable?) { if (select.trySelect()) block.startCoroutineCancellable(select.completion) } - override fun toString(): String = "SelectJoinOnCompletion[$select]" } private class SelectAwaitOnCompletion( private val select: SelectInstance, private val block: suspend (T) -> R ) : JobNode() { - override fun invokeOnce(cause: Throwable?) { + override fun invoke(cause: Throwable?) { if (select.trySelect()) job.selectAwaitCompletion(select, block) } - override fun toString(): String = "SelectAwaitOnCompletion[$select]" } // -------- invokeOnCancellation nodes @@ -1521,28 +1450,28 @@ internal abstract class JobCancellingNode : JobNode() private class InvokeOnCancelling( private val handler: CompletionHandler -) : JobCancellingNode() { +) : JobCancellingNode() { // delegate handler shall be invoked at most once, so here is an additional flag - override fun invokeOnce(cause: Throwable?) = handler.invoke(cause) - override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]" + private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu + override fun invoke(cause: Throwable?) { + if (_invoked.compareAndSet(0, 1)) handler.invoke(cause) + } } internal class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobCancellingNode(), ChildHandle { override val parent: Job get() = job - override fun invokeOnce(cause: Throwable?) = childJob.parentCancelled(job) + override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) - override fun toString(): String = "ChildHandle[$childJob]" } // Same as ChildHandleNode, but for cancellable continuation internal class ChildContinuation( @JvmField val child: CancellableContinuationImpl<*> ) : JobCancellingNode() { - override fun invokeOnce(cause: Throwable?) = + override fun invoke(cause: Throwable?) { child.parentCancelled(child.getContinuationCancellationCause(job)) - override fun toString(): String = - "ChildContinuation[$child]" + } } diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 4019ef6156..8b20ade1f0 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -15,6 +15,7 @@ public expect open class LockFreeLinkedListNode() { public val prevNode: LockFreeLinkedListNode public fun addLast(node: LockFreeLinkedListNode) public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean + public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean public inline fun addLastIfPrev( node: LockFreeLinkedListNode, predicate: (LockFreeLinkedListNode) -> Boolean diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 87ac94ec34..a313de3d5d 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -336,10 +336,9 @@ internal class SelectBuilderImpl( private inner class SelectOnCancelling : JobCancellingNode() { // Note: may be invoked multiple times, but only the first trySelect succeeds anyway - override fun invokeOnce(cause: Throwable?) { - if (trySelect()) { + override fun invoke(cause: Throwable?) { + if (trySelect()) resumeSelectWithException(job.getCancellationException()) - } } } diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index c6ce278987..b4b36dad34 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -86,6 +86,12 @@ public actual open class LockFreeLinkedListNode { } } + @PublishedApi + internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp = + object : CondAddOp(node) { + override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE + } + public actual open val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed @@ -141,6 +147,20 @@ public actual open class LockFreeLinkedListNode { public fun describeAddLast(node: T): AddLastDesc = AddLastDesc(this, node) + /** + * Adds last item to this list atomically if the [condition] is true. + */ + public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { + val condAdd = makeCondAddOp(node, condition) + while (true) { // lock-free loop on prev.next + val prev = prevNode // sentinel node is never removed, so prev is always defined + when (prev.tryCondAddNext(node, this, condAdd)) { + SUCCESS -> return true + FAILURE -> return false + } + } + } + public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { while (true) { // lock-free loop on prev.next val prev = prevNode // sentinel node is never removed, so prev is always defined @@ -154,9 +174,7 @@ public actual open class LockFreeLinkedListNode { predicate: (Node) -> Boolean, // prev node predicate crossinline condition: () -> Boolean // atomically checked condition ): Boolean { - val condAdd = object : CondAddOp(node) { - override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE - } + val condAdd = makeCondAddOp(node, condition) while (true) { // lock-free loop on prev.next val prev = prevNode // sentinel node is never removed, so prev is always defined if (!predicate(prev)) return false diff --git a/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt index ebf9f44cac..7e85d495fc 100644 --- a/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt @@ -32,6 +32,20 @@ class LockFreeLinkedListTest { assertContents(list) } + @Test + fun testCondOps() { + val list = LockFreeLinkedListHead() + assertContents(list) + assertTrue(list.addLastIf(IntNode(1)) { true }) + assertContents(list, 1) + assertFalse(list.addLastIf(IntNode(2)) { false }) + assertContents(list, 1) + assertTrue(list.addLastIf(IntNode(3)) { true }) + assertContents(list, 1, 3) + assertFalse(list.addLastIf(IntNode(4)) { false }) + assertContents(list, 1, 3) + } + @Test fun testAtomicOpsSingle() { val list = LockFreeLinkedListHead() diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt index 73af5d78a1..d8c07f4e19 100644 --- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt @@ -59,6 +59,12 @@ public open class LinkedListNode { return true } + public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { + if (!condition()) return false + addLast(node) + return true + } + public inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { if (!predicate(_prev)) return false addLast(node) diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index f935c2a4dc..b27a970845 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -35,7 +35,7 @@ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future< private class CancelFutureOnCompletion( private val future: Future<*> ) : JobNode() { - override fun invokeOnce(cause: Throwable?) { + override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere if (cause != null) future.cancel(false) diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt index e692cb4c64..2ac51b9b1d 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt @@ -40,14 +40,23 @@ class LockFreeLinkedListShortStressTest : TestBase() { threads += thread(start = false, name = "adder-$threadId") { val rnd = Random() while (System.currentTimeMillis() < deadline) { - val node = IntNode(threadId) - list.addLast(node) - if (node.remove()) { - undone.incrementAndGet() - } else { - // randomly help other removal's completion - if (rnd.nextBoolean()) node.helpRemove() - missed.incrementAndGet() + var node: IntNode? = IntNode(threadId) + when (rnd.nextInt(3)) { + 0 -> list.addLast(node!!) + 1 -> assertTrue(list.addLastIf(node!!, { true })) // just to test conditional add + 2 -> { // just to test failed conditional add + assertFalse(list.addLastIf(node!!, { false })) + node = null + } + } + if (node != null) { + if (node.remove()) { + undone.incrementAndGet() + } else { + // randomly help other removal's completion + if (rnd.nextBoolean()) node.helpRemove() + missed.incrementAndGet() + } } } completedAdder.incrementAndGet()