Skip to content

Commit

Permalink
Introduce Dispatchers.shutdown (Kotlin#2915)
Browse files Browse the repository at this point in the history
  • Loading branch information
qwwdfsad authored and pablobaxter committed Sep 14, 2022
1 parent 3d3b13a commit a605eb3
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 16 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -279,6 +279,7 @@ public final class kotlinx/coroutines/Dispatchers {
public static final fun getIO ()Lkotlinx/coroutines/CoroutineDispatcher;
public static final fun getMain ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public static final fun getUnconfined ()Lkotlinx/coroutines/CoroutineDispatcher;
public final fun shutdown ()V
}

public final class kotlinx/coroutines/DispatchersKt {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -115,7 +115,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

protected open fun shutdown() {}
open fun shutdown() {}
}

@ThreadLocal
Expand Down Expand Up @@ -279,7 +279,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {

public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

public fun enqueue(task: Runnable) {
open fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
Expand Down
35 changes: 29 additions & 6 deletions kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Expand Up @@ -17,13 +17,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
incrementUseCount() // this event loop is never completed
}

private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds

private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
try {
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
} catch (e: SecurityException) {
DEFAULT_KEEP_ALIVE
DEFAULT_KEEP_ALIVE_MS
})

@Suppress("ObjectPropertyName")
Expand All @@ -37,15 +37,39 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
private const val ACTIVE = 1
private const val SHUTDOWN_REQ = 2
private const val SHUTDOWN_ACK = 3
private const val SHUTDOWN = 4

@Volatile
private var debugStatus: Int = FRESH

val isShutDown: Boolean get() = debugStatus == SHUTDOWN

private val isShutdownRequested: Boolean get() {
val debugStatus = debugStatus
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
}

actual override fun enqueue(task: Runnable) {
if (isShutDown) shutdownError()
super.enqueue(task)
}

override fun reschedule(now: Long, delayedTask: DelayedTask) {
// Reschedule on default executor can only be invoked after Dispatchers.shutdown
shutdownError()
}

private fun shutdownError() {
throw RejectedExecutionException("DefaultExecutor was shut down. " +
"This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
"Please refer to Dispatchers.shutdown documentation for more details")
}

override fun shutdown() {
debugStatus = SHUTDOWN
super.shutdown()
}

/**
* All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
* ```
Expand Down Expand Up @@ -118,9 +142,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
return true
}

// used for tests
@Synchronized
fun shutdown(timeout: Long) {
@Synchronized // used _only_ for tests
fun shutdownForTests(timeout: Long) {
val deadline = System.currentTimeMillis() + timeout
if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
// loop while there is anything to do immediately or deadline passes
Expand Down
29 changes: 28 additions & 1 deletion kotlinx-coroutines-core/jvm/src/Dispatchers.kt
Expand Up @@ -21,7 +21,7 @@ public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.p
public actual object Dispatchers {
/**
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc.
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
Expand Down Expand Up @@ -116,4 +116,31 @@ public actual object Dispatchers {
*/
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO

/**
* Shuts down built-in dispatchers, such as [Default] and [IO],
* stopping all the threads associated with them and making them reject all new tasks.
* Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`)
* and to handle rejected tasks from other dispatchers is also shut down.
*
* This is a **delicate** API. It is not supposed to be called from a general
* application-level code and its invocation is irreversible.
* The invocation of shutdown affects most of the coroutines machinery and
* leaves the coroutines framework in an inoperable state.
* The shutdown method should only be invoked when there are no pending tasks or active coroutines.
* Otherwise, the behavior is unspecified: the call to `shutdown` may throw an exception without completing
* the shutdown, or it may finish successfully, but the remaining jobs will be in a permanent dormant state,
* never completing nor executing.
*
* The main goal of the shutdown is to stop all background threads associated with the coroutines
* framework in order to make kotlinx.coroutines classes unloadable by Java Virtual Machine.
* It is only recommended to be used in containerized environments (OSGi, Gradle plugins system,
* IDEA plugins) at the end of the container lifecycle.
*/
@DelicateCoroutinesApi
public fun shutdown() {
DefaultExecutor.shutdown()
// Also shuts down Dispatchers.IO
DefaultScheduler.shutdown()
}
}
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/jvm/src/EventLoop.kt
Expand Up @@ -13,8 +13,7 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() {
unpark(thread)
}

protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
assert { this !== DefaultExecutor } // otherwise default execution was shutdown with tasks in it (cannot be)
protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
DefaultExecutor.schedule(now, delayedTask)
}
}
Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -14,13 +14,20 @@ import kotlin.coroutines.*
* Default instance of coroutine dispatcher.
*/
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
@JvmField
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)

// Shuts down the dispatcher, used only by Dispatchers.shutdown()
internal fun shutdown() {
super.close()
}

// Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
override fun close() {
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/TestBase.kt
Expand Up @@ -204,7 +204,7 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {

fun shutdownPoolsAfterTest() {
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
DefaultScheduler.restore()
}

Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
Expand Up @@ -11,14 +11,14 @@ import java.util.concurrent.locks.*
private const val SHUTDOWN_TIMEOUT = 1000L

internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) {
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
val testTimeSource = VirtualTimeSource(log)
timeSource = testTimeSource
DefaultExecutor.ensureStarted() // should start with new time source
try {
block()
} finally {
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
testTimeSource.shutdown()
timeSource = null // restore time source
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
Expand Up @@ -27,7 +27,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
try {
captureOutput(name, stdoutEnabled = OUT_ENABLED) { log ->
DefaultScheduler.usePrivateScheduler()
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
resetCoroutineId()
val threadsBefore = currentThreads()
try {
Expand All @@ -40,7 +40,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
log.println("--- shutting down")
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
shutdownDispatcherPools(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
}
checkTestThreads(threadsBefore) // check thread if the main completed successfully
}
Expand Down

0 comments on commit a605eb3

Please sign in to comment.