diff --git a/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt b/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt index 9948a371bc..80e15a1b4f 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/ParametrizedDispatcherBase.kt @@ -30,7 +30,7 @@ abstract class ParametrizedDispatcherBase : CoroutineScope { coroutineContext = when { dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher() dispatcher == "scheduler" -> { - ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it } + Dispatchers.Default } dispatcher.startsWith("ftp") -> { newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it } diff --git a/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt index 40ddc8ec36..9e1bfc43bb 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt @@ -6,13 +6,10 @@ package benchmarks import benchmarks.common.* import kotlinx.coroutines.* -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher -import kotlinx.coroutines.sync.Semaphore -import kotlinx.coroutines.sync.withPermit +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.sync.* import org.openjdk.jmh.annotations.* -import java.util.concurrent.ForkJoinPool -import java.util.concurrent.TimeUnit +import java.util.concurrent.* @Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS) @Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS) @@ -84,7 +81,7 @@ open class SemaphoreBenchmark { enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) { FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }), - EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) }) + EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account } private const val WORK_INSIDE = 80 diff --git a/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt index a6f0a473c1..d874f3bbe1 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/scheduler/actors/PingPongWithBlockingContext.kt @@ -27,10 +27,8 @@ import kotlin.coroutines.* @State(Scope.Benchmark) open class PingPongWithBlockingContext { - @UseExperimental(InternalCoroutinesApi::class) - private val experimental = ExperimentalCoroutineDispatcher(8) - @UseExperimental(InternalCoroutinesApi::class) - private val blocking = experimental.blocking(8) + private val experimental = Dispatchers.Default + private val blocking = Dispatchers.IO.limitedParallelism(8) private val threadPool = newFixedThreadPoolContext(8, "PongCtx") @TearDown diff --git a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt index b125192e93..34fbe23b55 100644 --- a/integration/kotlinx-coroutines-play-services/test/TaskTest.kt +++ b/integration/kotlinx-coroutines-play-services/test/TaskTest.kt @@ -45,8 +45,8 @@ class TaskTest : TestBase() { } @Test - fun testCancelledAsTask() { - val deferred = GlobalScope.async { + fun testCancelledAsTask() = runTest { + val deferred = async(Dispatchers.Default) { delay(100) }.apply { cancel() } @@ -60,8 +60,8 @@ class TaskTest : TestBase() { } @Test - fun testThrowingAsTask() { - val deferred = GlobalScope.async { + fun testThrowingAsTask() = runTest({ e -> e is TestException }) { + val deferred = async(Dispatchers.Default) { throw TestException("Fail") } diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index fcd51ae2eb..495d11a804 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -156,6 +156,7 @@ public abstract class kotlinx/coroutines/CoroutineDispatcher : kotlin/coroutines public fun get (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; public final fun interceptContinuation (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation; public fun isDispatchNeeded (Lkotlin/coroutines/CoroutineContext;)Z + public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; public final fun plus (Lkotlinx/coroutines/CoroutineDispatcher;)Lkotlinx/coroutines/CoroutineDispatcher; public final fun releaseInterceptedContinuation (Lkotlin/coroutines/Continuation;)V @@ -447,6 +448,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public abstract class kotlinx/coroutines/MainCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher { public fun ()V public abstract fun getImmediate ()Lkotlinx/coroutines/MainCoroutineDispatcher; + public fun limitedParallelism (I)Lkotlinx/coroutines/CoroutineDispatcher; public fun toString ()Ljava/lang/String; protected final fun toStringInternalImpl ()Ljava/lang/String; } diff --git a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt index d5613d4110..c91e944b91 100644 --- a/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt @@ -61,6 +61,45 @@ public abstract class CoroutineDispatcher : */ public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true + /** + * Creates a view of the current dispatcher that limits the parallelism to the given [value][parallelism]. + * The resulting view uses the original dispatcher for execution, but with the guarantee that + * no more than [parallelism] coroutines are executed at the same time. + * + * This method does not impose restrictions on the number of views or the total sum of parallelism values, + * each view controls its own parallelism independently with the guarantee that the effective parallelism + * of all views cannot exceed the actual parallelism of the original dispatcher. + * + * ### Limitations + * + * The default implementation of `limitedParallelism` does not support direct dispatchers, + * such as executing the given runnable in place during [dispatch] calls. + * Any dispatcher that may return `false` from [isDispatchNeeded] is considered direct. + * For direct dispatchers, it is recommended to override this method + * and provide a domain-specific implementation or to throw an [UnsupportedOperationException]. + * + * ### Example of usage + * ``` + * private val backgroundDispatcher = newFixedThreadPoolContext(4, "App Background") + * // At most 2 threads will be processing images as it is really slow and CPU-intensive + * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(2) + * // At most 3 threads will be processing JSON to avoid image processing starvation + * private val imageProcessingDispatcher = backgroundDispatcher.limitedParallelism(3) + * // At most 1 thread will be doing IO + * private val fileWriterDispatcher = backgroundDispatcher.limitedParallelism(1) + * ``` + * is 6. Yet at most 4 coroutines can be executed simultaneously as each view limits only its own parallelism. + * + * Note that this example was structured in such a way that it illustrates the parallelism guarantees. + * In practice, it is usually better to use [Dispatchers.IO] or [Dispatchers.Default] instead of creating a + * `backgroundDispatcher`. It is both possible and advised to call `limitedParallelism` on them. + */ + @ExperimentalCoroutinesApi + public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + return LimitedDispatcher(this, parallelism) + } + /** * Dispatches execution of a runnable [block] onto another thread in the given [context]. * This method should guarantee that the given [block] will be eventually invoked, diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index e2a1ffd69f..c4d4e272d7 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -115,6 +115,11 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } + final override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + return this + } + open fun shutdown() {} } @@ -525,4 +530,3 @@ internal expect fun nanoTime(): Long internal expect object DefaultExecutor { public fun enqueue(task: Runnable) } - diff --git a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt index 602da6e0b5..a7065ccd15 100644 --- a/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt +++ b/kotlinx-coroutines-core/common/src/MainCoroutineDispatcher.kt @@ -4,6 +4,8 @@ package kotlinx.coroutines +import kotlinx.coroutines.internal.* + /** * Base class for special [CoroutineDispatcher] which is confined to application "Main" or "UI" thread * and used for any UI-based activities. Instance of `MainDispatcher` can be obtained by [Dispatchers.Main]. @@ -51,6 +53,12 @@ public abstract class MainCoroutineDispatcher : CoroutineDispatcher() { */ override fun toString(): String = toStringInternalImpl() ?: "$classSimpleName@$hexAddress" + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + // MainCoroutineDispatcher is single-threaded -- short-circuit any attempts to limit it + return this + } + /** * Internal method for more specific [toString] implementations. It returns non-null * string if this dispatcher is set in the platform as the main one. diff --git a/kotlinx-coroutines-core/common/src/Unconfined.kt b/kotlinx-coroutines-core/common/src/Unconfined.kt index df0087100a..24a401f702 100644 --- a/kotlinx-coroutines-core/common/src/Unconfined.kt +++ b/kotlinx-coroutines-core/common/src/Unconfined.kt @@ -11,6 +11,12 @@ import kotlin.jvm.* * A coroutine dispatcher that is not confined to any specific thread. */ internal object Unconfined : CoroutineDispatcher() { + + @ExperimentalCoroutinesApi + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + throw UnsupportedOperationException("limitedParallelism is not supported for Dispatchers.Unconfined") + } + override fun isDispatchNeeded(context: CoroutineContext): Boolean = false override fun dispatch(context: CoroutineContext, block: Runnable) { diff --git a/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt new file mode 100644 index 0000000000..892375b89f --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt @@ -0,0 +1,105 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.coroutines.* +import kotlin.coroutines.* +import kotlin.jvm.* + +/** + * The result of .limitedParallelism(x) call, a dispatcher + * that wraps the given dispatcher, but limits the parallelism level, while + * trying to emulate fairness. + */ +internal class LimitedDispatcher( + private val dispatcher: CoroutineDispatcher, + private val parallelism: Int +) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) { + + @Volatile + private var runningWorkers = 0 + + private val queue = LockFreeTaskQueue(singleConsumer = false) + + @ExperimentalCoroutinesApi + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + if (parallelism >= this.parallelism) return this + return super.limitedParallelism(parallelism) + } + + override fun run() { + var fairnessCounter = 0 + while (true) { + val task = queue.removeFirstOrNull() + if (task != null) { + try { + task.run() + } catch (e: Throwable) { + handleCoroutineException(EmptyCoroutineContext, e) + } + // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well + if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this)) { + // Do "yield" to let other views to execute their runnable as well + // Note that we do not decrement 'runningWorkers' as we still committed to do our part of work + dispatcher.dispatch(this, this) + return + } + continue + } + + @Suppress("CAST_NEVER_SUCCEEDS") + synchronized(this as SynchronizedObject) { + --runningWorkers + if (queue.size == 0) return + ++runningWorkers + fairnessCounter = 0 + } + } + } + + override fun dispatch(context: CoroutineContext, block: Runnable) { + dispatchInternal(block) { + dispatcher.dispatch(this, this) + } + } + + @InternalCoroutinesApi + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + dispatchInternal(block) { + dispatcher.dispatchYield(this, this) + } + } + + private inline fun dispatchInternal(block: Runnable, dispatch: () -> Unit) { + // Add task to queue so running workers will be able to see that + if (addAndTryDispatching(block)) return + /* + * Protect against the race when the number of workers is enough, + * but one (because of synchronized serialization) attempts to complete, + * and we just observed the number of running workers smaller than the actual + * number (hit right between `--runningWorkers` and `++runningWorkers` in `run()`) + */ + if (!tryAllocateWorker()) return + dispatch() + } + + private fun tryAllocateWorker(): Boolean { + @Suppress("CAST_NEVER_SUCCEEDS") + synchronized(this as SynchronizedObject) { + if (runningWorkers >= parallelism) return false + ++runningWorkers + return true + } + } + + private fun addAndTryDispatching(block: Runnable): Boolean { + queue.addLast(block) + return runningWorkers >= parallelism + } +} + +// Save a few bytecode ops +internal fun Int.checkParallelism() = require(this >= 1) { "Expected positive parallelism level, but got $this" } diff --git a/kotlinx-coroutines-core/js/src/JSDispatcher.kt b/kotlinx-coroutines-core/js/src/JSDispatcher.kt index 6ad7d41b15..603005d5a4 100644 --- a/kotlinx-coroutines-core/js/src/JSDispatcher.kt +++ b/kotlinx-coroutines-core/js/src/JSDispatcher.kt @@ -31,6 +31,11 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay { abstract fun scheduleQueueProcessing() + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + parallelism.checkParallelism() + return this + } + override fun dispatch(context: CoroutineContext, block: Runnable) { messageQueue.enqueue(block) } diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index a3be9fa53c..4b1b03337d 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -86,7 +86,7 @@ public actual object Dispatchers { * Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption, * but still want to execute it in the current call-frame until its first suspension, then you can use * an optional [CoroutineStart] parameter in coroutine builders like - * [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the + * [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to * the value of [CoroutineStart.UNDISPATCHED]. */ @JvmStatic @@ -100,14 +100,30 @@ public actual object Dispatchers { * "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property. * It defaults to the limit of 64 threads or the number of cores (whichever is larger). * - * Moreover, the maximum configurable number of threads is capped by the - * `kotlinx.coroutines.scheduler.max.pool.size` system property. - * If you need a higher number of parallel threads, - * you should use a custom dispatcher backed by your own thread pool. + * ### Elasticity for limited parallelism + * + * `Dispatchers.IO` has a unique property of elasticity: its views + * obtained with [CoroutineDispatcher.limitedParallelism] are + * not restricted by the `Dispatchers.IO` parallelism. Conceptually, there is + * a dispatcher backed by an unlimited pool of threads, and both `Dispatchers.IO` + * and views of `Dispatchers.IO` are actually views of that dispatcher. In practice + * this means that, despite not abiding by `Dispatchers.IO`'s parallelism + * restrictions, its views share threads and resources with it. + * + * In the following example + * ``` + * // 100 threads for MySQL connection + * val myMysqlDbDispatcher = Dispatchers.IO.limitedParallelism(100) + * // 60 threads for MongoDB connection + * val myMongoDbDispatcher = Dispatchers.IO.limitedParallelism(60) + * ``` + * the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads, + * but during its steady state there is only a small number of threads shared + * among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`. * * ### Implementation note * - * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using + * This dispatcher and its views share threads with the [Default][Dispatchers.Default] dispatcher, so using * `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default] * dispatcher does not lead to an actual switching to another thread — typically execution * continues in the same thread. @@ -115,7 +131,7 @@ public actual object Dispatchers { * during operations over IO dispatcher. */ @JvmStatic - public val IO: CoroutineDispatcher = DefaultScheduler.IO + public val IO: CoroutineDispatcher = DefaultIoScheduler /** * Shuts down built-in dispatchers, such as [Default] and [IO], diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index 2d447413b8..2da633a6b6 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -93,6 +93,9 @@ private class MissingMainCoroutineDispatcher( override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher = + missing() + override suspend fun delay(time: Long) = missing() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt new file mode 100644 index 0000000000..86b0ade61a --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt @@ -0,0 +1,212 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("unused") + +package kotlinx.coroutines.scheduling +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import java.util.concurrent.* +import kotlin.coroutines.* + +/** + * This API was "public @InternalApi" and leaked into Ktor enabled-by-default sources. + * Since then, we refactored scheduler sources and its API and decided to get rid of it in + * its current shape. + * + * To preserve backwards compatibility with Ktor 1.x, previous version of the code is + * extracted here as is and isolated from the rest of code base, so R8 can get rid of it. + * + * It should be removed after Kotlin 3.0.0 (EOL of Ktor 1.x) around 2022. + */ +@PublishedApi +internal open class ExperimentalCoroutineDispatcher( + private val corePoolSize: Int, + private val maxPoolSize: Int, + private val idleWorkerKeepAliveNs: Long, + private val schedulerName: String = "CoroutineScheduler" +) : ExecutorCoroutineDispatcher() { + public constructor( + corePoolSize: Int = CORE_POOL_SIZE, + maxPoolSize: Int = MAX_POOL_SIZE, + schedulerName: String = DEFAULT_SCHEDULER_NAME + ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) + + @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN) + public constructor( + corePoolSize: Int = CORE_POOL_SIZE, + maxPoolSize: Int = MAX_POOL_SIZE + ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS) + + override val executor: Executor + get() = coroutineScheduler + + // This is variable for test purposes, so that we can reinitialize from clean state + private var coroutineScheduler = createScheduler() + + override fun dispatch(context: CoroutineContext, block: Runnable): Unit = + try { + coroutineScheduler.dispatch(block) + } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. + DefaultExecutor.dispatch(context, block) + } + + override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = + try { + coroutineScheduler.dispatch(block, tailDispatch = true) + } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. + DefaultExecutor.dispatchYield(context, block) + } + + override fun close(): Unit = coroutineScheduler.close() + + override fun toString(): String { + return "${super.toString()}[scheduler = $coroutineScheduler]" + } + + /** + * Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block. + * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], + * giving it additional hints to adjust its behaviour. + * + * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. + */ + fun blocking(parallelism: Int = 16): CoroutineDispatcher { + require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } + return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING) + } + + /** + * Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks. + * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], + * giving it additional hints to adjust its behaviour. + * + * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. + */ + fun limited(parallelism: Int): CoroutineDispatcher { + require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } + require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" } + return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING) + } + + internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { + try { + coroutineScheduler.dispatch(block, context, tailDispatch) + } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. + // TaskContext shouldn't be lost here to properly invoke before/after task + DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) + } + } + + private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) +} + +private class LimitingDispatcher( + private val dispatcher: ExperimentalCoroutineDispatcher, + private val parallelism: Int, + private val name: String?, + override val taskMode: Int +) : ExecutorCoroutineDispatcher(), TaskContext, Executor { + + private val queue = ConcurrentLinkedQueue() + private val inFlightTasks = atomic(0) + + override val executor: Executor + get() = this + + override fun execute(command: Runnable) = dispatch(command, false) + + override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher") + + override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) + + private fun dispatch(block: Runnable, tailDispatch: Boolean) { + var taskToSchedule = block + while (true) { + // Commit in-flight tasks slot + val inFlight = inFlightTasks.incrementAndGet() + + // Fast path, if parallelism limit is not reached, dispatch task and return + if (inFlight <= parallelism) { + dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) + return + } + + // Parallelism limit is reached, add task to the queue + queue.add(taskToSchedule) + + /* + * We're not actually scheduled anything, so rollback committed in-flight task slot: + * If the amount of in-flight tasks is still above the limit, do nothing + * If the amount of in-flight tasks is lesser than parallelism, then + * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue + * to avoid starvation. + * + * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: + * + * T1: submit task, start execution, R == 1 + * T2: commit slot for next task, R == 2 + * T1: finish T1, R == 1 + * T2: submit next task to local queue, decrement R, R == 0 + * Without retries, task from T2 will be stuck in the local queue + */ + if (inFlightTasks.decrementAndGet() >= parallelism) { + return + } + + taskToSchedule = queue.poll() ?: return + } + } + + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + dispatch(block, tailDispatch = true) + } + + override fun toString(): String { + return name ?: "${super.toString()}[dispatcher = $dispatcher]" + } + + /** + * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any. + * + * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid + * non-blocking continuations starvation. + * E.g. for + * ``` + * foo() + * blocking() + * bar() + * ``` + * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task + */ + override fun afterTask() { + var next = queue.poll() + // If we have pending tasks in current blocking context, dispatch first + if (next != null) { + dispatcher.dispatchWithContext(next, this, true) + return + } + inFlightTasks.decrementAndGet() + + /* + * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue. + * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: + * T1: submit task, start execution, R == 1 + * T2: commit slot for next task, R == 2 + * T1: finish T1, poll queue (it's still empty), R == 2 + * T2: submit next task to the local queue, decrement R, R == 1 + * T1: decrement R, finish. R == 0 + * + * The task from T2 is stuck is the local queue + */ + next = queue.poll() ?: return + dispatch(next, true) + } +} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 0b5a542c2c..d55edec94f 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -4,24 +4,16 @@ package kotlinx.coroutines.scheduling -import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.util.concurrent.* import kotlin.coroutines.* -/** - * Default instance of coroutine dispatcher. - */ -internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { - @JvmField - val IO: CoroutineDispatcher = LimitingDispatcher( - this, - systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), - "Dispatchers.IO", - TASK_PROBABLY_BLOCKING - ) - +// Instance of Dispatchers.Default +internal object DefaultScheduler : SchedulerCoroutineDispatcher( + CORE_POOL_SIZE, MAX_POOL_SIZE, + IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME +) { // Shuts down the dispatcher, used only by Dispatchers.shutdown() internal fun shutdown() { super.close() @@ -29,106 +21,91 @@ internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { // Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close() override fun close() { - throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed") + throw UnsupportedOperationException("Dispatchers.Default cannot be closed") } - override fun toString(): String = DEFAULT_DISPATCHER_NAME + override fun toString(): String = "Dispatchers.Default" +} + +// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides +private object UnlimitedIoScheduler : CoroutineDispatcher() { @InternalCoroutinesApi - @Suppress("UNUSED") - public fun toDebugString(): String = super.toString() + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + DefaultScheduler.dispatchWithContext(block, BlockingContext, true) + } + + override fun dispatch(context: CoroutineContext, block: Runnable) { + DefaultScheduler.dispatchWithContext(block, BlockingContext, false) + } } -/** - * @suppress **This is unstable API and it is subject to change.** - */ -// TODO make internal (and rename) after complete integration -@InternalCoroutinesApi -public open class ExperimentalCoroutineDispatcher( - private val corePoolSize: Int, - private val maxPoolSize: Int, - private val idleWorkerKeepAliveNs: Long, - private val schedulerName: String = "CoroutineScheduler" -) : ExecutorCoroutineDispatcher() { - public constructor( - corePoolSize: Int = CORE_POOL_SIZE, - maxPoolSize: Int = MAX_POOL_SIZE, - schedulerName: String = DEFAULT_SCHEDULER_NAME - ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName) - - @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN) - public constructor( - corePoolSize: Int = CORE_POOL_SIZE, - maxPoolSize: Int = MAX_POOL_SIZE - ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS) +// Dispatchers.IO +internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor { + + private val default = UnlimitedIoScheduler.limitedParallelism( + systemProp( + IO_PARALLELISM_PROPERTY_NAME, + 64.coerceAtLeast(AVAILABLE_PROCESSORS) + ) + ) override val executor: Executor - get() = coroutineScheduler + get() = this - // This is variable for test purposes, so that we can reinitialize from clean state - private var coroutineScheduler = createScheduler() + override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command) - override fun dispatch(context: CoroutineContext, block: Runnable): Unit = - try { - coroutineScheduler.dispatch(block) - } catch (e: RejectedExecutionException) { - // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved - // for testing purposes, so we don't have to worry about cancelling the affected Job here. - DefaultExecutor.dispatch(context, block) - } + @ExperimentalCoroutinesApi + override fun limitedParallelism(parallelism: Int): CoroutineDispatcher { + // See documentation to Dispatchers.IO for the rationale + return UnlimitedIoScheduler.limitedParallelism(parallelism) + } - override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = - try { - coroutineScheduler.dispatch(block, tailDispatch = true) - } catch (e: RejectedExecutionException) { - // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved - // for testing purposes, so we don't have to worry about cancelling the affected Job here. - DefaultExecutor.dispatchYield(context, block) - } - - override fun close(): Unit = coroutineScheduler.close() - - override fun toString(): String { - return "${super.toString()}[scheduler = $coroutineScheduler]" + override fun dispatch(context: CoroutineContext, block: Runnable) { + default.dispatch(context, block) } - /** - * Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block. - * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], - * giving it additional hints to adjust its behaviour. - * - * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. - */ - public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher { - require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } - return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING) + @InternalCoroutinesApi + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + default.dispatchYield(context, block) } - /** - * Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks. - * Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher], - * giving it additional hints to adjust its behaviour. - * - * @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel. - */ - public fun limited(parallelism: Int): CoroutineDispatcher { - require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" } - require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" } - return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING) + override fun close() { + error("Cannot be invoked on Dispatchers.IO") } + override fun toString(): String = "Dispatchers.IO" +} + +// Instantiated in tests so we can test it in isolation +internal open class SchedulerCoroutineDispatcher( + private val corePoolSize: Int = CORE_POOL_SIZE, + private val maxPoolSize: Int = MAX_POOL_SIZE, + private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, + private val schedulerName: String = "CoroutineScheduler", +) : ExecutorCoroutineDispatcher() { + + override val executor: Executor + get() = coroutineScheduler + + // This is variable for test purposes, so that we can reinitialize from clean state + private var coroutineScheduler = createScheduler() + + private fun createScheduler() = + CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) + + override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) + + override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = + coroutineScheduler.dispatch(block, tailDispatch = true) + internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) { - try { - coroutineScheduler.dispatch(block, context, tailDispatch) - } catch (e: RejectedExecutionException) { - // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved - // for testing purposes, so we don't have to worry about cancelling the affected Job here. - // TaskContext shouldn't be lost here to properly invoke before/after task - DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) - } + coroutineScheduler.dispatch(block, context, tailDispatch) } - private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) + override fun close() { + coroutineScheduler.close() + } // fot tests only @Synchronized @@ -146,106 +123,3 @@ public open class ExperimentalCoroutineDispatcher( // for tests only internal fun restore() = usePrivateScheduler() // recreate scheduler } - -private class LimitingDispatcher( - private val dispatcher: ExperimentalCoroutineDispatcher, - private val parallelism: Int, - private val name: String?, - override val taskMode: Int -) : ExecutorCoroutineDispatcher(), TaskContext, Executor { - - private val queue = ConcurrentLinkedQueue() - private val inFlightTasks = atomic(0) - - override val executor: Executor - get() = this - - override fun execute(command: Runnable) = dispatch(command, false) - - override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher") - - override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false) - - private fun dispatch(block: Runnable, tailDispatch: Boolean) { - var taskToSchedule = block - while (true) { - // Commit in-flight tasks slot - val inFlight = inFlightTasks.incrementAndGet() - - // Fast path, if parallelism limit is not reached, dispatch task and return - if (inFlight <= parallelism) { - dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch) - return - } - - // Parallelism limit is reached, add task to the queue - queue.add(taskToSchedule) - - /* - * We're not actually scheduled anything, so rollback committed in-flight task slot: - * If the amount of in-flight tasks is still above the limit, do nothing - * If the amount of in-flight tasks is lesser than parallelism, then - * it's a race with a thread which finished the task from the current context, we should resubmit the first task from the queue - * to avoid starvation. - * - * Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: - * - * T1: submit task, start execution, R == 1 - * T2: commit slot for next task, R == 2 - * T1: finish T1, R == 1 - * T2: submit next task to local queue, decrement R, R == 0 - * Without retries, task from T2 will be stuck in the local queue - */ - if (inFlightTasks.decrementAndGet() >= parallelism) { - return - } - - taskToSchedule = queue.poll() ?: return - } - } - - override fun dispatchYield(context: CoroutineContext, block: Runnable) { - dispatch(block, tailDispatch = true) - } - - override fun toString(): String { - return name ?: "${super.toString()}[dispatcher = $dispatcher]" - } - - /** - * Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any. - * - * Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid - * non-blocking continuations starvation. - * E.g. for - * ``` - * foo() - * blocking() - * bar() - * ``` - * it's more profitable to execute bar at the end of `blocking` rather than pending blocking task - */ - override fun afterTask() { - var next = queue.poll() - // If we have pending tasks in current blocking context, dispatch first - if (next != null) { - dispatcher.dispatchWithContext(next, this, true) - return - } - inFlightTasks.decrementAndGet() - - /* - * Re-poll again and try to submit task if it's required otherwise tasks may be stuck in the local queue. - * Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential: - * T1: submit task, start execution, R == 1 - * T2: commit slot for next task, R == 2 - * T1: finish T1, poll queue (it's still empty), R == 2 - * T2: submit next task to the local queue, decrement R, R == 1 - * T1: decrement R, finish. R == 0 - * - * The task from T2 is stuck is the local queue - */ - next = queue.poll() ?: return - dispatch(next, true) - } -} diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt index da867c9853..5403cfc1fd 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt @@ -9,10 +9,6 @@ import kotlinx.coroutines.internal.* import java.util.concurrent.* -// TODO most of these fields will be moved to 'object ExperimentalDispatcher' - -// User-visible name -internal const val DEFAULT_DISPATCHER_NAME = "Dispatchers.Default" // Internal debuggability name + thread name prefixes internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher" @@ -22,27 +18,24 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp( "kotlinx.coroutines.scheduler.resolution.ns", 100000L ) -@JvmField -internal val BLOCKING_DEFAULT_PARALLELISM = systemProp( - "kotlinx.coroutines.scheduler.blocking.parallelism", 16 -) - -// NOTE: we coerce default to at least two threads to give us chances that multi-threading problems -// get reproduced even on a single-core machine, but support explicit setting of 1 thread scheduler if needed. +/** + * The maximum number of threads allocated for CPU-bound tasks at the default set of dispatchers. + * + * NOTE: we coerce default to at least two threads to give us chances that multi-threading problems + * get reproduced even on a single-core machine, but support explicit setting of 1 thread scheduler if needed + */ @JvmField internal val CORE_POOL_SIZE = systemProp( "kotlinx.coroutines.scheduler.core.pool.size", - AVAILABLE_PROCESSORS.coerceAtLeast(2), // !!! at least two here + AVAILABLE_PROCESSORS.coerceAtLeast(2), minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE ) +/** The maximum number of threads allocated for blocking tasks at the default set of dispatchers. */ @JvmField internal val MAX_POOL_SIZE = systemProp( "kotlinx.coroutines.scheduler.max.pool.size", - (AVAILABLE_PROCESSORS * 128).coerceIn( - CORE_POOL_SIZE, - CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE - ), + CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE, maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE ) @@ -69,14 +62,18 @@ internal interface TaskContext { fun afterTask() } -internal object NonBlockingContext : TaskContext { - override val taskMode: Int = TASK_NON_BLOCKING - +private class TaskContextImpl(override val taskMode: Int): TaskContext { override fun afterTask() { - // Nothing for non-blocking context + // Nothing for non-blocking context } } +@JvmField +internal val NonBlockingContext: TaskContext = TaskContextImpl(TASK_NON_BLOCKING) + +@JvmField +internal val BlockingContext: TaskContext = TaskContextImpl(TASK_PROBABLY_BLOCKING) + internal abstract class Task( @JvmField var submissionTime: Long, @JvmField var taskContext: TaskContext diff --git a/kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt b/kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt new file mode 100644 index 0000000000..4de1862b0f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/LimitedParallelismStressTest.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.* +import org.junit.Test +import org.junit.runner.* +import org.junit.runners.* +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.test.* + +@RunWith(Parameterized::class) +class LimitedParallelismStressTest(private val targetParallelism: Int) : TestBase() { + + companion object { + @Parameterized.Parameters(name = "{0}") + @JvmStatic + fun params(): Collection> = listOf(1, 2, 3, 4).map { arrayOf(it) } + } + + @get:Rule + val executor = ExecutorRule(targetParallelism * 2) + private val iterations = 100_000 * stressTestMultiplier + + private val parallelism = AtomicInteger(0) + + private fun checkParallelism() { + val value = parallelism.incrementAndGet() + Thread.yield() + assertTrue { value <= targetParallelism } + parallelism.decrementAndGet() + } + + @Test + fun testLimitedExecutor() = runTest { + val view = executor.limitedParallelism(targetParallelism) + repeat(iterations) { + launch(view) { + checkParallelism() + } + } + } + + @Test + fun testLimitedDispatchersIo() = runTest { + val view = Dispatchers.IO.limitedParallelism(targetParallelism) + repeat(iterations) { + launch(view) { + checkParallelism() + } + } + } + + @Test + fun testLimitedDispatchersIoDispatchYield() = runTest { + val view = Dispatchers.IO.limitedParallelism(targetParallelism) + repeat(iterations) { + launch(view) { + yield() + checkParallelism() + } + } + } + + @Test + fun testLimitedExecutorReachesTargetParallelism() = runTest { + val view = executor.limitedParallelism(targetParallelism) + repeat(iterations) { + val barrier = CyclicBarrier(targetParallelism + 1) + repeat(targetParallelism) { + launch(view) { + barrier.await() + } + } + // Successfully awaited parallelism + 1 + barrier.await() + coroutineContext.job.children.toList().joinAll() + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt new file mode 100644 index 0000000000..30c54117a9 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt @@ -0,0 +1,55 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.util.concurrent.* +import kotlin.coroutines.* +import kotlin.test.* + +class LimitedParallelismTest : TestBase() { + + @Test + fun testParallelismSpec() { + assertFailsWith { Dispatchers.Default.limitedParallelism(0) } + assertFailsWith { Dispatchers.Default.limitedParallelism(-1) } + assertFailsWith { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) } + Dispatchers.Default.limitedParallelism(Int.MAX_VALUE) + } + + @Test + fun testTaskFairness() = runTest { + val executor = newSingleThreadContext("test") + val view = executor.limitedParallelism(1) + val view2 = executor.limitedParallelism(1) + val j1 = launch(view) { + while (true) { + yield() + } + } + val j2 = launch(view2) { j1.cancel() } + joinAll(j1, j2) + executor.close() + } + + @Test + fun testUnhandledException() = runTest { + var caughtException: Throwable? = null + val executor = Executors.newFixedThreadPool( + 1 + ) { + Thread(it).also { + it.uncaughtExceptionHandler = Thread.UncaughtExceptionHandler { _, e -> caughtException = e } + } + }.asCoroutineDispatcher() + val view = executor.limitedParallelism(1) + view.dispatch(EmptyCoroutineContext, Runnable { throw TestException() }) + withContext(view) { + // Verify it is in working state and establish happens-before + } + assertTrue { caughtException is TestException } + executor.close() + } +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt index 9c17e6988d..864ecdc087 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTerminationStressTest.kt @@ -10,7 +10,7 @@ import java.util.* import java.util.concurrent.* class BlockingCoroutineDispatcherTerminationStressTest : TestBase() { - private val baseDispatcher = ExperimentalCoroutineDispatcher( + private val baseDispatcher = SchedulerCoroutineDispatcher( 2, 20, TimeUnit.MILLISECONDS.toNanos(10) ) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt index fe09440f59..f8830feeef 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherTest.kt @@ -125,71 +125,6 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() { checkPoolThreadsCreated(101..100 + CORES_COUNT) } - @Test - fun testBlockingFairness() = runBlocking { - corePoolSize = 1 - maxPoolSize = 1 - - val blocking = blockingDispatcher(1) - val task = async(dispatcher) { - expect(1) - - val nonBlocking = async(dispatcher) { - expect(3) - } - - val firstBlocking = async(blocking) { - expect(2) - } - - val secondBlocking = async(blocking) { - // Already have 1 queued blocking task, so this one wouldn't be scheduled to head - expect(4) - } - - listOf(firstBlocking, nonBlocking, secondBlocking).joinAll() - finish(5) - } - - task.await() - } - - @Test - fun testBoundedBlockingFairness() = runBlocking { - corePoolSize = 1 - maxPoolSize = 1 - - val blocking = blockingDispatcher(2) - val task = async(dispatcher) { - expect(1) - - val nonBlocking = async(dispatcher) { - expect(3) - } - - val firstBlocking = async(blocking) { - expect(4) - } - - val secondNonBlocking = async(dispatcher) { - expect(5) - } - - val secondBlocking = async(blocking) { - expect(2) // <- last submitted blocking is executed first - } - - val thirdBlocking = async(blocking) { - expect(6) // parallelism level is reached before this task - } - - listOf(firstBlocking, nonBlocking, secondBlocking, secondNonBlocking, thirdBlocking).joinAll() - finish(7) - } - - task.await() - } - @Test(timeout = 1_000) fun testYield() = runBlocking { corePoolSize = 1 diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt index 3280527f2a..3b3e085047 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/BlockingCoroutineDispatcherWorkSignallingStressTest.kt @@ -18,7 +18,7 @@ class BlockingCoroutineDispatcherWorkSignallingStressTest : SchedulerTestBase() val iterations = 1000 * stressTestMultiplier repeat(iterations) { // Create a dispatcher every iteration to increase probability of race - val dispatcher = ExperimentalCoroutineDispatcher(CORES_COUNT) + val dispatcher = SchedulerCoroutineDispatcher(CORES_COUNT) val blockingDispatcher = dispatcher.blocking(100) val blockingBarrier = CyclicBarrier(CORES_COUNT * 3 + 1) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt index 3cd77da74a..c95415a8df 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineDispatcherTest.kt @@ -134,7 +134,7 @@ class CoroutineDispatcherTest : SchedulerTestBase() { val initialCount = Thread.getAllStackTraces().keys.asSequence() .count { it is CoroutineScheduler.Worker && it.name.contains("SomeTestName") } assertEquals(0, initialCount) - val dispatcher = ExperimentalCoroutineDispatcher(1, 1, IDLE_WORKER_KEEP_ALIVE_NS, "SomeTestName") + val dispatcher = SchedulerCoroutineDispatcher(1, 1, IDLE_WORKER_KEEP_ALIVE_NS, "SomeTestName") dispatcher.use { launch(dispatcher) { }.join() diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerCloseStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerCloseStressTest.kt index 473b429283..a50867d61c 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerCloseStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerCloseStressTest.kt @@ -22,15 +22,13 @@ class CoroutineSchedulerCloseStressTest(private val mode: Mode) : TestBase() { fun params(): Collection> = Mode.values().map { arrayOf(it) } } - private val N_REPEAT = 2 * stressTestMultiplier private val MAX_LEVEL = 5 private val N_COROS = (1 shl (MAX_LEVEL + 1)) - 1 private val N_THREADS = 4 private val rnd = Random() - private lateinit var closeableDispatcher: ExperimentalCoroutineDispatcher - private lateinit var dispatcher: ExecutorCoroutineDispatcher - private var closeIndex = -1 + private lateinit var closeableDispatcher: SchedulerCoroutineDispatcher + private lateinit var dispatcher: CoroutineDispatcher private val started = atomic(0) private val finished = atomic(0) @@ -44,20 +42,12 @@ class CoroutineSchedulerCloseStressTest(private val mode: Mode) : TestBase() { } } - @Test - fun testRacingClose() { - repeat(N_REPEAT) { - closeIndex = rnd.nextInt(N_COROS) - launchCoroutines() - } - } - private fun launchCoroutines() = runBlocking { - closeableDispatcher = ExperimentalCoroutineDispatcher(N_THREADS) + closeableDispatcher = SchedulerCoroutineDispatcher(N_THREADS) dispatcher = when (mode) { Mode.CPU -> closeableDispatcher - Mode.CPU_LIMITED -> closeableDispatcher.limited(N_THREADS) as ExecutorCoroutineDispatcher - Mode.BLOCKING -> closeableDispatcher.blocking(N_THREADS) as ExecutorCoroutineDispatcher + Mode.CPU_LIMITED -> closeableDispatcher.limitedParallelism(N_THREADS) + Mode.BLOCKING -> closeableDispatcher.blocking(N_THREADS) } started.value = 0 finished.value = 0 @@ -68,20 +58,16 @@ class CoroutineSchedulerCloseStressTest(private val mode: Mode) : TestBase() { assertEquals(N_COROS, finished.value) } + // Index and level are used only for debugging purpose private fun CoroutineScope.launchChild(index: Int, level: Int): Job = launch(start = CoroutineStart.ATOMIC) { started.incrementAndGet() try { - if (index == closeIndex) closeableDispatcher.close() if (level < MAX_LEVEL) { launchChild(2 * index + 1, level + 1) launchChild(2 * index + 2, level + 1) } else { if (rnd.nextBoolean()) { delay(1000) - val t = Thread.currentThread() - if (!t.name.contains("DefaultDispatcher-worker")) { - val a = 2 - } } else { yield() } diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt index cb49f054ce..7aefd4f75c 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerStressTest.kt @@ -15,7 +15,7 @@ import kotlin.coroutines.* import kotlin.test.* class CoroutineSchedulerStressTest : TestBase() { - private var dispatcher: ExperimentalCoroutineDispatcher = ExperimentalCoroutineDispatcher() + private var dispatcher: SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher() private val observedThreads = ConcurrentHashMap() private val tasksNum = 500_000 * stressMemoryMultiplier() diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt index b0a5954b70..9d41c05d26 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/CoroutineSchedulerTest.kt @@ -106,22 +106,22 @@ class CoroutineSchedulerTest : TestBase() { @Test(expected = IllegalArgumentException::class) fun testNegativeCorePoolSize() { - ExperimentalCoroutineDispatcher(-1, 4) + SchedulerCoroutineDispatcher(-1, 4) } @Test(expected = IllegalArgumentException::class) fun testNegativeMaxPoolSize() { - ExperimentalCoroutineDispatcher(1, -4) + SchedulerCoroutineDispatcher(1, -4) } @Test(expected = IllegalArgumentException::class) fun testCorePoolSizeGreaterThanMaxPoolSize() { - ExperimentalCoroutineDispatcher(4, 1) + SchedulerCoroutineDispatcher(4, 1) } @Test fun testSelfClose() { - val dispatcher = ExperimentalCoroutineDispatcher(1, 1) + val dispatcher = SchedulerCoroutineDispatcher(1, 1) val latch = CountDownLatch(1) dispatcher.dispatch(EmptyCoroutineContext, Runnable { dispatcher.close(); latch.countDown() @@ -131,7 +131,7 @@ class CoroutineSchedulerTest : TestBase() { @Test fun testInterruptionCleanup() { - ExperimentalCoroutineDispatcher(1, 1).use { + SchedulerCoroutineDispatcher(1, 1).use { val executor = it.executor var latch = CountDownLatch(1) executor.execute { @@ -171,4 +171,4 @@ class CoroutineSchedulerTest : TestBase() { private class TaskContextImpl(override val taskMode: Int) : TaskContext { override fun afterTask() {} } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt new file mode 100644 index 0000000000..56c669547c --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/scheduling/DefaultDispatchersTest.kt @@ -0,0 +1,75 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.scheduling + +import kotlinx.coroutines.* +import org.junit.Test +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.test.* + +class DefaultDispatchersTest : TestBase() { + + private /*const*/ val EXPECTED_PARALLELISM = 64 + + @Test(timeout = 10_000L) + fun testLimitedParallelismIsSeparatedFromDefaultIo() = runTest { + val barrier = CyclicBarrier(EXPECTED_PARALLELISM + 1) + val ioBlocker = CountDownLatch(1) + repeat(EXPECTED_PARALLELISM) { + launch(Dispatchers.IO) { + barrier.await() + ioBlocker.await() + } + } + + barrier.await() // Ensure all threads are occupied + barrier.reset() + val limited = Dispatchers.IO.limitedParallelism(EXPECTED_PARALLELISM) + repeat(EXPECTED_PARALLELISM) { + launch(limited) { + barrier.await() + } + } + barrier.await() + ioBlocker.countDown() + } + + @Test(timeout = 10_000L) + fun testDefaultDispatcherIsSeparateFromIO() = runTest { + val ioBarrier = CyclicBarrier(EXPECTED_PARALLELISM + 1) + val ioBlocker = CountDownLatch(1) + repeat(EXPECTED_PARALLELISM) { + launch(Dispatchers.IO) { + ioBarrier.await() + ioBlocker.await() + } + } + + ioBarrier.await() // Ensure all threads are occupied + val parallelism = Runtime.getRuntime().availableProcessors() + val defaultBarrier = CyclicBarrier(parallelism + 1) + repeat(parallelism) { + launch(Dispatchers.Default) { + defaultBarrier.await() + } + } + defaultBarrier.await() + ioBlocker.countDown() + } + + @Test + fun testHardCapOnParallelism() = runTest { + val iterations = 100_000 * stressTestMultiplierSqrt + val concurrency = AtomicInteger() + repeat(iterations) { + launch(Dispatchers.IO) { + val c = concurrency.incrementAndGet() + assertTrue("Got: $c") { c <= EXPECTED_PARALLELISM } + concurrency.decrementAndGet() + } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/LimitingDispatcherTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/LimitingDispatcherTest.kt index b4924277b5..e5705803c0 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/LimitingDispatcherTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/LimitingDispatcherTest.kt @@ -10,11 +10,6 @@ import java.util.concurrent.* class LimitingDispatcherTest : SchedulerTestBase() { - @Test(expected = IllegalArgumentException::class) - fun testTooLargeView() { - view(corePoolSize + 1) - } - @Test(expected = IllegalArgumentException::class) fun testNegativeView() { view(-1) diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt index dd969bdd37..fc4436f418 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SchedulerTestBase.kt @@ -6,7 +6,6 @@ package kotlinx.coroutines.scheduling -import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import org.junit.* @@ -61,11 +60,11 @@ abstract class SchedulerTestBase : TestBase() { protected var maxPoolSize = 1024 protected var idleWorkerKeepAliveNs = IDLE_WORKER_KEEP_ALIVE_NS - private var _dispatcher: ExperimentalCoroutineDispatcher? = null + private var _dispatcher: SchedulerCoroutineDispatcher? = null protected val dispatcher: CoroutineDispatcher get() { if (_dispatcher == null) { - _dispatcher = ExperimentalCoroutineDispatcher( + _dispatcher = SchedulerCoroutineDispatcher( corePoolSize, maxPoolSize, idleWorkerKeepAliveNs @@ -86,7 +85,7 @@ abstract class SchedulerTestBase : TestBase() { protected fun view(parallelism: Int): CoroutineDispatcher { val intitialize = dispatcher - return _dispatcher!!.limited(parallelism) + return _dispatcher!!.limitedParallelism(parallelism) } @After @@ -98,3 +97,17 @@ abstract class SchedulerTestBase : TestBase() { } } } + +internal fun SchedulerCoroutineDispatcher.blocking(parallelism: Int = 16): CoroutineDispatcher { + return object : CoroutineDispatcher() { + + @InternalCoroutinesApi + override fun dispatchYield(context: CoroutineContext, block: Runnable) { + this@blocking.dispatchWithContext(block, BlockingContext, true) + } + + override fun dispatch(context: CoroutineContext, block: Runnable) { + this@blocking.dispatchWithContext(block, BlockingContext, false) + } + }.limitedParallelism(parallelism) +} diff --git a/kotlinx-coroutines-core/jvm/test/scheduling/SharingWorkerClassTest.kt b/kotlinx-coroutines-core/jvm/test/scheduling/SharingWorkerClassTest.kt index 6a66da9f5c..743b4a617f 100644 --- a/kotlinx-coroutines-core/jvm/test/scheduling/SharingWorkerClassTest.kt +++ b/kotlinx-coroutines-core/jvm/test/scheduling/SharingWorkerClassTest.kt @@ -13,8 +13,8 @@ class SharingWorkerClassTest : SchedulerTestBase() { @Test fun testSharedThread() = runTest { - val dispatcher = ExperimentalCoroutineDispatcher(1, schedulerName = "first") - val dispatcher2 = ExperimentalCoroutineDispatcher(1, schedulerName = "second") + val dispatcher = SchedulerCoroutineDispatcher(1, schedulerName = "first") + val dispatcher2 = SchedulerCoroutineDispatcher(1, schedulerName = "second") try { withContext(dispatcher) { @@ -39,7 +39,7 @@ class SharingWorkerClassTest : SchedulerTestBase() { val cores = Runtime.getRuntime().availableProcessors() repeat(cores + 1) { CoroutineScope(Dispatchers.Default).launch { - ExperimentalCoroutineDispatcher(1).close() + SchedulerCoroutineDispatcher(1).close() }.join() } }