Skip to content

Commit

Permalink
Fix limitedParallelism implementation on K/N
Browse files Browse the repository at this point in the history
The initial implementation predates new memory model and was never working on it

Fixes #3223
  • Loading branch information
qwwdfsad committed Mar 29, 2022
1 parent b545807 commit 7048a1a
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 12 deletions.
Expand Up @@ -23,6 +23,9 @@ internal class LimitedDispatcher(

private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

// A separate object that we can synchronize on for K/N
private val workerAllocationLock = SynchronizedObject()

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
Expand Down Expand Up @@ -50,8 +53,7 @@ internal class LimitedDispatcher(
continue
}

@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
synchronized(workerAllocationLock) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
Expand Down Expand Up @@ -87,8 +89,7 @@ internal class LimitedDispatcher(
}

private fun tryAllocateWorker(): Boolean {
@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
synchronized(workerAllocationLock) {
if (runningWorkers >= parallelism) return false
++runningWorkers
return true
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.test.*

class LimitedParallelismSharedTest : TestBase() {
@Test
fun testTaskFairness() = runTest {
val view = Dispatchers.Default.limitedParallelism(1)
val view2 = Dispatchers.Default.limitedParallelism(1)
val j1 = launch(view) {
while (true) {
yield()
}
}
val j2 = launch(view2) { j1.cancel() }
joinAll(j1, j2)
}

@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}
}
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.exceptions.*
import kotlin.test.*

class LimitedParallelismSharedStressTest : TestBase() {

private val targetParallelism = 4
private val iterations = 100_000
private val parallelism = atomic(0)

private fun checkParallelism() {
val value = parallelism.incrementAndGet()
randomWait()
assertTrue { value <= targetParallelism }
parallelism.decrementAndGet()
}

@Test
fun testLimitedExecutor() = runMtTest {
val executor = newFixedThreadPoolContext(targetParallelism, "test")
val view = executor.limitedParallelism(targetParallelism)
doStress {
repeat(iterations) {
launch(view) {
checkParallelism()
}
}
}
executor.close()
}

private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
repeat(stressTestMultiplier) {
coroutineScope {
block()
}
}
}
}
8 changes: 0 additions & 8 deletions kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt
Expand Up @@ -11,14 +11,6 @@ import kotlin.test.*

class LimitedParallelismTest : TestBase() {

@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}

@Test
fun testTaskFairness() = runTest {
val executor = newSingleThreadContext("test")
Expand Down

0 comments on commit 7048a1a

Please sign in to comment.