diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index da7ce7c696..7ea3cc6874 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.internal.* import java.io.* import java.util.concurrent.* @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor service is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * + * ## Interaction with [delay] and time-based coroutines. + * + * If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related + * coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled + * on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding + * coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future. + * + * If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling, + * remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order + * to reduce the memory pressure of cancelled coroutines. + * + * If the executor is neither of this types, the separate internal thread will be used to + * _track_ the delay and time-related executions, but the coroutine itself will still be executed + * on top of the given executor. + * + * ## Rejected execution + * * If the underlying executor throws [RejectedExecutionException] on * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), @@ -77,7 +111,14 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher) internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay { - private var removesFutureOnCancellation: Boolean = removeFutureOnCancel(executor) + /* + * Attempts to reflectively (to be Java 6 compatible) invoke + * ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup + * internal scheduler queue on cancellation. + */ + init { + removeFutureOnCancel(executor) + } override fun dispatch(context: CoroutineContext, block: Runnable) { try { @@ -89,17 +130,12 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) } } - /* - * removesFutureOnCancellation is required to avoid memory leak. - * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine. - * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation. - */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { - val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock( + ResumeUndispatchedRunnable(this, continuation), + continuation.context, + timeMillis + ) // If everything went fine and the scheduling attempt was not rejected -- use it if (future != null) { continuation.cancelFutureOnCancellation(future) @@ -110,20 +146,16 @@ internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - val future = if (removesFutureOnCancellation) { - scheduleBlock(block, context, timeMillis) - } else { - null - } + val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis) return when { future != null -> DisposableFutureHandle(future) else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) } } - private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { + private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) + schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { cancelJobOnRejection(context, e) null diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt new file mode 100644 index 0000000000..dbe9cb3741 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ExecutorAsCoroutineDispatcherDelayTest.kt @@ -0,0 +1,44 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import kotlin.test.* + +class ExecutorAsCoroutineDispatcherDelayTest : TestBase() { + + private var callsToSchedule = 0 + + private inner class STPE : ScheduledThreadPoolExecutor(1) { + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + if (delay != 0L) ++callsToSchedule + return super.schedule(command, delay, unit) + } + } + + private inner class SES : ScheduledExecutorService by STPE() + + @Test + fun testScheduledThreadPool() = runTest { + val executor = STPE() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } + + @Test + fun testScheduledExecutorService() = runTest { + val executor = SES() + withContext(executor.asCoroutineDispatcher()) { + delay(100) + } + executor.shutdown() + assertEquals(1, callsToSchedule) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt index 735511f931..30fbfee264 100644 --- a/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt +++ b/kotlinx-coroutines-core/jvm/test/knit/ClosedAfterGuideTestExecutor.kt @@ -8,9 +8,9 @@ import java.util.concurrent.* import java.util.concurrent.atomic.* import kotlin.coroutines.* -public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name) +internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name) -public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher = +internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(nThreads, name) internal class PoolThread(