From fc16ec395ea9958fa8a8d6849c0c91f777dc01ec Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 17 May 2022 12:50:37 +0300 Subject: [PATCH] Fix potential data-race in EventLoop Fixes #3251 --- .../common/src/EventLoop.common.kt | 8 ++- .../common/src/internal/ThreadSafeHeap.kt | 2 - .../test/internal/ThreadSafeHeapStressTest.kt | 66 +++++++++++++++++++ .../jvm/test/internal/ThreadSafeHeapTest.kt | 2 +- 4 files changed, 74 insertions(+), 4 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapStressTest.kt diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index f4f61b25d4..7d223da06c 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -236,8 +236,13 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { if (timeNanos < MAX_DELAY_NS) { val now = nanoTime() DelayedResumeTask(now + timeNanos, continuation).also { task -> - continuation.disposeOnCancellation(task) + /* + * Order is important here: first we schedule heap and only then + * publish it to continuation. Otherwise, `DelayedResumeTask` should know + * how to be disposed when it's in the process of scheduling. + */ schedule(now, task) + continuation.disposeOnCancellation(task) } } } @@ -410,6 +415,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { */ @JvmField var nanoTime: Long ) : Runnable, Comparable, DisposableHandle, ThreadSafeHeapNode { + @Volatile private var _heap: Any? = null // null | ThreadSafeHeap | DISPOSED_TASK override var heap: ThreadSafeHeap<*>? diff --git a/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt b/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt index 43b7e9de51..2100454be3 100644 --- a/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt +++ b/kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt @@ -47,7 +47,6 @@ public open class ThreadSafeHeap : SynchronizedObject() where T: ThreadSafeHe } } - // @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized public inline fun removeFirstIf(predicate: (T) -> Boolean): T? = synchronized(this) { val first = firstImpl() ?: return null if (predicate(first)) { @@ -59,7 +58,6 @@ public open class ThreadSafeHeap : SynchronizedObject() where T: ThreadSafeHe public fun addLast(node: T): Unit = synchronized(this) { addImpl(node) } - // @Synchronized // NOTE! NOTE! NOTE! inline fun cannot be @Synchronized // Condition also receives current first node in the heap public inline fun addLastIf(node: T, cond: (T?) -> Boolean): Boolean = synchronized(this) { if (cond(firstImpl())) { diff --git a/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapStressTest.kt b/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapStressTest.kt new file mode 100644 index 0000000000..175f92d202 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapStressTest.kt @@ -0,0 +1,66 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.coroutines.* +import java.util.concurrent.* +import java.util.concurrent.CancellationException +import kotlin.test.* + +class ThreadSafeHeapStressTest : TestBase() { + private class DisposableNode : EventLoopImplBase.DelayedTask(1L) { + override fun run() { + } + } + + @Test + fun testConcurrentRemoveDispose() = runTest { + val heap = EventLoopImplBase.DelayedTaskQueue(1) + repeat(10_000 * stressTestMultiplierSqrt) { + withContext(Dispatchers.Default) { + val node = DisposableNode() + val barrier = CyclicBarrier(2) + launch { + heap.addLast(node) + barrier.await() + heap.remove(node) + } + launch { + barrier.await() + Thread.yield() + node.dispose() + } + } + } + } + + @Test() + fun testConcurrentAddDispose() = runTest { + repeat(10_000 * stressTestMultiplierSqrt) { + val jobToCancel = Job() + val barrier = CyclicBarrier(2) + val jobToJoin = launch(Dispatchers.Default) { + barrier.await() + jobToCancel.cancelAndJoin() + } + + try { + runBlocking { // Use event loop impl + withContext(jobToCancel) { + // This one is to workaround heap allocation optimization + launch(start = CoroutineStart.UNDISPATCHED) { + delay(100_000) + } + barrier.await() + delay(100_000) + } + } + } catch (e: CancellationException) { + // Expected exception + } + jobToJoin.join() + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapTest.kt b/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapTest.kt index be7ed91a3f..ee0960c849 100644 --- a/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapTest.kt +++ b/kotlinx-coroutines-core/jvm/test/internal/ThreadSafeHeapTest.kt @@ -93,4 +93,4 @@ class ThreadSafeHeapTest : TestBase() { assertEquals(set.size, h.size) } } -} \ No newline at end of file +}