From 778bfbca69f15436d08d11664ed9b618cf2e6e91 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 6 Sep 2021 19:19:28 +0300 Subject: [PATCH] Introduce CoroutineDispatcher.limitedParallelism --- .../api/kotlinx-coroutines-core.api | 2 + .../common/src/CoroutineDispatcher.kt | 34 ++++++++ .../common/src/EventLoop.common.kt | 6 +- .../common/src/MainCoroutineDispatcher.kt | 8 ++ .../common/src/internal/LimitedDispatcher.kt | 82 +++++++++++++++++++ .../js/src/JSDispatcher.kt | 5 ++ .../jvm/src/internal/MainDispatchers.kt | 3 + .../jvm/test/LimitedParallelismStressTest.kt | 56 +++++++++++++ .../jvm/test/LimitedParallelismTest.kt | 33 ++++++++ 9 files changed, 228 insertions(+), 1 deletion(-) create mode 100644 kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt create mode 100644 kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt create mode 100644 kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index eb6b4184e7..37dc1ac1aa 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -156,6 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z + public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher; public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V @@ -446,6 +447,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher { public fun ()V public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher; + public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; public fun toString ()Ljava/lang/String; protected final fun toStringInternalImpl ()Ljava/lang/String; } diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index d5613d4110..e74e9bedea 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -61,6 +61,40 @@ public abstract class CoroutineDispatcher : */ public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true + /** + * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism]. + * The resulting view uses the original dispatcher for execution, but with the guarantee that + * no more than [parallelism] coroutines are executed at the same time. + * + * This method does not impose restrictions on the number of views or the total sum of parallelism values, + * each view controls its own parallelism independently with the guarantee that the effective parallelism + * of all views cannot exceed the actual parallelism of the original dispatcher. + * + * ### Limitations + * + * The default implementation of `limitedParallelism` does not support direct dispatchers, + * such as execute the given runnable in place during [dispatch] calls. For direct dispatchers, + * it is recommended to override this method and provide a domain-specific implementation. + * + * ### Example of usage + * ``` + * private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background") + * // At most 2 threads will be processing images as it is really slow and CPU-intensive + * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2) + * // At most 3 threads will be processing JSON to avoid image processing starvation + * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3) + * // At most 1 thread will be doing IO + * private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1) + * ``` + * Note how in this example, the application have the executor with 4 threads, but the total sum of all limits + * is 5. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism. + */ + @ExperimentalCoroutinesApi + public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + return LimitedDispatcher(this, parallelism) + } + /** * Dispatches execution of a runnable [block] onto another thread in the given [context]. * This method should guarantee that the given [block] will be eventually invoked, diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index e6a57c927a..55a0cff674 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -115,6 +115,11 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } + final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + return this + } + protected open fun shutdown() {} } @@ -525,4 +530,3 @@ internal expect fun nanoTime(): Long internal expect object DefaultExecutor { public fun enqueue(task: Runnable) } - diff --git a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt index 602da6e0b5..a7065ccd15 100644 --- a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines +import kotlinx.coroutines.internal.* + /** * Base class for special [CoroutineDispatcher] which is confined to application "Main" or "UI" thread * and used for any UI-based activities. Instance of `MainDispatcher` can be obtained by [Dispatchers.Main]. @@ -51,6 +53,12 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() { */ override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress" + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it + return this + } + /** * Internal method for more specific [toString] implementations. It returns non-null * string if this dispatcher is set in the platform as the main one. diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt new file mode 100644 index 0000000000..38298507eb --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -0,0 +1,82 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.jvm.* + +/** + * The result of .limitedParallelism(x) call, dispatcher + * that wraps the given dispatcher, but limits the parallelism level, while + * trying to emulate fairness. + */ +internal class LimitedDispatcher( + private val dispatcher: CoroutineDispatcher, + private val parallelism: Int +) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) { + + @Volatile + private var runningWorkers = 0 + + private val queue = LockFreeTaskQueue(singleConsumer = false) + + @InternalCoroutinesApi + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + dispatcher.dispatchYield(context, block) + } + + override fun run() { + var fairnessCounter = 0 + while (true) { + val task = queue.removeFirstOrNull() + if (task != null) { + task.run() + // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well + if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { + // Do "yield" to let other views to execute their runnable as well + // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work + dispatcher.dispatch(EmptyCoroutineContext, this) + return + } + continue + } + + @Suppress("CAST_NEVER_SUCCEEDS") + synchronized(this as SynchronizedObject) { + --runningWorkers + if (queue.size == 0) return + ++runningWorkers + fairnessCounter = 0 + } + } + } + + override fun dispatch(context: CoroutineContext, block: Runnable) { + // Add task to queue so running workers will be able to see that + queue.addLast(block) + if (runningWorkers >= parallelism) { + return + } + + /* + * Protect against race when the worker is finished + * right after our check + */ + @Suppress("CAST_NEVER_SUCCEEDS") + synchronized(this as SynchronizedObject) { + if (runningWorkers >= parallelism) return + ++runningWorkers + } + if (dispatcher.isDispatchNeeded(EmptyCoroutineContext)) { + dispatcher.dispatch(EmptyCoroutineContext, this) + } else { + run() + } + } +} + +// Save a few bytecode ops +internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" } diff --git a/kotlinx-coroutines-core/js/src/JSDispatcher.kt b/kotlinx-coroutines-core/js/src/JSDispatcher.kt index 6ad7d41b15..603005d5a4 100644 --- a/kotlinx-coroutines-core/js/src/JSDispatcher.kt +++ b/kotlinx-coroutines-core/js/src/JSDispatcher.kt @@ -31,6 +31,11 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay { abstract fun scheduleQueueProcessing() + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + return this + } + override fun dispatch(context: CoroutineContext, block: Runnable) { messageQueue.enqueue(block) } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 2d447413b8..2da633a6b6 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -93,6 +93,9 @@ private class MissingMainCoroutineDispatcher( override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = + missing() + override suspend fun delay(time: Long) = missing() diff --git a/kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt b/kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt new file mode 100644 index 0000000000..eb1c693639 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.* +import org.junit.Test +import org.junit.runner.* +import org.junit.runners.* +import java.util.concurrent.atomic.* +import kotlin.test.* + +@RunWith(Parameterized::class) +class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBase() { + + companion object { + @Parameterized.Parameters(name = "{0}") + @JvmStatic + fun params(): Collection> = listOf(1, 2, 3, 4).map { arrayOf(it) } + } + + @get:Rule + val executor = ExecutorRule(targetParallelism * 2) + private val iterations = 100_000 * stressTestMultiplier + + private val parallelism = AtomicInteger(0) + + private fun checkParallelism() { + val value = parallelism.incrementAndGet() + assertTrue { value <= targetParallelism } + parallelism.decrementAndGet() + } + + @Test + fun testLimited() = runTest { + val view = executor.limitedParallelism(targetParallelism) + repeat(iterations) { + launch(view) { + checkParallelism() + } + } + } + + @Test + fun testUnconfined() = runTest { + val view = Dispatchers.Unconfined.limitedParallelism(targetParallelism) + repeat(iterations) { + launch(executor) { + withContext(view) { + checkParallelism() + } + } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt new file mode 100644 index 0000000000..c58fa6ba3b --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt @@ -0,0 +1,33 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.* + +class LimitedParallelismTest : TestBase() { + + @Test + fun testParallelismSpec() { + assertFailsWith { Dispatchers.Default.limitedParallelism(0) } + assertFailsWith { Dispatchers.Default.limitedParallelism(-1) } + assertFailsWith { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) } + Dispatchers.Default.limitedParallelism(Int.MAX_VALUE) + } + + @Test + fun testTaskFairness() = runTest { + val executor = newSingleThreadContext("test") + val view = executor.limitedParallelism(1) + val view2 = executor.limitedParallelism(1) + val j1 = launch(view) { + while (true) { + yield() + } + } + val j2 = launch(view2) { j1.cancel() } + joinAll(j1, j2) + executor.close() + } +}