Skip to content

Commit

Permalink
Introduce Dispatchers.shutdown
Browse files Browse the repository at this point in the history
Fixes #2558
  • Loading branch information
qwwdfsad committed Sep 3, 2021
1 parent 9530760 commit 8fa2ebf
Show file tree
Hide file tree
Showing 9 changed files with 63 additions and 14 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -279,6 +279,7 @@ public final class kotlinx/coroutines/Dispatchers {
public static final fun getIO ()Lkotlinx/coroutines/CoroutineDispatcher;
public static final fun getMain ()Lkotlinx/coroutines/MainCoroutineDispatcher;
public static final fun getUnconfined ()Lkotlinx/coroutines/CoroutineDispatcher;
public final fun shutdown ()V
}

public final class kotlinx/coroutines/DispatchersKt {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Expand Up @@ -115,7 +115,7 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}
}

protected open fun shutdown() {}
open fun shutdown() {}
}

@ThreadLocal
Expand Down Expand Up @@ -279,7 +279,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {

public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)

public fun enqueue(task: Runnable) {
open fun enqueue(task: Runnable) {
if (enqueueImpl(task)) {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
Expand Down
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Expand Up @@ -22,6 +22,12 @@ internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_N
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool

internal fun shutdownDefaultDispatchers() {
// Shutdown IO and, maybe, default
DefaultScheduler.shutdown()
if (!useCoroutinesScheduler) CommonPool.shutdown(0L)
}

/**
* Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
Expand Down
24 changes: 18 additions & 6 deletions kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Expand Up @@ -17,13 +17,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
incrementUseCount() // this event loop is never completed
}

private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds
private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds

private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos(
try {
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE)
java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS)
} catch (e: SecurityException) {
DEFAULT_KEEP_ALIVE
DEFAULT_KEEP_ALIVE_MS
})

@Suppress("ObjectPropertyName")
Expand All @@ -37,15 +37,28 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
private const val ACTIVE = 1
private const val SHUTDOWN_REQ = 2
private const val SHUTDOWN_ACK = 3
private const val SHUTDOWN = 4

@Volatile
private var debugStatus: Int = FRESH

private val isShutDown: Boolean get() = debugStatus == SHUTDOWN

private val isShutdownRequested: Boolean get() {
val debugStatus = debugStatus
return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK
}

actual override fun enqueue(task: Runnable) {
if (isShutDown) throw RejectedExecutionException("DefaultExecutor was shut down")
super.enqueue(task)
}

override fun shutdown() {
debugStatus = SHUTDOWN
super.shutdown()
}

/**
* All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on
* ```
Expand Down Expand Up @@ -118,9 +131,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
return true
}

// used for tests
@Synchronized
fun shutdown(timeout: Long) {
@Synchronized // used _only_ for tests
fun shutdownForTests(timeout: Long) {
val deadline = System.currentTimeMillis() + timeout
if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ
// loop while there is anything to do immediately or deadline passes
Expand Down
25 changes: 24 additions & 1 deletion kotlinx-coroutines-core/jvm/src/Dispatchers.kt
Expand Up @@ -21,7 +21,7 @@ public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.p
public actual object Dispatchers {
/**
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc.
* if no dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used
Expand Down Expand Up @@ -116,4 +116,27 @@ public actual object Dispatchers {
*/
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO

/**
* Shuts down built-in dispatchers, such as [Default] and [IO],
* stopping all the threads associated with them and making them reject all new tasks.
* Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`)
* and to handle rejected tasks from other dispatchers is also shut down.
*
* This is a **delicate** API. It is not supposed to be called from a general
* application-level code and its invocation is irreversible.
* The invocation of shutdown affects most of the coroutines machinery and
* leaves the coroutines framework in an inoperable state.
*
* The main goal of the shutdown is to stop all background threads associated with the coroutines
* framework in order to make kotlinx.coroutines classes unloadable by Java Virtual Machine.
* It is only recommended to be used in containerized environments (OSGi, Gradle plugins system,
* IDEA plugins) at the end of the container lifecycle.
*/
@DelicateCoroutinesApi
public fun shutdown() {
DefaultExecutor.shutdown()
// Also shuts down Dispatchers.IO
shutdownDefaultDispatchers()
}
}
7 changes: 7 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -14,13 +14,20 @@ import kotlin.coroutines.*
* Default instance of coroutine dispatcher.
*/
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
@JvmField
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)

// Shuts down the dispatcher, used only by Dispatchers.shutdown()
internal fun shutdown() {
super.close()
}

// Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close()
override fun close() {
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/test/TestBase.kt
Expand Up @@ -206,7 +206,7 @@ public actual open class TestBase(private var disableOutCheck: Boolean) {
fun shutdownPoolsAfterTest() {
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
CommonPool.restore()
DefaultScheduler.restore()
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt
Expand Up @@ -11,14 +11,14 @@ import java.util.concurrent.locks.*
private const val SHUTDOWN_TIMEOUT = 1000L

internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) {
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working)
val testTimeSource = VirtualTimeSource(log)
timeSource = testTimeSource
DefaultExecutor.ensureStarted() // should start with new time source
try {
block()
} finally {
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
testTimeSource.shutdown()
timeSource = null // restore time source
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
Expand Up @@ -28,7 +28,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
captureOutput(name, stdoutEnabled = OUT_ENABLED) { log ->
CommonPool.usePrivatePool()
DefaultScheduler.usePrivateScheduler()
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT)
resetCoroutineId()
val threadsBefore = currentThreads()
try {
Expand All @@ -42,7 +42,7 @@ fun <R> test(name: String, block: () -> R): List<String> = outputException(name)
CommonPool.shutdown(SHUTDOWN_TIMEOUT)
DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT)
shutdownDispatcherPools(SHUTDOWN_TIMEOUT)
DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks
}
checkTestThreads(threadsBefore) // check thread if the main completed successfully
}
Expand Down

0 comments on commit 8fa2ebf

Please sign in to comment.