From f2ef0d6eb4bb4c2f6b382346da52a0709c10a1f1 Mon Sep 17 00:00:00 2001 From: SokolovaMaria Date: Tue, 16 Nov 2021 16:49:21 +0300 Subject: [PATCH] JobSupport: non-atomic adding to a Job's list of listeners (#2096) * Get rid of DCSS (double compare single swap) primitive in JobSupport in order to reduce both b binary size and performance of list addition Co-authored-by: SokolovaMaria Co-authored-by: Vsevolod Tolstopyatov --- kotlinx-coroutines-core/common/src/Await.kt | 4 +- .../common/src/CompletionHandler.common.kt | 2 + .../common/src/JobSupport.kt | 163 +++++++++++++----- .../src/internal/LockFreeLinkedList.common.kt | 1 - .../common/src/selects/Select.kt | 5 +- .../src/internal/LockFreeLinkedList.kt | 24 +-- .../test/internal/LockFreeLinkedListTest.kt | 14 -- .../js/src/internal/LinkedList.kt | 6 - kotlinx-coroutines-core/jvm/src/Future.kt | 2 +- .../LockFreeLinkedListShortStressTest.kt | 25 +-- 10 files changed, 136 insertions(+), 110 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/Await.kt b/kotlinx-coroutines-core/common/src/Await.kt index e06ed33025..275db60f40 100644 --- a/kotlinx-coroutines-core/common/src/Await.kt +++ b/kotlinx-coroutines-core/common/src/Await.kt @@ -107,8 +107,8 @@ private class AwaitAll(private val deferreds: Array>) { var disposer: DisposeHandlersOnCancel? get() = _disposer.value set(value) { _disposer.value = value } - - override fun invoke(cause: Throwable?) { + + override fun invokeOnce(cause: Throwable?) { if (cause != null) { val token = continuation.tryResumeWithException(cause) if (token != null) { diff --git a/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt b/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt index e712ff1fc1..8bc2596a76 100644 --- a/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt +++ b/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt @@ -44,4 +44,6 @@ internal expect val CancelHandlerBase.asHandler: CompletionHandler // because we play type tricks on Kotlin/JS and handler is not necessarily a function there internal expect fun CompletionHandler.invokeIt(cause: Throwable?) +// :KLUDGE: We have to use `isHandlerOf` extension, because performing this type check directly in the code +// causes Incompatible types error during Kotlin/JS compilation internal inline fun CompletionHandler.isHandlerOf(): Boolean = this is T diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 0a3dd23472..e388f8e6a5 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -456,51 +456,86 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren // Create node upfront -- for common cases it just initializes JobNode.job field, // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. val node: JobNode = makeNode(handler, onCancelling) + /* + * LinkedList algorithm does not support removing and re-adding the same node, + * so here we check whether the node is already added to the list to avoid adding the node twice. + */ + var isNodeAdded = false loopOnState { state -> when (state) { is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) { // try move to SINGLE state if (_state.compareAndSet(state, node)) return node - } else + } else { promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine + } } is Incomplete -> { val list = state.list if (list == null) { // SINGLE/SINGLE+ promoteSingleToNodeList(state as JobNode) - } else { - var rootCause: Throwable? = null - var handle: DisposableHandle = NonDisposableHandle - if (onCancelling && state is Finishing) { - synchronized(state) { - // check if we are installing cancellation handler on job that is being cancelled - rootCause = state.rootCause // != null if cancelling job - // We add node to the list in two cases --- either the job is not being cancelled - // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { - // Note: add node the list while holding lock on state (make sure it cannot change) - if (!addLastAtomic(state, list, node)) return@loopOnState // retry - // just return node if we don't have to invoke handler (not cancelling yet) - if (rootCause == null) return node - // otherwise handler is invoked immediately out of the synchronized section & handle returned - handle = node + return@loopOnState // retry + } + // ...else {, but without nesting + var rootCause: Throwable? = null + var handle: DisposableHandle = NonDisposableHandle + if (onCancelling && state is Finishing) { + synchronized(state) { + // check if we are installing cancellation handler on job that is being cancelled + rootCause = state.rootCause // != null if cancelling job + // We add node to the list in two cases --- either the job is not being cancelled + // or we are adding a child to a coroutine that is not completing yet + if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { + // Note: add node the list while holding lock on state (make sure it cannot change) + if (!isNodeAdded) list.addLast(node) + if (this.state !== state) { + /* + * Here we have an additional check for ChildCompletion. Invoking the handler once is not enough + * for this particular kind of node -- the caller makes a decision based on whether the node was added + * or not and that decision should be made **once**. + * To be more precise, the caller of ChildCompletion, in case when it's the last child, + * should make a decision whether to start transition to the final state, based on + * whether the ChildCompletion was added to the list or not. If not -- the JobNode.invoke will do that. + * See comment to JobNode.invoke, we cannot differentiate the situation when external state updater + * invoked or skipped the node, thus we additionally synchronize on 'markInvoked'. + */ + if (node is ChildCompletion && !node.markInvoked()) return node + // The state can only change to Complete here, so the node can stay in the list, just retry + return@loopOnState } + // just return node if we don't have to invoke handler (not cancelling yet) + if (rootCause == null) return node + // otherwise handler is invoked immediately out of the synchronized section & handle returned + handle = node } } - if (rootCause != null) { - // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job - if (invokeImmediately) handler.invokeIt(rootCause) - return handle + } + if (rootCause != null) { + // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job + if (invokeImmediately) { + node.invoke(rootCause) + } + return handle + } else { + if (!isNodeAdded) list.addLast(node) + if (this.state !== state) { + // Here again, we try to prevent concurrent finalization of the parent, + // if the parent fails to add ChildCompletion because the child changed it's state to Completed. + if (node is ChildCompletion && !node.markInvoked()) return node + // If the state has changed to Complete, the node can stay in the list, just retry. + if (this.state !is Incomplete) return@loopOnState + // If the state is Incomplete, set the flag that the node is already added to the list instead of removing it. + isNodeAdded = true } else { - if (addLastAtomic(state, list, node)) return node + return node } } } else -> { // is complete - // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, - // because we play type tricks on Kotlin/JS and handler is not necessarily a function there - if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) + if (invokeImmediately) { + node.invoke((state as? CompletedExceptionally)?.cause) + } return NonDisposableHandle } } @@ -520,9 +555,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren return node } - private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) = - list.addLastIf(node) { this.state === expect } - private fun promoteEmptyToNodeList(state: Empty) { // try to promote it to LIST state with the corresponding state val list = NodeList() @@ -596,7 +628,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } is Incomplete -> { // may have a list of completion handlers // remove node from the list if there is a list - if (state.list != null) node.remove() + if (state.list != null) { + if (!node.remove() && node.isRemoved) { + // Note: .remove() returns 'false' if the node wasn't added at all, e.g. + // because it was an optimized "single element list" + node.helpRemove() + } + } return } else -> return // it is complete and does not have any completion handlers @@ -814,7 +852,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } } } - } + } /** * Completes this job. Used by [AbstractCoroutine.resume]. @@ -1151,9 +1189,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private val child: ChildHandleNode, private val proposedUpdate: Any? ) : JobNode() { - override fun invoke(cause: Throwable?) { + override fun invokeOnce(cause: Throwable?) { parent.continueCompleting(state, child, proposedUpdate) } + override fun toString(): String = + "ChildCompletion[$child, $proposedUpdate]" } private class AwaitContinuation( @@ -1355,6 +1395,31 @@ internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Inc override val isActive: Boolean get() = true override val list: NodeList? get() = null override fun dispose() = job.removeNode(this) + private val isInvoked = atomic(false) + + /* + * This method is guaranteed to be invoked once per JobNode lifecycle. + */ + protected abstract fun invokeOnce(cause: Throwable?) + + /* + * This method can be invoked more than once thus it's protected + * with atomic flag. + * + * It can be invoked twice via [invokeOnCompletion(invokeImmediately = true)] + * when addLastIf fails due to the race with Job state update. + * In that case, we cannot distinguish the situation when the node was added + * and the external state updater actually invoked it from the situation + * when the state updater already invoked all elements from a list and then + * we added a new node to already abandoned list. + * So, when addLastIf fails, we invoke handler in-place, using + * markInvoked as protection against the former case. + */ + final override fun invoke(cause: Throwable?) { + if (!markInvoked()) return + invokeOnce(cause) + } + fun markInvoked() = isInvoked.compareAndSet(false, true) override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]" } @@ -1387,20 +1452,22 @@ internal class InactiveNodeList( private class InvokeOnCompletion( private val handler: CompletionHandler -) : JobNode() { - override fun invoke(cause: Throwable?) = handler.invoke(cause) +) : JobNode() { + override fun invokeOnce(cause: Throwable?) = handler.invoke(cause) + override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]" } private class ResumeOnCompletion( private val continuation: Continuation ) : JobNode() { - override fun invoke(cause: Throwable?) = continuation.resume(Unit) + override fun invokeOnce(cause: Throwable?) = continuation.resume(Unit) + override fun toString() = "ResumeOnCompletion[$continuation]" } private class ResumeAwaitOnCompletion( private val continuation: CancellableContinuationImpl ) : JobNode() { - override fun invoke(cause: Throwable?) { + override fun invokeOnce(cause: Throwable?) { val state = job.state assert { state !is Incomplete } if (state is CompletedExceptionally) { @@ -1412,32 +1479,36 @@ private class ResumeAwaitOnCompletion( continuation.resume(state.unboxState() as T) } } + override fun toString() = "ResumeAwaitOnCompletion[$continuation]" } internal class DisposeOnCompletion( private val handle: DisposableHandle ) : JobNode() { - override fun invoke(cause: Throwable?) = handle.dispose() + override fun invokeOnce(cause: Throwable?) = handle.dispose() + override fun toString(): String = "DisposeOnCompletion[${handle}]" } private class SelectJoinOnCompletion( private val select: SelectInstance, private val block: suspend () -> R ) : JobNode() { - override fun invoke(cause: Throwable?) { + override fun invokeOnce(cause: Throwable?) { if (select.trySelect()) block.startCoroutineCancellable(select.completion) } + override fun toString(): String = "SelectJoinOnCompletion[$select]" } private class SelectAwaitOnCompletion( private val select: SelectInstance, private val block: suspend (T) -> R ) : JobNode() { - override fun invoke(cause: Throwable?) { + override fun invokeOnce(cause: Throwable?) { if (select.trySelect()) job.selectAwaitCompletion(select, block) } + override fun toString(): String = "SelectAwaitOnCompletion[$select]" } // -------- invokeOnCancellation nodes @@ -1450,28 +1521,28 @@ internal abstract class JobCancellingNode : JobNode() private class InvokeOnCancelling( private val handler: CompletionHandler -) : JobCancellingNode() { +) : JobCancellingNode() { // delegate handler shall be invoked at most once, so here is an additional flag - private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu - override fun invoke(cause: Throwable?) { - if (_invoked.compareAndSet(0, 1)) handler.invoke(cause) - } + override fun invokeOnce(cause: Throwable?) = handler.invoke(cause) + override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]" } internal class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobCancellingNode(), ChildHandle { override val parent: Job get() = job - override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) + override fun invokeOnce(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) + override fun toString(): String = "ChildHandle[$childJob]" } // Same as ChildHandleNode, but for cancellable continuation internal class ChildContinuation( @JvmField val child: CancellableContinuationImpl<*> ) : JobCancellingNode() { - override fun invoke(cause: Throwable?) { + override fun invokeOnce(cause: Throwable?) = child.parentCancelled(child.getContinuationCancellationCause(job)) - } + override fun toString(): String = + "ChildContinuation[$child]" } diff --git a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt index 8b20ade1f0..4019ef6156 100644 --- a/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/LockFreeLinkedList.common.kt @@ -15,7 +15,6 @@ public expect open class LockFreeLinkedListNode() { public val prevNode: LockFreeLinkedListNode public fun addLast(node: LockFreeLinkedListNode) public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean - public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean public inline fun addLastIfPrev( node: LockFreeLinkedListNode, predicate: (LockFreeLinkedListNode) -> Boolean diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index a313de3d5d..87ac94ec34 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -336,9 +336,10 @@ internal class SelectBuilderImpl( private inner class SelectOnCancelling : JobCancellingNode() { // Note: may be invoked multiple times, but only the first trySelect succeeds anyway - override fun invoke(cause: Throwable?) { - if (trySelect()) + override fun invokeOnce(cause: Throwable?) { + if (trySelect()) { resumeSelectWithException(job.getCancellationException()) + } } } diff --git a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt index b4b36dad34..c6ce278987 100644 --- a/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt +++ b/kotlinx-coroutines-core/concurrent/src/internal/LockFreeLinkedList.kt @@ -86,12 +86,6 @@ public actual open class LockFreeLinkedListNode { } } - @PublishedApi - internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp = - object : CondAddOp(node) { - override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE - } - public actual open val isRemoved: Boolean get() = next is Removed // LINEARIZABLE. Returns Node | Removed @@ -147,20 +141,6 @@ public actual open class LockFreeLinkedListNode { public fun describeAddLast(node: T): AddLastDesc = AddLastDesc(this, node) - /** - * Adds last item to this list atomically if the [condition] is true. - */ - public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { - val condAdd = makeCondAddOp(node, condition) - while (true) { // lock-free loop on prev.next - val prev = prevNode // sentinel node is never removed, so prev is always defined - when (prev.tryCondAddNext(node, this, condAdd)) { - SUCCESS -> return true - FAILURE -> return false - } - } - } - public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { while (true) { // lock-free loop on prev.next val prev = prevNode // sentinel node is never removed, so prev is always defined @@ -174,7 +154,9 @@ public actual open class LockFreeLinkedListNode { predicate: (Node) -> Boolean, // prev node predicate crossinline condition: () -> Boolean // atomically checked condition ): Boolean { - val condAdd = makeCondAddOp(node, condition) + val condAdd = object : CondAddOp(node) { + override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE + } while (true) { // lock-free loop on prev.next val prev = prevNode // sentinel node is never removed, so prev is always defined if (!predicate(prev)) return false diff --git a/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt b/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt index 7e85d495fc..ebf9f44cac 100644 --- a/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/internal/LockFreeLinkedListTest.kt @@ -32,20 +32,6 @@ class LockFreeLinkedListTest { assertContents(list) } - @Test - fun testCondOps() { - val list = LockFreeLinkedListHead() - assertContents(list) - assertTrue(list.addLastIf(IntNode(1)) { true }) - assertContents(list, 1) - assertFalse(list.addLastIf(IntNode(2)) { false }) - assertContents(list, 1) - assertTrue(list.addLastIf(IntNode(3)) { true }) - assertContents(list, 1, 3) - assertFalse(list.addLastIf(IntNode(4)) { false }) - assertContents(list, 1, 3) - } - @Test fun testAtomicOpsSingle() { val list = LockFreeLinkedListHead() diff --git a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt index d8c07f4e19..73af5d78a1 100644 --- a/kotlinx-coroutines-core/js/src/internal/LinkedList.kt +++ b/kotlinx-coroutines-core/js/src/internal/LinkedList.kt @@ -59,12 +59,6 @@ public open class LinkedListNode { return true } - public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean { - if (!condition()) return false - addLast(node) - return true - } - public inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean { if (!predicate(_prev)) return false addLast(node) diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index b27a970845..f935c2a4dc 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -35,7 +35,7 @@ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future< private class CancelFutureOnCompletion( private val future: Future<*> ) : JobNode() { - override fun invoke(cause: Throwable?) { + override fun invokeOnce(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere if (cause != null) future.cancel(false) diff --git a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt index 2ac51b9b1d..e692cb4c64 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/LockFreeLinkedListShortStressTest.kt @@ -40,23 +40,14 @@ class LockFreeLinkedListShortStressTest : TestBase() { threads += thread(start = false, name = "adder-$threadId") { val rnd = Random() while (System.currentTimeMillis() < deadline) { - var node: IntNode? = IntNode(threadId) - when (rnd.nextInt(3)) { - 0 -> list.addLast(node!!) - 1 -> assertTrue(list.addLastIf(node!!, { true })) // just to test conditional add - 2 -> { // just to test failed conditional add - assertFalse(list.addLastIf(node!!, { false })) - node = null - } - } - if (node != null) { - if (node.remove()) { - undone.incrementAndGet() - } else { - // randomly help other removal's completion - if (rnd.nextBoolean()) node.helpRemove() - missed.incrementAndGet() - } + val node = IntNode(threadId) + list.addLast(node) + if (node.remove()) { + undone.incrementAndGet() + } else { + // randomly help other removal's completion + if (rnd.nextBoolean()) node.helpRemove() + missed.incrementAndGet() } } completedAdder.incrementAndGet()