Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce Dispatchers.shutdown #2915

Merged
merged 4 commits into from
Sep 9, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -279,6 +279,7 @@ public final class kotlinx/coroutines/Dispatchers {
public static final fun getIO ()Lkotlinx/coroutines/CoroutineDispatcher;
public static final fun getMain ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public static final fun getUnconfined ()Lkotlinx/coroutines/CoroutineDispatcher;
public final fun shutdown ()V
}

public final class kotlinx/coroutines/DispatchersKt {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -115,7 +115,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

protected open fun shutdown() {}
open fun shutdown() {}
}

@ThreadLocal
Expand Down Expand Up @@ -279,7 +279,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {

public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

public fun enqueue(task: Runnable) {
open fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
Expand Down
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Expand Up @@ -22,6 +22,12 @@ internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_N
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool

internal fun shutdownDefaultDispatchers() {
// Shutdown IO and, maybe, default
DefaultScheduler.shutdown()
if (!useCoroutinesScheduler) CommonPool.shutdown(0L)
}

/**
* Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
Expand Down
35 changes: 29 additions & 6 deletions kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Expand Up @@ -17,13 +17,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
incrementUseCount() // this event loop is never completed
}

private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds

private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
try {
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
} catch (e: SecurityException) {
DEFAULT_KEEP_ALIVE
DEFAULT_KEEP_ALIVE_MS
})

@Suppress("ObjectPropertyName")
Expand All @@ -37,15 +37,39 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
private const val ACTIVE = 1
private const val SHUTDOWN_REQ = 2
private const val SHUTDOWN_ACK = 3
private const val SHUTDOWN = 4

@Volatile
private var debugStatus: Int = FRESH

val isShutDown: Boolean get() = debugStatus == SHUTDOWN

private val isShutdownRequested: Boolean get() {
val debugStatus = debugStatus
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
}

actual override fun enqueue(task: Runnable) {
if (isShutDown) shutdownError()
super.enqueue(task)
}

override fun reschedule(now: Long, delayedTask: DelayedTask) {
// Reschedule on default executor can only be invoked after Dispatchers.shutdown
shutdownError()
}

private fun shutdownError() {
throw RejectedExecutionException("DefaultExecutor was shut down. " +
"This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " +
"Please refer to Dispatchers.shutdown documentation for more details")
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
}

override fun shutdown() {
debugStatus = SHUTDOWN
super.shutdown()
}

/**
* All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
* ```
Expand Down Expand Up @@ -118,9 +142,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
return true
}

// used for tests
@Synchronized
fun shutdown(timeout: Long) {
@Synchronized // used _only_ for tests
fun shutdownForTests(timeout: Long) {
val deadline = System.currentTimeMillis() + timeout
if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
// loop while there is anything to do immediately or deadline passes
Expand Down
27 changes: 26 additions & 1 deletion kotlinx-coroutines-core/jvm/src/Dispatchers.kt
Expand Up @@ -21,7 +21,7 @@ public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.p
public actual object Dispatchers {
/**
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc.
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
Expand Down Expand Up @@ -116,4 +116,29 @@ public actual object Dispatchers {
*/
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO

/**
* Shuts down built-in dispatchers, such as [Default] and [IO],
* stopping all the threads associated with them and making them reject all new tasks.
* Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`)
* and to handle rejected tasks from other dispatchers is also shut down.
*
* This is a **delicate** API. It is not supposed to be called from a general
* application-level code and its invocation is irreversible.
* The invocation of shutdown affects most of the coroutines machinery and
* leaves the coroutines framework in an inoperable state.
* An invocation of shutdown method while having pending tasks or active coroutines
* will leave them in a permanent dormant state, preventing them from completion or execution.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* The main goal of the shutdown is to stop all background threads associated with the coroutines
* framework in order to make kotlinx.coroutines classes unloadable by Java Virtual Machine.
* It is only recommended to be used in containerized environments (OSGi, Gradle plugins system,
* IDEA plugins) at the end of the container lifecycle.
*/
@DelicateCoroutinesApi
public fun shutdown() {
DefaultExecutor.shutdown()
// Also shuts down Dispatchers.IO
shutdownDefaultDispatchers()
}
}
3 changes: 1 addition & 2 deletions kotlinx-coroutines-core/jvm/src/EventLoop.kt
Expand Up @@ -13,8 +13,7 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() {
unpark(thread)
}

protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
assert { this !== DefaultExecutor } // otherwise default execution was shutdown with tasks in it (cannot be)
protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) {
DefaultExecutor.schedule(now, delayedTask)
}
}
Expand Down
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -14,13 +14,20 @@ import kotlin.coroutines.*
* Default instance of coroutine dispatcher.
*/
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
@JvmField
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)

// Shuts down the dispatcher, used only by Dispatchers.shutdown()
internal fun shutdown() {
super.close()
}

// Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
override fun close() {
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/TestBase.kt
Expand Up @@ -206,7 +206,7 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {
fun shutdownPoolsAfterTest() {
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
CommonPool.restore()
DefaultScheduler.restore()
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
Expand Up @@ -11,14 +11,14 @@ import java.util.concurrent.locks.*
private const val SHUTDOWN_TIMEOUT = 1000L

internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) {
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
val testTimeSource = VirtualTimeSource(log)
timeSource = testTimeSource
DefaultExecutor.ensureStarted() // should start with new time source
try {
block()
} finally {
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
testTimeSource.shutdown()
timeSource = null // restore time source
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
Expand Up @@ -28,7 +28,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
captureOutput(name, stdoutEnabled = OUT_ENABLED) { log ->
CommonPool.usePrivatePool()
DefaultScheduler.usePrivateScheduler()
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
resetCoroutineId()
val threadsBefore = currentThreads()
try {
Expand All @@ -42,7 +42,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
shutdownDispatcherPools(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
}
checkTestThreads(threadsBefore) // check thread if the main completed successfully
}
Expand Down