From 5121005eae8f04e0110421d721002cef7b003e45 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 25 May 2021 05:07:39 -0700 Subject: [PATCH] Document and tweak the contract of Executor.asCoroutineDispatcher and ExecutorService.asCoroutineDispatcher (#2727) * Get rid of ThreadPoolDispatcher and PoolThread classes * Reuse the same class for both asCoroutineDispatcher and newFixedThreadPoolContext * Replace 3-classes hierarchy by a single impl class * Copy the auto-closing logic to test source * Document and tweak the contract of Executor.asCoroutineDispatcher and ExecutorService.asCoroutineDispatcher * Document it properly * Make it more robust to signature changes and/or delegation (e.g. see the implementation of java.util.concurrent.Executors.newScheduledThreadPool) * Give a public way to reduce the memory pressure via ScheduledFuture.cancel Fixes #2601 --- kotlinx-coroutines-core/jvm/src/Executors.kt | 80 ++++++++++++------- .../jvm/src/ThreadPoolDispatcher.kt | 41 ++-------- .../ExecutorAsCoroutineDispatcherDelayTest.kt | 44 ++++++++++ .../test/knit/ClosedAfterGuideTestExecutor.kt | 51 ++++++++++++ .../jvm/test/knit/TestUtil.kt | 6 +- 5 files changed, 154 insertions(+), 68 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt create mode 100644 kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 394304f231..7ea3cc6874 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor service is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution + * * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) override fun toString(): String = dispatcher.toString() } -private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() { - init { - initFutureCancellation() - } -} +internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { -internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay { - - private var removesFutureOnCancellation: Boolean = false - - internal fun initFutureCancellation() { - removesFutureOnCancellation = removeFutureOnCancel(executor) + /* + * Attempts to reflectively (to be Java 6 compatible) invoke + * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup + * internal scheduler queue on cancellation. + */ + init { + removeFutureOnCancel(executor) } override fun dispatch(context: CoroutineContext, block: Runnable) { @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } } - /* - * removesFutureOnCancellation is required to avoid memory leak. - * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine. - * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation. - */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock( + ResumeUndispatchedRunnable(this, continuation), + continuation.context, + timeMillis + ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { continuation.cancelFutureOnCancellation(future) @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = if (removesFutureOnCancellation) { - scheduleBlock(block, context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) } } - private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } override fun toString(): String = executor.toString() - override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor + override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor override fun hashCode(): Int = System.identityHashCode(executor) } diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index 44a79d42ed..99e3b46cce 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = @ObsoleteCoroutinesApi public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher { require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" } - return ThreadPoolDispatcher(nThreads, name) -} - -internal class PoolThread( - @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests - target: Runnable, name: String -) : Thread(target, name) { - init { isDaemon = true } -} - -/** - * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are - * created with [newSingleThreadContext] and [newFixedThreadPoolContext]. - */ -internal class ThreadPoolDispatcher internal constructor( - private val nThreads: Int, - private val name: String -) : ExecutorCoroutineDispatcherBase() { - private val threadNo = AtomicInteger() - - override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target -> - PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) - } - - init { - initFutureCancellation() + val threadNo = AtomicInteger() + val executor = Executors.newScheduledThreadPool(nThreads) { runnable -> + val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()) + t.isDaemon = true + t } - - /** - * Closes this dispatcher -- shuts down all threads in this pool and releases resources. - */ - public override fun close() { - (executor as ExecutorService).shutdown() - } - - override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" + return executor.asCoroutineDispatcher() } diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt new file mode 100644 index 0000000000..dbe9cb3741 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import kotlin.test.* + +class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { + + private var callsToSchedule = 0 + + private inner class STPE : ScheduledThreadPoolExecutor(1) { + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + if (delay != 0L) ++callsToSchedule + return super.schedule(command, delay, unit) + } + } + + private inner class SES : ScheduledExecutorService by STPE() + + @Test + fun testScheduledThreadPool() = runTest { + val executor = STPE() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } + + @Test + fun testScheduledExecutorService() = runTest { + val executor = SES() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt new file mode 100644 index 0000000000..30fbfee264 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly + +import java.util.concurrent.* +import java.util.concurrent.atomic.* +import kotlin.coroutines.* + +internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name) + +internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher = + ClosedAfterGuideTestDispatcher(nThreads, name) + +internal class PoolThread( + @JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests + target: Runnable, name: String +) : Thread(target, name) { + init { + isDaemon = true + } +} + +private class ClosedAfterGuideTestDispatcher( + private val nThreads: Int, + private val name: String +) : ExecutorCoroutineDispatcher() { + private val threadNo = AtomicInteger() + + override val executor: Executor = + Executors.newScheduledThreadPool(nThreads, object : ThreadFactory { + override fun newThread(target: java.lang.Runnable): Thread { + return PoolThread( + this@ClosedAfterGuideTestDispatcher, + target, + if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet() + ) + } + }) + + override fun dispatch(context: CoroutineContext, block: Runnable) { + executor.execute(wrapTask(block)) + } + + override fun close() { + (executor as ExecutorService).shutdown() + } + + override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]" +} diff --git a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt index 7eda9043db..2e61ec6bce 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.knit @@ -11,8 +11,6 @@ import kotlinx.knit.test.* import java.util.concurrent.* import kotlin.test.* -fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block) - // helper function to dump exception to stdout for ease of debugging failed tests private inline fun outputException(name: String, block: () -> T): T = try { block() } @@ -176,4 +174,4 @@ private inline fun List.verify(verification: () -> Unit) { } throw t } -} \ No newline at end of file +}