Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix potential data-race in EventLoop #3289

Merged
merged 3 commits into from May 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -236,8 +236,13 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
/*
* Order is important here: first we schedule heap and only then
* publish it to continuation. Otherwise, `DelayedResumeTask` should know
* how to be disposed when it's in the process of scheduling.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*/
schedule(now, task)
continuation.disposeOnCancellation(task)
}
}
}
Expand Down Expand Up @@ -410,6 +415,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
*/
@JvmField var nanoTime: Long
) : Runnable, Comparable<DelayedTask>, DisposableHandle, ThreadSafeHeapNode {
@Volatile
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this volatile, removeAtImpl may see an arbitrary value at assert { result.heap === this }, leading to a potential crash.

private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK

override var heap: ThreadSafeHeap<*>?
Expand Down
2 changes: 0 additions & 2 deletions kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt
Expand Up @@ -47,7 +47,6 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe
}
}

// @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) {
val first = firstImpl() ?: return null
if (predicate(first)) {
Expand All @@ -59,7 +58,6 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe

public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) }

// @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized
// Condition also receives current first node in the heap
public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) {
if (cond(firstImpl())) {
Expand Down
@@ -0,0 +1,66 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import kotlin.test.*

class ThreadSafeHeapStressTest : TestBase() {
private class DisposableNode : EventLoopImplBase.DelayedTask(1L) {
override fun run() {
}
}

@Test
fun testConcurrentRemoveDispose() = runTest {
val heap = EventLoopImplBase.DelayedTaskQueue(1)
repeat(10_000 * stressTestMultiplierSqrt) {
withContext(Dispatchers.Default) {
val node = DisposableNode()
val barrier = CyclicBarrier(2)
launch {
heap.addLast(node)
barrier.await()
heap.remove(node)
}
launch {
barrier.await()
Thread.yield()
node.dispose()
}
}
}
}

@Test()
fun testConcurrentAddDispose() = runTest {
repeat(10_000 * stressTestMultiplierSqrt) {
val jobToCancel = Job()
val barrier = CyclicBarrier(2)
val jobToJoin = launch(Dispatchers.Default) {
barrier.await()
jobToCancel.cancelAndJoin()
}

try {
runBlocking { // Use event loop impl
withContext(jobToCancel) {
// This one is to workaround heap allocation optimization
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
launch(start = CoroutineStart.UNDISPATCHED) {
delay(100_000)
}
barrier.await()
delay(100_000)
}
}
} catch (e: CancellationException) {
// Expected exception
}
jobToJoin.join()
}
}
}
Expand Up @@ -93,4 +93,4 @@ class ThreadSafeHeapTest : TestBase() {
assertEquals(set.size, h.size)
}
}
}
}