Skip to content

Commit

Permalink
Introduce CoroutineDispatcher.limitedParallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad committed Sep 6, 2021
1 parent 3f459d5 commit 778bfbc
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 1 deletion.
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -156,6 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
Expand Down Expand Up @@ -446,6 +447,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
public fun <init> ()V
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
protected final fun toStringInternalImpl ()Ljava/lang/String;
}
Expand Down
34 changes: 34 additions & 0 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Expand Up @@ -61,6 +61,40 @@ public abstract class CoroutineDispatcher :
*/
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

/**
* Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism].
* The resulting view uses the original dispatcher for execution, but with the guarantee that
* no more than [parallelism] coroutines are executed at the same time.
*
* This method does not impose restrictions on the number of views or the total sum of parallelism values,
* each view controls its own parallelism independently with the guarantee that the effective parallelism
* of all views cannot exceed the actual parallelism of the original dispatcher.
*
* ### Limitations
*
* The default implementation of `limitedParallelism` does not support direct dispatchers,
* such as execute the given runnable in place during [dispatch] calls. For direct dispatchers,
* it is recommended to override this method and provide a domain-specific implementation.
*
* ### Example of usage
* ```
* private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background")
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2)
* // At most 3 threads will be processing JSON to avoid image processing starvation
* private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3)
* // At most 1 thread will be doing IO
* private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1)
* ```
* Note how in this example, the application have the executor with 4 threads, but the total sum of all limits
* is 5. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism.
*/
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
}

/**
* Dispatches execution of a runnable [block] onto another thread in the given [context].
* This method should guarantee that the given [block] will be eventually invoked,
Expand Down
6 changes: 5 additions & 1 deletion kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -115,6 +115,11 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}

protected open fun shutdown() {}
}

Expand Down Expand Up @@ -525,4 +530,3 @@ internal expect fun nanoTime(): Long
internal expect object DefaultExecutor {
public fun enqueue(task: Runnable)
}

8 changes: 8 additions & 0 deletions kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt
Expand Up @@ -4,6 +4,8 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*

/**
* Base class for special [CoroutineDispatcher] which is confined to application "Main" or "UI" thread
* and used for any UI-based activities. Instance of `MainDispatcher` can be obtained by [Dispatchers.Main].
Expand Down Expand Up @@ -51,6 +53,12 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
*/
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
return this
}

/**
* Internal method for more specific [toString] implementations. It returns non-null
* string if this dispatcher is set in the platform as the main one.
Expand Down
82 changes: 82 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
@@ -0,0 +1,82 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
* The result of .limitedParallelism(x) call, dispatcher
* that wraps the given dispatcher, but limits the parallelism level, while
* trying to emulate fairness.
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) {

@Volatile
private var runningWorkers = 0

private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatcher.dispatchYield(context, block)
}

override fun run() {
var fairnessCounter = 0
while (true) {
val task = queue.removeFirstOrNull()
if (task != null) {
task.run()
// 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(EmptyCoroutineContext)) {
// Do "yield" to let other views to execute their runnable as well
// Note that we do not decrement 'runningWorkers' as we still committed to do our part of work
dispatcher.dispatch(EmptyCoroutineContext, this)
return
}
continue
}

@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
--runningWorkers
if (queue.size == 0) return
++runningWorkers
fairnessCounter = 0
}
}
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
// Add task to queue so running workers will be able to see that
queue.addLast(block)
if (runningWorkers >= parallelism) {
return
}

/*
* Protect against race when the worker is finished
* right after our check
*/
@Suppress("CAST_NEVER_SUCCEEDS")
synchronized(this as SynchronizedObject) {
if (runningWorkers >= parallelism) return
++runningWorkers
}
if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) {
dispatcher.dispatch(EmptyCoroutineContext, this)
} else {
run()
}
}
}

// Save a few bytecode ops
internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/js/src/JSDispatcher.kt
Expand Up @@ -31,6 +31,11 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
parallelism.checkParallelism()
return this
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
messageQueue.enqueue(block)
}
Expand Down
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
Expand Up @@ -93,6 +93,9 @@ private class MissingMainCoroutineDispatcher(
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
missing()

override suspend fun delay(time: Long) =
missing()

Expand Down
56 changes: 56 additions & 0 deletions kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt
@@ -0,0 +1,56 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.*
import org.junit.Test
import org.junit.runner.*
import org.junit.runners.*
import java.util.concurrent.atomic.*
import kotlin.test.*

@RunWith(Parameterized::class)
class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBase() {

companion object {
@Parameterized.Parameters(name = "{0}")
@JvmStatic
fun params(): Collection<Array<Any>> = listOf(1, 2, 3, 4).map { arrayOf(it) }
}

@get:Rule
val executor = ExecutorRule(targetParallelism * 2)
private val iterations = 100_000 * stressTestMultiplier

private val parallelism = AtomicInteger(0)

private fun checkParallelism() {
val value = parallelism.incrementAndGet()
assertTrue { value <= targetParallelism }
parallelism.decrementAndGet()
}

@Test
fun testLimited() = runTest {
val view = executor.limitedParallelism(targetParallelism)
repeat(iterations) {
launch(view) {
checkParallelism()
}
}
}

@Test
fun testUnconfined() = runTest {
val view = Dispatchers.Unconfined.limitedParallelism(targetParallelism)
repeat(iterations) {
launch(executor) {
withContext(view) {
checkParallelism()
}
}
}
}
}
33 changes: 33 additions & 0 deletions kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt
@@ -0,0 +1,33 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.*

class LimitedParallelismTest : TestBase() {

@Test
fun testParallelismSpec() {
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
}

@Test
fun testTaskFairness() = runTest {
val executor = newSingleThreadContext("test")
val view = executor.limitedParallelism(1)
val view2 = executor.limitedParallelism(1)
val j1 = launch(view) {
while (true) {
yield()
}
}
val j2 = launch(view2) { j1.cancel() }
joinAll(j1, j2)
executor.close()
}
}

0 comments on commit 778bfbc

Please sign in to comment.