Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional name parameter to .limitedParallelism #4106

Merged
merged 2 commits into from Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -170,7 +170,9 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines
public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element;
public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public synthetic fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
public static synthetic fun limitedParallelism$default (Lkotlinx/coroutines/CoroutineDispatcher;ILjava/lang/String;ILjava/lang/Object;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher;
public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V
Expand Down Expand Up @@ -502,7 +504,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher {
public fun <init> ()V
public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher;
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
public fun limitedParallelism (ILjava/lang/String;)Lkotlinx/coroutines/CoroutineDispatcher;
public fun toString ()Ljava/lang/String;
protected final fun toStringInternalImpl ()Ljava/lang/String;
}
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.klib.api
Expand Up @@ -59,14 +59,15 @@ abstract class kotlinx.coroutines/CoroutineDispatcher : kotlin.coroutines/Abstra
open fun dispatchYield(kotlin.coroutines/CoroutineContext, kotlinx.coroutines/Runnable) // kotlinx.coroutines/CoroutineDispatcher.dispatchYield|dispatchYield(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}[0]
open fun isDispatchNeeded(kotlin.coroutines/CoroutineContext): kotlin/Boolean // kotlinx.coroutines/CoroutineDispatcher.isDispatchNeeded|isDispatchNeeded(kotlin.coroutines.CoroutineContext){}[0]
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
open fun limitedParallelism(kotlin/Int, kotlin/String? =...): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/CoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/CoroutineDispatcher.toString|toString(){}[0]
}
abstract class kotlinx.coroutines/MainCoroutineDispatcher : kotlinx.coroutines/CoroutineDispatcher { // kotlinx.coroutines/MainCoroutineDispatcher|null[0]
abstract val immediate // kotlinx.coroutines/MainCoroutineDispatcher.immediate|{}immediate[0]
abstract fun <get-immediate>(): kotlinx.coroutines/MainCoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.immediate.<get-immediate>|<get-immediate>(){}[0]
constructor <init>() // kotlinx.coroutines/MainCoroutineDispatcher.<init>|<init>(){}[0]
final fun toStringInternalImpl(): kotlin/String? // kotlinx.coroutines/MainCoroutineDispatcher.toStringInternalImpl|toStringInternalImpl(){}[0]
open fun limitedParallelism(kotlin/Int): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int){}[0]
open fun limitedParallelism(kotlin/Int, kotlin/String?): kotlinx.coroutines/CoroutineDispatcher // kotlinx.coroutines/MainCoroutineDispatcher.limitedParallelism|limitedParallelism(kotlin.Int;kotlin.String?){}[0]
open fun toString(): kotlin/String // kotlinx.coroutines/MainCoroutineDispatcher.toString|toString(){}[0]
}
abstract fun interface <#A: in kotlin/Any?> kotlinx.coroutines.flow/FlowCollector { // kotlinx.coroutines.flow/FlowCollector|null[0]
Expand Down
21 changes: 15 additions & 6 deletions kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt
Expand Up @@ -83,11 +83,11 @@ public abstract class CoroutineDispatcher :
* // Background dispatcher for the application
* val dispatcher = newFixedThreadPoolContext(4, "App Background")
* // At most 2 threads will be processing images as it is really slow and CPU-intensive
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2)
* val imageProcessingDispatcher = dispatcher.limitedParallelism(2, "Image processor")
* // At most 3 threads will be processing JSON to avoid image processing starvation
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3)
* val jsonProcessingDispatcher = dispatcher.limitedParallelism(3, "Json processor")
* // At most 1 thread will be doing IO
* val fileWriterDispatcher = dispatcher.limitedParallelism(1)
* val fileWriterDispatcher = dispatcher.limitedParallelism(1, "File writer")
* ```
* Note how in this example the application has an executor with 4 threads, but the total sum of all limits
* is 6. Still, at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism,
Expand All @@ -105,7 +105,7 @@ public abstract class CoroutineDispatcher :
* is established between them:
*
* ```
* val confined = Dispatchers.Default.limitedParallelism(1)
* val confined = Dispatchers.Default.limitedParallelism(1, "incrementDispatcher")
* var counter = 0
*
* // Invoked from arbitrary coroutines
Expand Down Expand Up @@ -135,15 +135,24 @@ public abstract class CoroutineDispatcher :
* Implementations of this method are allowed to return `this` if the current dispatcher already satisfies the parallelism requirement.
* For example, `Dispatchers.Main.limitedParallelism(1)` returns `Dispatchers.Main`, because the main dispatcher is already single-threaded.
*
* @param name optional name for the resulting dispatcher string representation if a new dispatcher was created.
* Implementations are free to ignore this parameter.
* @throws IllegalArgumentException if the given [parallelism] is non-positive
* @throws UnsupportedOperationException if the current dispatcher does not support limited parallelism views
*/
@ExperimentalCoroutinesApi
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
public open fun limitedParallelism(parallelism: Int, name: String? = null): CoroutineDispatcher {
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
return LimitedDispatcher(this, parallelism, name)
}

// Was experimental since 1.6.0, deprecated since 1.8.x
@Deprecated("Deprecated for good. Override 'limitedParallelism(parallelism: Int, name: String?)' instead",
level = DeprecationLevel.HIDDEN,
replaceWith = ReplaceWith("limitedParallelism(parallelism, null)")
)
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher = limitedParallelism(parallelism, null)

/**
* Requests execution of a runnable [block].
* The dispatcher guarantees that [block] will eventually execute, typically by dispatching it to a thread pool,
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -111,9 +111,9 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
final override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return this
return namedOrThis(name) // Single-threaded, short-circuit
}

open fun shutdown() {}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt
Expand Up @@ -49,10 +49,10 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
*/
override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress"

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
// MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it
return this
return namedOrThis(name)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Unconfined.kt
Expand Up @@ -9,7 +9,7 @@ import kotlin.jvm.*
internal object Unconfined : CoroutineDispatcher() {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined")
}

Expand Down
16 changes: 11 additions & 5 deletions kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt
Expand Up @@ -21,7 +21,8 @@ import kotlin.coroutines.*
*/
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
private val parallelism: Int,
private val name: String?
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

// Atomic is necessary here for the sake of K/N memory ordering,
Expand All @@ -34,10 +35,10 @@ internal class LimitedDispatcher(
private val workerAllocationLock = SynchronizedObject()

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= this.parallelism) return this
return super.limitedParallelism(parallelism)
if (parallelism >= this.parallelism) return namedOrThis(name)
return super.limitedParallelism(parallelism, name)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Minor) Since checkParallelism was already called, we could avoid calling super here, instead directly constructing a LimitedDispatcher.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In things like that, I prefer leaving the double-check, but ensuring that the code is reused and has less entry point

}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down Expand Up @@ -95,7 +96,7 @@ internal class LimitedDispatcher(
}
}

override fun toString() = "$dispatcher.limitedParallelism($parallelism)"
override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)"

/**
* A worker that polls the queue and runs tasks until there are no more of them.
Expand Down Expand Up @@ -128,3 +129,8 @@ internal class LimitedDispatcher(
}

internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" }

internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher {
if (name != null) return NamedDispatcher(this, name)
return this
}
25 changes: 25 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/NamedDispatcher.kt
@@ -0,0 +1,25 @@
package kotlinx.coroutines.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.DefaultDelay
import kotlin.coroutines.*

/**
* Wrapping dispatcher that has a nice user-supplied `toString()` representation
*/
internal class NamedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val name: String
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

override fun isDispatchNeeded(context: CoroutineContext): Boolean = dispatcher.isDispatchNeeded(context)

override fun dispatch(context: CoroutineContext, block: Runnable) = dispatcher.dispatch(context, block)

@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatcher.dispatchYield(context, block)

override fun toString(): String {
return name
}
}
Expand Up @@ -30,9 +30,9 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay

abstract fun scheduleQueueProcessing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
return this
return namedOrThis(name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
Expand Up @@ -91,7 +91,7 @@ private class MissingMainCoroutineDispatcher(
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()

override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher =
missing()

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
Expand Down
20 changes: 12 additions & 8 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -12,10 +12,12 @@ internal object DefaultScheduler : SchedulerCoroutineDispatcher(
) {

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= CORE_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
if (parallelism >= CORE_POOL_SIZE) {
return namedOrThis(name)
}
return super.limitedParallelism(parallelism, name)
}

// Shuts down the dispatcher, used only by Dispatchers.shutdown()
Expand Down Expand Up @@ -44,10 +46,12 @@ private object UnlimitedIoScheduler : CoroutineDispatcher() {
}

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
parallelism.checkParallelism()
if (parallelism >= MAX_POOL_SIZE) return this
return super.limitedParallelism(parallelism)
if (parallelism >= MAX_POOL_SIZE) {
return namedOrThis(name)
}
return super.limitedParallelism(parallelism, name)
}

// This name only leaks to user code as part of .limitedParallelism machinery
Expand All @@ -72,9 +76,9 @@ internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor {
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return UnlimitedIoScheduler.limitedParallelism(parallelism)
return UnlimitedIoScheduler.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
24 changes: 24 additions & 0 deletions kotlinx-coroutines-core/jvm/test/DispatchersToStringTest.kt
@@ -1,3 +1,5 @@
@file:OptIn(ExperimentalStdlibApi::class)

package kotlinx.coroutines

import kotlin.test.*
Expand All @@ -18,5 +20,27 @@ class DispatchersToStringTest {
assertEquals("Dispatchers.Default.limitedParallelism(2)", Dispatchers.Default.limitedParallelism(2).toString())
// Not overridden at all, limited parallelism returns `this`
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())

assertEquals("filesDispatcher", Dispatchers.IO.limitedParallelism(1, "filesDispatcher").toString())
assertEquals("json", Dispatchers.Default.limitedParallelism(2, "json").toString())
assertEquals("\uD80C\uDE11", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42, "\uD80C\uDE11").toString())
assertEquals("DefaultExecutor", (DefaultDelay as CoroutineDispatcher).limitedParallelism(42).toString())

val limitedNamed = Dispatchers.IO.limitedParallelism(10, "limited")
assertEquals("limited.limitedParallelism(2)", limitedNamed.limitedParallelism(2).toString())
assertEquals("2", limitedNamed.limitedParallelism(2, "2").toString())
// We asked for too many threads with no name, this was returned
assertEquals("limited", limitedNamed.limitedParallelism(12).toString())
assertEquals("12", limitedNamed.limitedParallelism(12, "12").toString())

runBlocking {
val d = coroutineContext[CoroutineDispatcher]!!
assertContains(d.toString(), "BlockingEventLoop")
val limited = d.limitedParallelism(2)
assertContains(limited.toString(), "BlockingEventLoop")
assertFalse(limited.toString().contains("limitedParallelism"))
val named = d.limitedParallelism(2, "Named")
assertEquals("Named", named.toString())
}
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/native/src/Dispatchers.kt
Expand Up @@ -28,9 +28,9 @@ internal object DefaultIoScheduler : CoroutineDispatcher() {
private val io = unlimitedPool.limitedParallelism(64) // Default JVM size

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return unlimitedPool.limitedParallelism(parallelism)
return unlimitedPool.limitedParallelism(parallelism, name)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand Down
1 change: 1 addition & 0 deletions test-utils/common/src/TestBase.common.kt
Expand Up @@ -252,6 +252,7 @@ public class TestRuntimeException(message: String? = null, private val data: Any
public class RecoverableTestException(message: String? = null) : RuntimeException(message)
public class RecoverableTestCancellationException(message: String? = null) : CancellationException(message)

// Erases identity and equality checks for tests
public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {
val dispatcher = context[ContinuationInterceptor] as CoroutineDispatcher
return object : CoroutineDispatcher() {
Expand Down