Skip to content

Commit

Permalink
JobSupport: non-atomic adding to a Job's list of listeners (#2096)
Browse files Browse the repository at this point in the history
* Get rid of DCSS (double compare single swap) primitive in JobSupport in order to reduce both b binary size and performance of list addition 

Co-authored-by: SokolovaMaria <maria.sokolova@jetbrains.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
  • Loading branch information
3 people committed Nov 16, 2021
1 parent e2a6827 commit 2b1e276
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 110 deletions.
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Await.kt
Expand Up @@ -107,8 +107,8 @@ private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
var disposer: DisposeHandlersOnCancel?
get() = _disposer.value
set(value) { _disposer.value = value }
override fun invoke(cause: Throwable?) {

override fun invokeOnce(cause: Throwable?) {
if (cause != null) {
val token = continuation.tryResumeWithException(cause)
if (token != null) {
Expand Down
Expand Up @@ -44,4 +44,6 @@ internal expect val CancelHandlerBase.asHandler: CompletionHandler
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
internal expect fun CompletionHandler.invokeIt(cause: Throwable?)

// :KLUDGE: We have to use `isHandlerOf` extension, because performing this type check directly in the code
// causes Incompatible types error during Kotlin/JS compilation
internal inline fun <reified T> CompletionHandler.isHandlerOf(): Boolean = this is T
163 changes: 117 additions & 46 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -456,51 +456,86 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
// Create node upfront -- for common cases it just initializes JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val node: JobNode = makeNode(handler, onCancelling)
/*
* LinkedList algorithm does not support removing and re-adding the same node,
* so here we check whether the node is already added to the list to avoid adding the node twice.
*/
var isNodeAdded = false
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try move to SINGLE state
if (_state.compareAndSet(state, node)) return node
} else
} else {
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
}
is Incomplete -> {
val list = state.list
if (list == null) { // SINGLE/SINGLE+
promoteSingleToNodeList(state as JobNode)
} else {
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
if (onCancelling && state is Finishing) {
synchronized(state) {
// check if we are installing cancellation handler on job that is being cancelled
rootCause = state.rootCause // != null if cancelling job
// We add node to the list in two cases --- either the job is not being cancelled
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
handle = node
return@loopOnState // retry
}
// ...else {, but without nesting
var rootCause: Throwable? = null
var handle: DisposableHandle = NonDisposableHandle
if (onCancelling && state is Finishing) {
synchronized(state) {
// check if we are installing cancellation handler on job that is being cancelled
rootCause = state.rootCause // != null if cancelling job
// We add node to the list in two cases --- either the job is not being cancelled
// or we are adding a child to a coroutine that is not completing yet
if (rootCause == null || handler.isHandlerOf<ChildHandleNode>() && !state.isCompleting) {
// Note: add node the list while holding lock on state (make sure it cannot change)
if (!isNodeAdded) list.addLast(node)
if (this.state !== state) {
/*
* Here we have an additional check for ChildCompletion. Invoking the handler once is not enough
* for this particular kind of node -- the caller makes a decision based on whether the node was added
* or not and that decision should be made **once**.
* To be more precise, the caller of ChildCompletion, in case when it's the last child,
* should make a decision whether to start transition to the final state, based on
* whether the ChildCompletion was added to the list or not. If not -- the JobNode.invoke will do that.
* See comment to JobNode.invoke, we cannot differentiate the situation when external state updater
* invoked or skipped the node, thus we additionally synchronize on 'markInvoked'.
*/
if (node is ChildCompletion && !node.markInvoked()) return node
// The state can only change to Complete here, so the node can stay in the list, just retry
return@loopOnState
}
// just return node if we don't have to invoke handler (not cancelling yet)
if (rootCause == null) return node
// otherwise handler is invoked immediately out of the synchronized section & handle returned
handle = node
}
}
if (rootCause != null) {
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) handler.invokeIt(rootCause)
return handle
}
if (rootCause != null) {
// Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job
if (invokeImmediately) {
node.invoke(rootCause)
}
return handle
} else {
if (!isNodeAdded) list.addLast(node)
if (this.state !== state) {
// Here again, we try to prevent concurrent finalization of the parent,
// if the parent fails to add ChildCompletion because the child changed it's state to Completed.
if (node is ChildCompletion && !node.markInvoked()) return node
// If the state has changed to Complete, the node can stay in the list, just retry.
if (this.state !is Incomplete) return@loopOnState
// If the state is Incomplete, set the flag that the node is already added to the list instead of removing it.
isNodeAdded = true
} else {
if (addLastAtomic(state, list, node)) return node
return node
}
}
}
else -> { // is complete
// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
// because we play type tricks on Kotlin/JS and handler is not necessarily a function there
if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause)
if (invokeImmediately) {
node.invoke((state as? CompletedExceptionally)?.cause)
}
return NonDisposableHandle
}
}
Expand All @@ -520,9 +555,6 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
return node
}

private fun addLastAtomic(expect: Any, list: NodeList, node: JobNode) =
list.addLastIf(node) { this.state === expect }

private fun promoteEmptyToNodeList(state: Empty) {
// try to promote it to LIST state with the corresponding state
val list = NodeList()
Expand Down Expand Up @@ -596,7 +628,13 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
is Incomplete -> { // may have a list of completion handlers
// remove node from the list if there is a list
if (state.list != null) node.remove()
if (state.list != null) {
if (!node.remove() && node.isRemoved) {
// Note: .remove() returns 'false' if the node wasn't added at all, e.g.
// because it was an optimized "single element list"
node.helpRemove()
}
}
return
}
else -> return // it is complete and does not have any completion handlers
Expand Down Expand Up @@ -814,7 +852,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
}
}
}
}
}

/**
* Completes this job. Used by [AbstractCoroutine.resume].
Expand Down Expand Up @@ -1151,9 +1189,11 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
private val child: ChildHandleNode,
private val proposedUpdate: Any?
) : JobNode() {
override fun invoke(cause: Throwable?) {
override fun invokeOnce(cause: Throwable?) {
parent.continueCompleting(state, child, proposedUpdate)
}
override fun toString(): String =
"ChildCompletion[$child, $proposedUpdate]"
}

private class AwaitContinuation<T>(
Expand Down Expand Up @@ -1355,6 +1395,31 @@ internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Inc
override val isActive: Boolean get() = true
override val list: NodeList? get() = null
override fun dispose() = job.removeNode(this)
private val isInvoked = atomic(false)

/*
* This method is guaranteed to be invoked once per JobNode lifecycle.
*/
protected abstract fun invokeOnce(cause: Throwable?)

/*
* This method can be invoked more than once thus it's protected
* with atomic flag.
*
* It can be invoked twice via [invokeOnCompletion(invokeImmediately = true)]
* when addLastIf fails due to the race with Job state update.
* In that case, we cannot distinguish the situation when the node was added
* and the external state updater actually invoked it from the situation
* when the state updater already invoked all elements from a list and then
* we added a new node to already abandoned list.
* So, when addLastIf fails, we invoke handler in-place, using
* markInvoked as protection against the former case.
*/
final override fun invoke(cause: Throwable?) {
if (!markInvoked()) return
invokeOnce(cause)
}
fun markInvoked() = isInvoked.compareAndSet(false, true)
override fun toString() = "$classSimpleName@$hexAddress[job@${job.hexAddress}]"
}

Expand Down Expand Up @@ -1387,20 +1452,22 @@ internal class InactiveNodeList(

private class InvokeOnCompletion(
private val handler: CompletionHandler
) : JobNode() {
override fun invoke(cause: Throwable?) = handler.invoke(cause)
) : JobNode() {
override fun invokeOnce(cause: Throwable?) = handler.invoke(cause)
override fun toString() = "InvokeOnCompletion[$classSimpleName@$hexAddress]"
}

private class ResumeOnCompletion(
private val continuation: Continuation<Unit>
) : JobNode() {
override fun invoke(cause: Throwable?) = continuation.resume(Unit)
override fun invokeOnce(cause: Throwable?) = continuation.resume(Unit)
override fun toString() = "ResumeOnCompletion[$continuation]"
}

private class ResumeAwaitOnCompletion<T>(
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override fun invoke(cause: Throwable?) {
override fun invokeOnce(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
Expand All @@ -1412,32 +1479,36 @@ private class ResumeAwaitOnCompletion<T>(
continuation.resume(state.unboxState() as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

internal class DisposeOnCompletion(
private val handle: DisposableHandle
) : JobNode() {
override fun invoke(cause: Throwable?) = handle.dispose()
override fun invokeOnce(cause: Throwable?) = handle.dispose()
override fun toString(): String = "DisposeOnCompletion[${handle}]"
}

private class SelectJoinOnCompletion<R>(
private val select: SelectInstance<R>,
private val block: suspend () -> R
) : JobNode() {
override fun invoke(cause: Throwable?) {
override fun invokeOnce(cause: Throwable?) {
if (select.trySelect())
block.startCoroutineCancellable(select.completion)
}
override fun toString(): String = "SelectJoinOnCompletion[$select]"
}

private class SelectAwaitOnCompletion<T, R>(
private val select: SelectInstance<R>,
private val block: suspend (T) -> R
) : JobNode() {
override fun invoke(cause: Throwable?) {
override fun invokeOnce(cause: Throwable?) {
if (select.trySelect())
job.selectAwaitCompletion(select, block)
}
override fun toString(): String = "SelectAwaitOnCompletion[$select]"
}

// -------- invokeOnCancellation nodes
Expand All @@ -1450,28 +1521,28 @@ internal abstract class JobCancellingNode : JobNode()

private class InvokeOnCancelling(
private val handler: CompletionHandler
) : JobCancellingNode() {
) : JobCancellingNode() {
// delegate handler shall be invoked at most once, so here is an additional flag
private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu
override fun invoke(cause: Throwable?) {
if (_invoked.compareAndSet(0, 1)) handler.invoke(cause)
}
override fun invokeOnce(cause: Throwable?) = handler.invoke(cause)
override fun toString() = "InvokeOnCancelling[$classSimpleName@$hexAddress]"
}

internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun invokeOnce(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
override fun toString(): String = "ChildHandle[$childJob]"
}

// Same as ChildHandleNode, but for cancellable continuation
internal class ChildContinuation(
@JvmField val child: CancellableContinuationImpl<*>
) : JobCancellingNode() {
override fun invoke(cause: Throwable?) {
override fun invokeOnce(cause: Throwable?) =
child.parentCancelled(child.getContinuationCancellationCause(job))
}
override fun toString(): String =
"ChildContinuation[$child]"
}

Expand Up @@ -15,7 +15,6 @@ public expect open class LockFreeLinkedListNode() {
public val prevNode: LockFreeLinkedListNode
public fun addLast(node: LockFreeLinkedListNode)
public fun addOneIfEmpty(node: LockFreeLinkedListNode): Boolean
public inline fun addLastIf(node: LockFreeLinkedListNode, crossinline condition: () -> Boolean): Boolean
public inline fun addLastIfPrev(
node: LockFreeLinkedListNode,
predicate: (LockFreeLinkedListNode) -> Boolean
Expand Down
5 changes: 3 additions & 2 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -336,9 +336,10 @@ internal class SelectBuilderImpl<in R>(

private inner class SelectOnCancelling : JobCancellingNode() {
// Note: may be invoked multiple times, but only the first trySelect succeeds anyway
override fun invoke(cause: Throwable?) {
if (trySelect())
override fun invokeOnce(cause: Throwable?) {
if (trySelect()) {
resumeSelectWithException(job.getCancellationException())
}
}
}

Expand Down
Expand Up @@ -86,12 +86,6 @@ public actual open class LockFreeLinkedListNode {
}
}

@PublishedApi
internal inline fun makeCondAddOp(node: Node, crossinline condition: () -> Boolean): CondAddOp =
object : CondAddOp(node) {
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
}

public actual open val isRemoved: Boolean get() = next is Removed

// LINEARIZABLE. Returns Node | Removed
Expand Down Expand Up @@ -147,20 +141,6 @@ public actual open class LockFreeLinkedListNode {

public fun <T : Node> describeAddLast(node: T): AddLastDesc<T> = AddLastDesc(this, node)

/**
* Adds last item to this list atomically if the [condition] is true.
*/
public actual inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean {
val condAdd = makeCondAddOp(node, condition)
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
when (prev.tryCondAddNext(node, this, condAdd)) {
SUCCESS -> return true
FAILURE -> return false
}
}
}

public actual inline fun addLastIfPrev(node: Node, predicate: (Node) -> Boolean): Boolean {
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
Expand All @@ -174,7 +154,9 @@ public actual open class LockFreeLinkedListNode {
predicate: (Node) -> Boolean, // prev node predicate
crossinline condition: () -> Boolean // atomically checked condition
): Boolean {
val condAdd = makeCondAddOp(node, condition)
val condAdd = object : CondAddOp(node) {
override fun prepare(affected: Node): Any? = if (condition()) null else CONDITION_FALSE
}
while (true) { // lock-free loop on prev.next
val prev = prevNode // sentinel node is never removed, so prev is always defined
if (!predicate(prev)) return false
Expand Down
Expand Up @@ -32,20 +32,6 @@ class LockFreeLinkedListTest {
assertContents(list)
}

@Test
fun testCondOps() {
val list = LockFreeLinkedListHead()
assertContents(list)
assertTrue(list.addLastIf(IntNode(1)) { true })
assertContents(list, 1)
assertFalse(list.addLastIf(IntNode(2)) { false })
assertContents(list, 1)
assertTrue(list.addLastIf(IntNode(3)) { true })
assertContents(list, 1, 3)
assertFalse(list.addLastIf(IntNode(4)) { false })
assertContents(list, 1, 3)
}

@Test
fun testAtomicOpsSingle() {
val list = LockFreeLinkedListHead()
Expand Down

0 comments on commit 2b1e276

Please sign in to comment.