Skip to content

Commit

Permalink
Fix limitedParallelism implementation on K/N (#3226)
Browse files Browse the repository at this point in the history
The initial implementation predates new memory model and was never working on it

Fixes #3223
  • Loading branch information
qwwdfsad committed Mar 31, 2022
1 parent 6c326e4 commit 8133c97
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 28 deletions.
Expand Up @@ -23,6 +23,9 @@ internal class LimitedDispatcher(

private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

// A separate object that we can synchronize on for K/N
private val workerAllocationLock = SynchronizedObject()

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
Expand Down Expand Up @@ -50,8 +53,7 @@ internal class LimitedDispatcher(
continue
}

@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
synchronized(workerAllocationLock) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
Expand Down Expand Up @@ -87,8 +89,7 @@ internal class LimitedDispatcher(
}

private fun tryAllocateWorker(): Boolean {
@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
synchronized(workerAllocationLock) {
if (runningWorkers >= parallelism) return false
++runningWorkers
return true
Expand Down
@@ -0,0 +1,34 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.test.*

class LimitedParallelismSharedTest : TestBase() {

@Test
fun testLimitedDefault() = runTest {
// Test that evaluates the very basic completion of tasks in limited dispatcher
// for all supported platforms.
// For more specific and concurrent tests, see 'concurrent' package.
val view = Dispatchers.Default.limitedParallelism(1)
val view2 = Dispatchers.Default.limitedParallelism(1)
val j1 = launch(view) {
while (true) {
yield()
}
}
val j2 = launch(view2) { j1.cancel() }
joinAll(j1, j2)
}

@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}
}
@@ -0,0 +1,59 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.exceptions.*
import kotlin.test.*

class LimitedParallelismConcurrentTest : TestBase() {

private val targetParallelism = 4
private val iterations = 100_000
private val parallelism = atomic(0)

private fun checkParallelism() {
val value = parallelism.incrementAndGet()
randomWait()
assertTrue { value <= targetParallelism }
parallelism.decrementAndGet()
}

@Test
fun testLimitedExecutor() = runMtTest {
val executor = newFixedThreadPoolContext(targetParallelism, "test")
val view = executor.limitedParallelism(targetParallelism)
doStress {
repeat(iterations) {
launch(view) {
checkParallelism()
}
}
}
executor.close()
}

private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
repeat(stressTestMultiplier) {
coroutineScope {
block()
}
}
}

@Test
fun testTaskFairness() = runMtTest {
val executor = newSingleThreadContext("test")
val view = executor.limitedParallelism(1)
val view2 = executor.limitedParallelism(1)
val j1 = launch(view) {
while (true) {
yield()
}
}
val j2 = launch(view2) { j1.cancel() }
joinAll(j1, j2)
executor.close()
}
}
Expand Up @@ -9,30 +9,7 @@ import java.util.concurrent.*
import kotlin.coroutines.*
import kotlin.test.*

class LimitedParallelismTest : TestBase() {

@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}

@Test
fun testTaskFairness() = runTest {
val executor = newSingleThreadContext("test")
val view = executor.limitedParallelism(1)
val view2 = executor.limitedParallelism(1)
val j1 = launch(view) {
while (true) {
yield()
}
}
val j2 = launch(view2) { j1.cancel() }
joinAll(j1, j2)
executor.close()
}
class LimitedParallelismUnhandledExceptionTest : TestBase() {

@Test
fun testUnhandledException() = runTest {
Expand Down

0 comments on commit 8133c97

Please sign in to comment.