Skip to content

Commit

Permalink
Document and tweak the contract of Executor.asCoroutineDispatcher and…
Browse files Browse the repository at this point in the history
… ExecutorService.asCoroutineDispatcher (#2727)

* Get rid of ThreadPoolDispatcher and PoolThread classes
* Reuse the same class for both asCoroutineDispatcher and newFixedThreadPoolContext
* Replace 3-classes hierarchy by a single impl class
* Copy the auto-closing logic to test source

* Document and tweak the contract of Executor.asCoroutineDispatcher and ExecutorService.asCoroutineDispatcher
* Document it properly
* Make it more robust to signature changes and/or delegation (e.g. see the implementation of java.util.concurrent.Executors.newScheduledThreadPool)
* Give a public way to reduce the memory pressure via ScheduledFuture.cancel

Fixes #2601
  • Loading branch information
qwwdfsad committed May 25, 2021
1 parent 623db41 commit 5121005
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 68 deletions.
80 changes: 51 additions & 29 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
Expand Down Expand Up @@ -39,6 +40,22 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea
/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [ExecutorService] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [ExecutorService] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor service is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -52,6 +69,23 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher
/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
* ## Interaction with [delay] and time-based coroutines.
*
* If the given [Executor] is an instance of [ScheduledExecutorService], then all time-related
* coroutine operations such as [delay], [withTimeout] and time-based [Flow] operators will be scheduled
* on this executor using [schedule][ScheduledExecutorService.schedule] method. If the corresponding
* coroutine is cancelled, [ScheduledFuture.cancel] will be invoked on the corresponding future.
*
* If the given [Executor] is an instance of [ScheduledThreadPoolExecutor], then prior to any scheduling,
* remove on cancel policy will be set via [ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy] in order
* to reduce the memory pressure of cancelled coroutines.
*
* If the executor is neither of this types, the separate internal thread will be used to
* _track_ the delay and time-related executions, but the coroutine itself will still be executed
* on top of the given executor.
*
* ## Rejected execution
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
Expand All @@ -75,18 +109,15 @@ private class DispatcherExecutor(@JvmField val dispatcher: CoroutineDispatcher)
override fun toString(): String = dispatcher.toString()
}

private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
initFutureCancellation()
}
}
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = false

internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
/*
* Attempts to reflectively (to be Java 6 compatible) invoke
* ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy in order to cleanup
* internal scheduler queue on cancellation.
*/
init {
removeFutureOnCancel(executor)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
Expand All @@ -99,17 +130,12 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}
}

/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
} else {
null
}
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
Expand All @@ -120,20 +146,16 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, context, timeMillis)
} else {
null
}
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(block, context, timeMillis)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}

private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
private fun ScheduledExecutorService.scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
Expand All @@ -149,7 +171,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
}

override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherImpl && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}

Expand Down
41 changes: 6 additions & 35 deletions kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Expand Up @@ -59,40 +59,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
@ObsoleteCoroutinesApi
public fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher {
require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
return ThreadPoolDispatcher(nThreads, name)
}

internal class PoolThread(
@JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init { isDaemon = true }
}

/**
* Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
* created with [newSingleThreadContext] and [newFixedThreadPoolContext].
*/
internal class ThreadPoolDispatcher internal constructor(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcherBase() {
private val threadNo = AtomicInteger()

override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}

init {
initFutureCancellation()
val threadNo = AtomicInteger()
val executor = Executors.newScheduledThreadPool(nThreads) { runnable ->
val t = Thread(runnable, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
t.isDaemon = true
t
}

/**
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
*/
public override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
return executor.asCoroutineDispatcher()
}
@@ -0,0 +1,44 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.Test
import java.lang.Runnable
import java.util.concurrent.*
import kotlin.test.*

class ExecutorAsCoroutineDispatcherDelayTest : TestBase() {

private var callsToSchedule = 0

private inner class STPE : ScheduledThreadPoolExecutor(1) {
override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
if (delay != 0L) ++callsToSchedule
return super.schedule(command, delay, unit)
}
}

private inner class SES : ScheduledExecutorService by STPE()

@Test
fun testScheduledThreadPool() = runTest {
val executor = STPE()
withContext(executor.asCoroutineDispatcher()) {
delay(100)
}
executor.shutdown()
assertEquals(1, callsToSchedule)
}

@Test
fun testScheduledExecutorService() = runTest {
val executor = SES()
withContext(executor.asCoroutineDispatcher()) {
delay(100)
}
executor.shutdown()
assertEquals(1, callsToSchedule)
}
}
@@ -0,0 +1,51 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines // Trick to make guide tests use these declarations with executors that can be closed on our side implicitly

import java.util.concurrent.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*

internal fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = ClosedAfterGuideTestDispatcher(1, name)

internal fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher =
ClosedAfterGuideTestDispatcher(nThreads, name)

internal class PoolThread(
@JvmField val dispatcher: ExecutorCoroutineDispatcher, // for debugging & tests
target: Runnable, name: String
) : Thread(target, name) {
init {
isDaemon = true
}
}

private class ClosedAfterGuideTestDispatcher(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcher() {
private val threadNo = AtomicInteger()

override val executor: Executor =
Executors.newScheduledThreadPool(nThreads, object : ThreadFactory {
override fun newThread(target: java.lang.Runnable): Thread {
return PoolThread(
this@ClosedAfterGuideTestDispatcher,
target,
if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet()
)
}
})

override fun dispatch(context: CoroutineContext, block: Runnable) {
executor.execute(wrapTask(block))
}

override fun close() {
(executor as ExecutorService).shutdown()
}

override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
}
6 changes: 2 additions & 4 deletions kotlinx-coroutines-core/jvm/test/knit/TestUtil.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.knit
Expand All @@ -11,8 +11,6 @@ import kotlinx.knit.test.*
import java.util.concurrent.*
import kotlin.test.*

fun wrapTask(block: Runnable) = kotlinx.coroutines.wrapTask(block)

// helper function to dump exception to stdout for ease of debugging failed tests
private inline fun <T> outputException(name: String, block: () -> T): T =
try { block() }
Expand Down Expand Up @@ -176,4 +174,4 @@ private inline fun List<String>.verify(verification: () -> Unit) {
}
throw t
}
}
}

0 comments on commit 5121005

Please sign in to comment.