From a605eb3f8b51188216972898de8a135cde66bc8c Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 9 Sep 2021 19:11:29 +0300 Subject: [PATCH] Introduce Dispatchers.shutdown (#2915) Fixes #2558 --- .../api/kotlinx-coroutines-core.api | 1 + .../common/src/EventLoop.common.kt | 4 +-- .../jvm/src/DefaultExecutor.kt | 35 +++++++++++++++---- .../jvm/src/Dispatchers.kt | 29 ++++++++++++++- kotlinx-coroutines-core/jvm/src/EventLoop.kt | 3 +- .../jvm/src/scheduling/Dispatcher.kt | 7 ++++ kotlinx-coroutines-core/jvm/test/TestBase.kt | 2 +- .../jvm/test/VirtualTimeSource.kt | 4 +-- .../jvm/test/knit/TestUtil.kt | 4 +-- 9 files changed, 73 insertions(+), 16 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 58fe2c194f..5f948af8af 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -279,6 +279,7 @@ public final class kotlinx/coroutines/Dispatchers { public static final fun getIO ()Lkotlinx/coroutines/CoroutineDispatcher; public static final fun getMain ()Lkotlinx/coroutines/MainCoroutineDispatcher; public static final fun getUnconfined ()Lkotlinx/coroutines/CoroutineDispatcher; + public final fun shutdown ()V } public final class kotlinx/coroutines/DispatchersKt { diff --git a/kotlinx-coroutines-core/common/src/EventLoop.common.kt b/kotlinx-coroutines-core/common/src/EventLoop.common.kt index e6a57c927a..e2a1ffd69f 100644 --- a/kotlinx-coroutines-core/common/src/EventLoop.common.kt +++ b/kotlinx-coroutines-core/common/src/EventLoop.common.kt @@ -115,7 +115,7 @@ internal abstract class EventLoop : CoroutineDispatcher() { } } - protected open fun shutdown() {} + open fun shutdown() {} } @ThreadLocal @@ -279,7 +279,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay { public final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block) - public fun enqueue(task: Runnable) { + open fun enqueue(task: Runnable) { if (enqueueImpl(task)) { // todo: we should unpark only when this delayed task became first in the queue unpark() diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index fe020276e5..2968825d15 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -17,13 +17,13 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { incrementUseCount() // this event loop is never completed } - private const val DEFAULT_KEEP_ALIVE = 1000L // in milliseconds + private const val DEFAULT_KEEP_ALIVE_MS = 1000L // in milliseconds private val KEEP_ALIVE_NANOS = TimeUnit.MILLISECONDS.toNanos( try { - java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE) + java.lang.Long.getLong("kotlinx.coroutines.DefaultExecutor.keepAlive", DEFAULT_KEEP_ALIVE_MS) } catch (e: SecurityException) { - DEFAULT_KEEP_ALIVE + DEFAULT_KEEP_ALIVE_MS }) @Suppress("ObjectPropertyName") @@ -37,15 +37,39 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { private const val ACTIVE = 1 private const val SHUTDOWN_REQ = 2 private const val SHUTDOWN_ACK = 3 + private const val SHUTDOWN = 4 @Volatile private var debugStatus: Int = FRESH + val isShutDown: Boolean get() = debugStatus == SHUTDOWN + private val isShutdownRequested: Boolean get() { val debugStatus = debugStatus return debugStatus == SHUTDOWN_REQ || debugStatus == SHUTDOWN_ACK } + actual override fun enqueue(task: Runnable) { + if (isShutDown) shutdownError() + super.enqueue(task) + } + + override fun reschedule(now: Long, delayedTask: DelayedTask) { + // Reschedule on default executor can only be invoked after Dispatchers.shutdown + shutdownError() + } + + private fun shutdownError() { + throw RejectedExecutionException("DefaultExecutor was shut down. " + + "This error indicates that Dispatchers.shutdown() was invoked prior to completion of exiting coroutines, leaving coroutines in incomplete state. " + + "Please refer to Dispatchers.shutdown documentation for more details") + } + + override fun shutdown() { + debugStatus = SHUTDOWN + super.shutdown() + } + /** * All event loops are using DefaultExecutor#invokeOnTimeout to avoid livelock on * ``` @@ -118,9 +142,8 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { return true } - // used for tests - @Synchronized - fun shutdown(timeout: Long) { + @Synchronized // used _only_ for tests + fun shutdownForTests(timeout: Long) { val deadline = System.currentTimeMillis() + timeout if (!isShutdownRequested) debugStatus = SHUTDOWN_REQ // loop while there is anything to do immediately or deadline passes diff --git a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt index 78723893c1..a3be9fa53c 100644 --- a/kotlinx-coroutines-core/jvm/src/Dispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/Dispatchers.kt @@ -21,7 +21,7 @@ public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.p public actual object Dispatchers { /** * The default [CoroutineDispatcher] that is used by all standard builders like - * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc + * [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc. * if no dispatcher nor any other [ContinuationInterceptor] is specified in their context. * * It is backed by a shared pool of threads on JVM. By default, the maximal level of parallelism used @@ -116,4 +116,31 @@ public actual object Dispatchers { */ @JvmStatic public val IO: CoroutineDispatcher = DefaultScheduler.IO + + /** + * Shuts down built-in dispatchers, such as [Default] and [IO], + * stopping all the threads associated with them and making them reject all new tasks. + * Dispatcher used as a fallback for time-related operations (`delay`, `withTimeout`) + * and to handle rejected tasks from other dispatchers is also shut down. + * + * This is a **delicate** API. It is not supposed to be called from a general + * application-level code and its invocation is irreversible. + * The invocation of shutdown affects most of the coroutines machinery and + * leaves the coroutines framework in an inoperable state. + * The shutdown method should only be invoked when there are no pending tasks or active coroutines. + * Otherwise, the behavior is unspecified: the call to `shutdown` may throw an exception without completing + * the shutdown, or it may finish successfully, but the remaining jobs will be in a permanent dormant state, + * never completing nor executing. + * + * The main goal of the shutdown is to stop all background threads associated with the coroutines + * framework in order to make kotlinx.coroutines classes unloadable by Java Virtual Machine. + * It is only recommended to be used in containerized environments (OSGi, Gradle plugins system, + * IDEA plugins) at the end of the container lifecycle. + */ + @DelicateCoroutinesApi + public fun shutdown() { + DefaultExecutor.shutdown() + // Also shuts down Dispatchers.IO + DefaultScheduler.shutdown() + } } diff --git a/kotlinx-coroutines-core/jvm/src/EventLoop.kt b/kotlinx-coroutines-core/jvm/src/EventLoop.kt index e49c7dc7e1..b17d1dad25 100644 --- a/kotlinx-coroutines-core/jvm/src/EventLoop.kt +++ b/kotlinx-coroutines-core/jvm/src/EventLoop.kt @@ -13,8 +13,7 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() { unpark(thread) } - protected actual fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { - assert { this !== DefaultExecutor } // otherwise default execution was shutdown with tasks in it (cannot be) + protected actual open fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask) { DefaultExecutor.schedule(now, delayedTask) } } diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index 7227b07c07..0b5a542c2c 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -14,6 +14,7 @@ import kotlin.coroutines.* * Default instance of coroutine dispatcher. */ internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { + @JvmField val IO: CoroutineDispatcher = LimitingDispatcher( this, systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)), @@ -21,6 +22,12 @@ internal object DefaultScheduler : ExperimentalCoroutineDispatcher() { TASK_PROBABLY_BLOCKING ) + // Shuts down the dispatcher, used only by Dispatchers.shutdown() + internal fun shutdown() { + super.close() + } + + // Overridden in case anyone writes (Dispatchers.Default as ExecutorCoroutineDispatcher).close() override fun close() { throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed") } diff --git a/kotlinx-coroutines-core/jvm/test/TestBase.kt b/kotlinx-coroutines-core/jvm/test/TestBase.kt index a8955f3e95..fce8eff3c5 100644 --- a/kotlinx-coroutines-core/jvm/test/TestBase.kt +++ b/kotlinx-coroutines-core/jvm/test/TestBase.kt @@ -204,7 +204,7 @@ public actual open class TestBase(private var disableOutCheck: Boolean) { fun shutdownPoolsAfterTest() { DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) DefaultScheduler.restore() } diff --git a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt index bd9a185f6a..b4bc96ebdd 100644 --- a/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt +++ b/kotlinx-coroutines-core/jvm/test/VirtualTimeSource.kt @@ -11,14 +11,14 @@ import java.util.concurrent.locks.* private const val SHUTDOWN_TIMEOUT = 1000L internal inline fun withVirtualTimeSource(log: PrintStream? = null, block: () -> Unit) { - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // shutdown execution with old time source (in case it was working) val testTimeSource = VirtualTimeSource(log) timeSource = testTimeSource DefaultExecutor.ensureStarted() // should start with new time source try { block() } finally { - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) testTimeSource.shutdown() timeSource = null // restore time source } diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 5e83d7e909..80c98030d9 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -27,7 +27,7 @@ fun test(name: String, block: () -> R): List = outputException(name) try { captureOutput(name, stdoutEnabled = OUT_ENABLED) { log -> DefaultScheduler.usePrivateScheduler() - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) resetCoroutineId() val threadsBefore = currentThreads() try { @@ -40,7 +40,7 @@ fun test(name: String, block: () -> R): List = outputException(name) log.println("--- shutting down") DefaultScheduler.shutdown(SHUTDOWN_TIMEOUT) shutdownDispatcherPools(SHUTDOWN_TIMEOUT) - DefaultExecutor.shutdown(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks + DefaultExecutor.shutdownForTests(SHUTDOWN_TIMEOUT) // the last man standing -- cleanup all pending tasks } checkTestThreads(threadsBefore) // check thread if the main completed successfully }