Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix limitedParallelism implementation on K/N #3226

Merged
merged 3 commits into from Mar 31, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -23,6 +23,9 @@ internal class LimitedDispatcher(

private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

// A separate object that we can synchronize on for K/N
private val workerAllocationLock = SynchronizedObject()

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
Expand Down Expand Up @@ -50,8 +53,7 @@ internal class LimitedDispatcher(
continue
}

@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
synchronized(workerAllocationLock) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
Expand Down Expand Up @@ -87,8 +89,7 @@ internal class LimitedDispatcher(
}

private fun tryAllocateWorker(): Boolean {
@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
synchronized(workerAllocationLock) {
if (runningWorkers >= parallelism) return false
++runningWorkers
return true
Expand Down
@@ -0,0 +1,30 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.test.*

class LimitedParallelismSharedTest : TestBase() {
@Test
fun testTaskFairness() = runTest {
val view = Dispatchers.Default.limitedParallelism(1)
val view2 = Dispatchers.Default.limitedParallelism(1)
val j1 = launch(view) {
while (true) {
yield()
}
}
val j2 = launch(view2) { j1.cancel() }
joinAll(j1, j2)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: move this test to the JS source-set, and move the testTaskFairness test from the JVM source-set to concurrent. This way, there will still be two versions of the test, but each one will be the most specific one for each target.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left this one as is to still have common coverage and added test with newSingleThreadContext to concurrent source set. Juggled with names a bit as well

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but I think having this test looks confusing. Maybe add a comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done


@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}
}
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.exceptions.*
import kotlin.test.*

class LimitedParallelismSharedStressTest : TestBase() {

private val targetParallelism = 4
private val iterations = 100_000
private val parallelism = atomic(0)

private fun checkParallelism() {
val value = parallelism.incrementAndGet()
randomWait()
assertTrue { value <= targetParallelism }
parallelism.decrementAndGet()
}

@Test
fun testLimitedExecutor() = runMtTest {
val executor = newFixedThreadPoolContext(targetParallelism, "test")
val view = executor.limitedParallelism(targetParallelism)
doStress {
repeat(iterations) {
launch(view) {
checkParallelism()
}
}
}
executor.close()
}

private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
repeat(stressTestMultiplier) {
coroutineScope {
block()
}
}
}
}
8 changes: 0 additions & 8 deletions kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt
Expand Up @@ -11,14 +11,6 @@ import kotlin.test.*

class LimitedParallelismTest : TestBase() {

@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}

@Test
fun testTaskFairness() = runTest {
val executor = newSingleThreadContext("test")
Expand Down