Elastic Dispatchers.IO
    * Extract Ktor-obsolete API to a separate file for backwards compatibility
    * Make Dispatchers.IO a slice of the unlimited blocking scheduler
    * Make Dispatchers.IO.limitedParallelism take slices from the same internal scheduler

Fixes #2943
qwwdfsad committed Sep 21, 2021
1 parent ecd36dd commit 5fe0261
Showing 18 changed files with 431 additions and 331 deletions.
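
The gist of the change: `Dispatchers.IO` becomes a view of an unlimited blocking scheduler, and slices obtained via `limitedParallelism` are carved from that same scheduler rather than from IO's own 64-thread budget. A minimal, hedged sketch of the resulting behaviour (dispatcher names and workload are illustrative, not part of this commit):

```
import kotlinx.coroutines.*

fun main() = runBlocking {
    // Slices share the backing pool with Dispatchers.IO but are not counted
    // against its default limit of 64 threads.
    val dbDispatcher = Dispatchers.IO.limitedParallelism(100)
    val fileDispatcher = Dispatchers.IO.limitedParallelism(60)

    val jobs = List(8) { i ->
        launch(if (i % 2 == 0) dbDispatcher else fileDispatcher) {
            // Blocking work would go here; worker threads are shared with Dispatchers.IO.
            println("task $i runs on ${Thread.currentThread().name}")
        }
    }
    jobs.joinAll()
}
```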
@@ -30,7 +30,7 @@ abstract class ParametrizedDispatcherBase : CoroutineScope {
coroutineContext = when {
dispatcher == "fjp" -> ForkJoinPool.commonPool().asCoroutineDispatcher()
dispatcher == "scheduler" -> {
- ExperimentalCoroutineDispatcher(CORES_COUNT).also { closeable = it }
+ Dispatchers.Default
}
dispatcher.startsWith("ftp") -> {
newFixedThreadPoolContext(dispatcher.substring(4).toInt(), dispatcher).also { closeable = it }
11 changes: 4 additions & 7 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
@@ -6,13 +6,10 @@ package benchmarks

import benchmarks.common.*
import kotlinx.coroutines.*
- import kotlinx.coroutines.channels.Channel
- import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
- import kotlinx.coroutines.sync.Semaphore
- import kotlinx.coroutines.sync.withPermit
+ import kotlinx.coroutines.channels.*
+ import kotlinx.coroutines.sync.*
import org.openjdk.jmh.annotations.*
- import java.util.concurrent.ForkJoinPool
- import java.util.concurrent.TimeUnit
+ import java.util.concurrent.*

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@@ -84,7 +81,7 @@ open class SemaphoreBenchmark {

enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
- EXPERIMENTAL({ parallelism -> ExperimentalCoroutineDispatcher(corePoolSize = parallelism, maxPoolSize = parallelism) })
+ EXPERIMENTAL({ parallelism -> Dispatchers.Default }) // TODO doesn't take parallelism into account
}

private const val WORK_INSIDE = 80
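Note the TODO above: the `EXPERIMENTAL` creator now ignores its `parallelism` argument. A hedged sketch of how the benchmark could honor it with the new public API (not part of this commit; `limitedParallelism` was still experimental at the time):

```
enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
    FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() }),
    // Take a parallelism-limited view of Dispatchers.Default instead of dropping the argument.
    EXPERIMENTAL({ parallelism -> Dispatchers.Default.limitedParallelism(parallelism) })
}
```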
@@ -27,10 +27,8 @@ import kotlin.coroutines.*
@State(Scope.Benchmark)
open class PingPongWithBlockingContext {

- @UseExperimental(InternalCoroutinesApi::class)
- private val experimental = ExperimentalCoroutineDispatcher(8)
- @UseExperimental(InternalCoroutinesApi::class)
- private val blocking = experimental.blocking(8)
+ private val experimental = Dispatchers.Default
+ private val blocking = Dispatchers.IO.limitedParallelism(8)
private val threadPool = newFixedThreadPoolContext(8, "PongCtx")

@TearDown
29 changes: 22 additions & 7 deletions kotlinx-coroutines-core/jvm/src/Dispatchers.kt
@@ -86,7 +86,7 @@ public actual object Dispatchers {
* Note that if you need your coroutine to be confined to a particular thread or a thread-pool after resumption,
* but still want to execute it in the current call-frame until its first suspension, then you can use
* an optional [CoroutineStart] parameter in coroutine builders like
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to the
* [launch][CoroutineScope.launch] and [async][CoroutineScope.async] setting it to
* the value of [CoroutineStart.UNDISPATCHED].
*/
@JvmStatic
@@ -100,22 +100,37 @@ public actual object Dispatchers {
* "`kotlinx.coroutines.io.parallelism`" ([IO_PARALLELISM_PROPERTY_NAME]) system property.
* It defaults to the limit of 64 threads or the number of cores (whichever is larger).
*
* Moreover, the maximum configurable number of threads is capped by the
* `kotlinx.coroutines.scheduler.max.pool.size` system property.
* If you need a higher number of parallel threads,
* you should use a custom dispatcher backed by your own thread pool.
* ### Elasticity for limited parallelism
*
* `Dispatchers.IO` has a unique property of elasticity: its slices
* obtained with [CoroutineDispatcher.limitedParallelism] are
* not restricted by the `Dispatchers.IO` parallelism and are conceptually
* obtained from the same unlimited backing pool of threads that
* `Dispatchers.IO` itself was created from, meaning that `Dispatchers.IO`
* still shares threads and resources with its slices internally.
*
* In the following example
* ```
* // 100 threads for MySQL connection
* val myMysqlDbDispatcher = Dispatchers.IO.limitedParallelism(100)
* // 60 threads for MongoDB connection
* val myMongoDbDispatcher = Dispatchers.IO.limitedParallelism(60)
* ```
* the system may have up to `64 + 100 + 60` threads during peak loads,
* but during its steady state it has a minimum viable number of threads shared
* among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`.
*
* ### Implementation note
*
- * This dispatcher shares threads with the [Default][Dispatchers.Default] dispatcher, so using
+ * This dispatcher and its slices share threads with the [Default][Dispatchers.Default] dispatcher, so using
* `withContext(Dispatchers.IO) { ... }` when already running on the [Default][Dispatchers.Default]
* dispatcher does not lead to an actual switching to another thread — typically execution
* continues in the same thread.
* As a result of thread sharing, more than 64 (default parallelism) threads can be created (but not used)
* during operations over the IO dispatcher.
*/
@JvmStatic
- public val IO: CoroutineDispatcher = DefaultScheduler.IO
+ public val IO: CoroutineDispatcher = DefaultIoScheduler

/**
* Shuts down built-in dispatchers, such as [Default] and [IO],
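The implementation note above means that hopping from `Dispatchers.Default` to `Dispatchers.IO` usually stays on the same worker thread. A small sketch to observe this (the output is typical, not guaranteed):

```
import kotlinx.coroutines.*

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        val defaultThread = Thread.currentThread().name
        withContext(Dispatchers.IO) {
            // IO and Default share the same scheduler, so this is usually the same thread.
            println("Default: $defaultThread, IO: ${Thread.currentThread().name}")
        }
    }
}
```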
212 changes: 212 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Deprecated.kt
@@ -0,0 +1,212 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("unused")

package kotlinx.coroutines.scheduling
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.util.concurrent.*
import kotlin.coroutines.*

/**
* This API was "public @InternalApi" and leaked into Ktor enabled-by-default sources.
* Since then, we have refactored the scheduler sources and their API and decided to get rid of
* this API in its current shape.
*
* To preserve backwards compatibility with Ktor 1.x, the previous version of the code is
* extracted here as is and isolated from the rest of the code base, so that R8 can get rid of it.
*
* It should be removed after Kotlin 3.0.0 (EOL of Ktor 1.x) around 2022.
*/
@PublishedApi
internal open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

override val executor: Executor
get() = coroutineScheduler

// This is a variable for test purposes, so that we can reinitialize it from a clean state
private var coroutineScheduler = createScheduler()

override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block, tailDispatch = true)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatchYield(context, block)
}

override fun close(): Unit = coroutineScheduler.close()

override fun toString(): String {
return "${super.toString()}[scheduler = $coroutineScheduler]"
}

/**
* Creates a coroutine execution context with limited parallelism to execute tasks which may potentially block.
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
* giving it additional hints to adjust its behaviour.
*
* @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher in parallel.
*/
fun blocking(parallelism: Int = 16): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
}

/**
* Creates a coroutine execution context with limited parallelism to execute CPU-intensive tasks.
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
* giving it additional hints to adjust its behaviour.
*
* @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher in parallel.
*/
fun limited(parallelism: Int): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
}

internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
try {
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}

private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}

private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

private val queue = ConcurrentLinkedQueue<Runnable>()
private val inFlightTasks = atomic(0)

override val executor: Executor
get() = this

override fun execute(command: Runnable) = dispatch(command, false)

override fun close(): Unit = error("Close cannot be invoked on LimitingBlockingDispatcher")

override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// Commit in-flight tasks slot
val inFlight = inFlightTasks.incrementAndGet()

// Fast path, if parallelism limit is not reached, dispatch task and return
if (inFlight <= parallelism) {
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}

// Parallelism limit is reached, add task to the queue
queue.add(taskToSchedule)

/*
* We haven't actually scheduled anything, so roll back the committed in-flight task slot:
* if the number of in-flight tasks is still above the limit, do nothing;
* if the number of in-flight tasks is less than the parallelism level, then
* we raced with a thread that finished a task from the current context and should resubmit the first task from the queue
* to avoid starvation.
*
* Race example #1 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
*
* T1: submit task, start execution, R == 1
* T2: commit slot for next task, R == 2
* T1: finish T1, R == 1
* T2: submit next task to local queue, decrement R, R == 0
* Without retries, the task from T2 would be stuck in the local queue
*/
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}

taskToSchedule = queue.poll() ?: return
}
}

override fun dispatchYield(context: CoroutineContext, block: Runnable) {
dispatch(block, tailDispatch = true)
}

override fun toString(): String {
return name ?: "${super.toString()}[dispatcher = $dispatcher]"
}

/**
* Tries to dispatch tasks which were blocked due to reaching parallelism limit if there is any.
*
* Implementation note: blocking tasks are scheduled in a fair manner (to local queue tail) to avoid
* non-blocking continuations starvation.
* E.g. for
* ```
* foo()
* blocking()
* bar()
* ```
* it's more profitable to execute `bar` at the end of `blocking` rather than a pending blocking task
*/
override fun afterTask() {
var next = queue.poll()
// If we have pending tasks in current blocking context, dispatch first
if (next != null) {
dispatcher.dispatchWithContext(next, this, true)
return
}
inFlightTasks.decrementAndGet()

/*
* Re-poll again and try to submit a task if required; otherwise tasks may get stuck in the local queue.
* Race example #2 (TN is N-th thread, R is current in-flight tasks number), execution is sequential:
* T1: submit task, start execution, R == 1
* T2: commit slot for next task, R == 2
* T1: finish T1, poll queue (it's still empty), R == 2
* T2: submit next task to the local queue, decrement R, R == 1
* T1: decrement R, finish. R == 0
*
* The task from T2 is stuck in the local queue
*/
next = queue.poll() ?: return
dispatch(next, true)
}
}
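
For code that depended on the API isolated above, the benchmark changes earlier in this commit hint at the migration: `ExperimentalCoroutineDispatcher(n)` maps to `Dispatchers.Default`, and its `blocking(n)` view maps to a `limitedParallelism` slice of `Dispatchers.IO`. A hedged sketch of that mapping:

```
import kotlinx.coroutines.*

// Before (internal API, kept only for Ktor 1.x binary compatibility):
//   val dispatcher = ExperimentalCoroutineDispatcher(8)
//   val blocking = dispatcher.blocking(8)

// After (public API, as used by the updated PingPongWithBlockingContext benchmark):
val default: CoroutineDispatcher = Dispatchers.Default
val blocking: CoroutineDispatcher = Dispatchers.IO.limitedParallelism(8)
```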
