diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 50bfb60d62..15d8b7409c 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -279,6 +279,7 @@ public final class kotlinx/coroutines/Dispatchers { public static final fun getIO ()Lkotlinx/coroutines/CoroutineDispatcher; public static final fun getMain ()Lkotlinx/coroutines/MainCoroutineDispatcher; public static final fun getUnconfined ()Lkotlinx/coroutines/CoroutineDispatcher; + public final fun shutdown ()V } public final class kotlinx/coroutines/DispatchersKt { diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index e6a57c927a..e2a1ffd69f 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -115,7 +115,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } - protected open fun shutdown() {} + open fun shutdown() {} } @ThreadLocal @@ -279,7 +279,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) - public fun enqueue(task: Runnable) { + open fun enqueue(task: Runnable) { if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() diff --git a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt index e91bb9fd21..97d528209b 100644 --- a/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/CoroutineContext.kt @@ -22,6 +22,12 @@ internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_N internal actual fun createDefaultDispatcher(): CoroutineDispatcher = if (useCoroutinesScheduler) DefaultScheduler else CommonPool +internal fun shutdownDefaultDispatchers() { + // Shutdown IO and, maybe, default + DefaultScheduler.shutdown() + if (!useCoroutinesScheduler) CommonPool.shutdown(0L) +} + /** * Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor * [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on). diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index fe020276e5..349ab62ea9 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -17,13 +17,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { incrementUseCount() // this event loop is never completed } - private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds + private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( try { - java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) + java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS) } catch (e: SecurityException) { - DEFAULT_KEEP_ALIVE + DEFAULT_KEEP_ALIVE_MS }) @Suppress("ObjectPropertyName") @@ -37,15 +37,28 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { private const val ACTIVE = 1 private const val SHUTDOWN_REQ = 2 private const val SHUTDOWN_ACK = 3 + private const val SHUTDOWN = 4 @Volatile private var debugStatus: Int = FRESH + private val isShutDown: Boolean get() = debugStatus == SHUTDOWN + private val isShutdownRequested: Boolean get() { val debugStatus = debugStatus return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK } + actual override fun enqueue(task: Runnable) { + if (isShutDown) throw RejectedExecutionException("DefaultExecutor was shut down") + super.enqueue(task) + } + + override fun shutdown() { + debugStatus = SHUTDOWN + super.shutdown() + } + /** * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on * ``` @@ -118,9 +131,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { return true } - // used for tests - @Synchronized - fun shutdown(timeout: Long) { + @Synchronized // used _only_ for tests + fun shutdownForTests(timeout: Long) { val deadline = System.currentTimeMillis() + timeout if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ // loop while there is anything to do immediately or deadline passes diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index d82598eab4..d6eba68b5f 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -21,7 +21,7 @@ public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.p public actual object Dispatchers { /** * The default [CoroutineDispatcher] that is used by all standard builders like - * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc + * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc. * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context. * * It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used @@ -116,4 +116,27 @@ public actual object Dispatchers { */ @JvmStatic public val IO: CoroutineDispatcher = DefaultScheduler.IO + + /** + * Shuts down built-in dispatchers, such as [Default] and [IO], + * stopping all the threads associated with them and making them reject all new tasks. + * Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`) + * and to handle rejected tasks from other dispatchers is also shut down. + * + * This is a **delicate** API. It is not supposed to be called from a general + * application-level code and its invocation is irreversible. + * The invocation of shutdown affects most of the coroutines machinery and + * leaves the coroutines framework in an inoperable state. + * + * The main goal of the shutdown is to stop all background threads associated with the coroutines + * framework in order to make kotlinx.coroutines classes unloadable by Java Virtual Machine. + * It is only recommended to be used in containerized environments (OSGi, Gradle plugins system, + * IDEA plugins) at the end of the container lifecycle. + */ + @DelicateCoroutinesApi + public fun shutdown() { + DefaultExecutor.shutdown() + // Also shuts down Dispatchers.IO + shutdownDefaultDispatchers() + } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 7227b07c07..0b5a542c2c 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -14,6 +14,7 @@ import kotlin.coroutines.* * Default instance of coroutine dispatcher. */ internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { + @JvmField val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), @@ -21,6 +22,12 @@ internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { TASK_PROBABLY_BLOCKING ) + // Shuts down the dispatcher, used only by Dispatchers.shutdown() + internal fun shutdown() { + super.close() + } + + // Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close() override fun close() { throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed") } diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index 61a2c8b8b7..407a0b0e0b 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -206,7 +206,7 @@ public actual open class TestBase(private var disableOutCheck: Boolean) { fun shutdownPoolsAfterTest() { CommonPool.shutdown(SHUTDOWN_TIMEOUT) DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) CommonPool.restore() DefaultScheduler.restore() } diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index bd9a185f6a..b4bc96ebdd 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -11,14 +11,14 @@ import java.util.concurrent.locks.* private const val SHUTDOWN_TIMEOUT = 1000L internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) { - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) val testTimeSource = VirtualTimeSource(log) timeSource = testTimeSource DefaultExecutor.ensureStarted() // should start with new time source try { block() } finally { - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) testTimeSource.shutdown() timeSource = null // restore time source } diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 2e61ec6bce..6ec2f94278 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -28,7 +28,7 @@ fun test(name: String, block: () -> R): List = outputException(name) captureOutput(name, stdoutEnabled = OUT_ENABLED) { log -> CommonPool.usePrivatePool() DefaultScheduler.usePrivateScheduler() - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) resetCoroutineId() val threadsBefore = currentThreads() try { @@ -42,7 +42,7 @@ fun test(name: String, block: () -> R): List = outputException(name) CommonPool.shutdown(SHUTDOWN_TIMEOUT) DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) shutdownDispatcherPools(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks } checkTestThreads(threadsBefore) // check thread if the main completed successfully }