Skip to content

Commit

Permalink
Cancel current Job on RejectedExecutionException (#2012)
Browse files Browse the repository at this point in the history
When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it had reached its capacity and so the executing current Job should be canceled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code.

Fixes #2003
  • Loading branch information
elizarov committed Oct 9, 2020
1 parent b82439e commit 20341f2
Show file tree
Hide file tree
Showing 33 changed files with 282 additions and 60 deletions.
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -257,13 +257,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls {

public abstract interface class kotlinx/coroutines/Delay {
public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
}

public final class kotlinx/coroutines/Delay$DefaultImpls {
public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
}

public final class kotlinx/coroutines/DelayKt {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Delay.kt
Expand Up @@ -54,8 +54,8 @@ public interface Delay {
*
* This implementation uses a built-in single-threaded scheduled executor service.
*/
public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block)
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Timeout.kt
Expand Up @@ -142,7 +142,7 @@ private fun <U, T: U> setupTimeout(
// schedule cancellation of this coroutine on time
val cont = coroutine.uCont
val context = cont.context
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
// restart the block using a new coroutine with a new job,
// however, start it undispatched, because we already are in the proper context
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -655,7 +655,7 @@ internal class SelectBuilderImpl<in R>(
if (trySelect())
block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
}
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context))
}

private class DisposeNode(
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD
@ExperimentalCoroutinesApi
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val task = TimedTask(block, currentTime + timeMillis)
heap += task
return task
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/js/src/JSDispatcher.kt
Expand Up @@ -35,7 +35,7 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay {
messageQueue.enqueue(block)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val handle = setTimeout({ block.run() }, delayToInt(timeMillis))
return ClearTimeout(handle)
}
Expand Down Expand Up @@ -81,7 +81,7 @@ internal class WindowDispatcher(private val window: Window) : CoroutineDispatche
window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
return object : DisposableHandle {
override fun dispose() {
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/jvm/src/CommonPool.kt
Expand Up @@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
// CommonPool only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.enqueue(block)
}
}
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import java.util.concurrent.*
import kotlin.coroutines.*

internal actual val DefaultDelay: Delay = DefaultExecutor

Expand Down Expand Up @@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
* Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
* but it's not exposed as public API.
*/
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
scheduleInvokeOnTimeout(timeMillis, block)

override fun run() {
Expand Down
36 changes: 28 additions & 8 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Expand Up @@ -38,13 +38,25 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea

/**
* Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher].
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*/
@JvmName("from") // this is for a nice Java API, see issue #255
public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher =
ExecutorCoroutineDispatcherImpl(this)

/**
* Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
*
* If the underlying executor throws [RejectedExecutionException] on
* attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the
* resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues),
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*/
@JvmName("from") // this is for a nice Java API, see issue #255
public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher =
Expand Down Expand Up @@ -82,7 +94,8 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
cancelJobOnRejection(context, e)
Dispatchers.IO.dispatch(context, block)
}
}

Expand All @@ -93,7 +106,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
} else {
null
}
Expand All @@ -106,24 +119,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
scheduleBlock(block, context, timeMillis)
} else {
null
}

return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}

private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
}
}

private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
context.cancel(CancellationException("The task was rejected", exception))
}

override fun close() {
(executor as? ExecutorService)?.shutdown()
}
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt
Expand Up @@ -14,6 +14,11 @@ import kotlin.coroutines.*
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its thread).
* Resources are reclaimed by [ExecutorCoroutineDispatcher.close].**
*
* If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and
* attempt to submit a continuation task is made,
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*
* **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools
* that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed
* will be provided, thus avoiding potential thread leaks and also significantly improving performance, due
Expand All @@ -35,6 +40,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher =
* **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads).
* Resources are reclaimed by [ExecutorCoroutineDispatcher.close].**
*
* If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and
* attempt to submit a continuation task is made,
* then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the
* [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete.
*
* **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools
* that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed
* will be provided, thus avoiding potential thread leaks and also significantly improving performance, due
Expand Down
9 changes: 3 additions & 6 deletions kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
Expand Up @@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher(

override val immediate: MainCoroutineDispatcher get() = this

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()
}

override suspend fun delay(time: Long) {
override suspend fun delay(time: Long) =
missing()
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
missing()
}

override fun dispatch(context: CoroutineContext, block: Runnable) =
missing()
Expand Down
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -65,13 +65,17 @@ public open class ExperimentalCoroutineDispatcher(
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block, tailDispatch = true)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatchYield(context, block)
}

Expand Down Expand Up @@ -110,7 +114,9 @@ public open class ExperimentalCoroutineDispatcher(
try {
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// Context shouldn't be lost here to properly invoke before/after task
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}
Expand Down
Expand Up @@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC
}, timeMillis)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val node = postDelayed(block, timeMillis)
return object : DisposableHandle {
override fun dispose() {
Expand Down
6 changes: 5 additions & 1 deletion kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -29,6 +29,8 @@ class ExecutorsTest : TestBase() {
val context = newFixedThreadPoolContext(2, "TestPool")
runBlocking(context) {
checkThreadName("TestPool")
delay(10)
checkThreadName("TestPool") // should dispatch on the right thread
}
context.close()
}
Expand All @@ -38,6 +40,8 @@ class ExecutorsTest : TestBase() {
val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
runBlocking(executor.asCoroutineDispatcher()) {
checkThreadName("TestExecutor")
delay(10)
checkThreadName("TestExecutor") // should dispatch on the right thread
}
executor.shutdown()
}
Expand Down
39 changes: 26 additions & 13 deletions kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand All @@ -15,8 +15,21 @@ import kotlin.test.*
@RunWith(Parameterized::class)
class FailingCoroutinesMachineryTest(
private val element: CoroutineContext.Element,
private val dispatcher: CoroutineDispatcher
private val dispatcher: TestDispatcher
) : TestBase() {
class TestDispatcher(val name: String, val block: () -> CoroutineDispatcher) {
private var _value: CoroutineDispatcher? = null

val value: CoroutineDispatcher
get() = _value ?: block().also { _value = it }

override fun toString(): String = name

fun reset() {
runCatching { (_value as? ExecutorCoroutineDispatcher)?.close() }
_value = null
}
}

private var caught: Throwable? = null
private val latch = CountDownLatch(1)
Expand Down Expand Up @@ -75,7 +88,7 @@ class FailingCoroutinesMachineryTest(

@After
fun tearDown() {
runCatching { (dispatcher as? ExecutorCoroutineDispatcher)?.close() }
dispatcher.reset()
if (lazyOuterDispatcher.isInitialized()) lazyOuterDispatcher.value.close()
}

Expand All @@ -84,14 +97,14 @@ class FailingCoroutinesMachineryTest(
@Parameterized.Parameters(name = "Element: {0}, dispatcher: {1}")
fun dispatchers(): List<Array<Any>> {
val elements = listOf<Any>(FailingRestore, FailingUpdate)
val dispatchers = listOf<Any>(
Dispatchers.Unconfined,
Dispatchers.Default,
Executors.newFixedThreadPool(1).asCoroutineDispatcher(),
Executors.newScheduledThreadPool(1).asCoroutineDispatcher(),
ThrowingDispatcher, ThrowingDispatcher2
val dispatchers = listOf<TestDispatcher>(
TestDispatcher("Dispatchers.Unconfined") { Dispatchers.Unconfined },
TestDispatcher("Dispatchers.Default") { Dispatchers.Default },
TestDispatcher("Executors.newFixedThreadPool(1)") { Executors.newFixedThreadPool(1).asCoroutineDispatcher() },
TestDispatcher("Executors.newScheduledThreadPool(1)") { Executors.newScheduledThreadPool(1).asCoroutineDispatcher() },
TestDispatcher("ThrowingDispatcher") { ThrowingDispatcher },
TestDispatcher("ThrowingDispatcher2") { ThrowingDispatcher2 }
)

return elements.flatMap { element ->
dispatchers.map { dispatcher ->
arrayOf(element, dispatcher)
Expand All @@ -102,13 +115,13 @@ class FailingCoroutinesMachineryTest(

@Test
fun testElement() = runTest {
launch(NonCancellable + dispatcher + exceptionHandler + element) {}
launch(NonCancellable + dispatcher.value + exceptionHandler + element) {}
checkException()
}

@Test
fun testNestedElement() = runTest {
launch(NonCancellable + dispatcher + exceptionHandler) {
launch(NonCancellable + dispatcher.value + exceptionHandler) {
launch(element) { }
}
checkException()
Expand All @@ -117,7 +130,7 @@ class FailingCoroutinesMachineryTest(
@Test
fun testNestedDispatcherAndElement() = runTest {
launch(lazyOuterDispatcher.value + NonCancellable + exceptionHandler) {
launch(element + dispatcher) { }
launch(element + dispatcher.value) { }
}
checkException()
}
Expand Down

0 comments on commit 20341f2

Please sign in to comment.