Skip to content

Commit

Permalink
Cancel current Job on RejectedExecutionException
Browse files Browse the repository at this point in the history
When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and so the executing current Job should be cancelled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code.

Fixes #2003
  • Loading branch information
elizarov committed May 12, 2020
1 parent e47cb35 commit 861e425
Show file tree
Hide file tree
Showing 28 changed files with 175 additions and 42 deletions.
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -256,13 +256,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls {

public abstract interface class kotlinx/coroutines/Delay {
public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
}

public final class kotlinx/coroutines/Delay$DefaultImpls {
public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
}

public final class kotlinx/coroutines/DelayKt {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Delay.kt
Expand Up @@ -54,8 +54,8 @@ public interface Delay {
*
* This implementation uses a built-in single-threaded scheduled executor service.
*/
public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block)
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Timeout.kt
Expand Up @@ -105,7 +105,7 @@ private fun <U, T: U> setupTimeout(
// schedule cancellation of this coroutine on time
val cont = coroutine.uCont
val context = cont.context
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine))
coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context))
// restart the block using a new coroutine with a new job,
// however, start it undispatched, because we already are in the proper context
return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block)
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -649,7 +649,7 @@ internal class SelectBuilderImpl<in R>(
if (trySelect())
block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch
}
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action))
disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context))
}

private class DisposeNode(
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines
Expand Down Expand Up @@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD
@ExperimentalCoroutinesApi
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val task = TimedTask(block, currentTime + timeMillis)
heap += task
return task
Expand Down
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/jvm/src/CommonPool.kt
Expand Up @@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
// CommonPool only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.enqueue(block)
}
}
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines

import java.util.concurrent.*
import kotlin.coroutines.*

internal actual val DefaultDelay: Delay = DefaultExecutor

Expand Down Expand Up @@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {
* Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]),
* but it's not exposed as public API.
*/
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
scheduleInvokeOnTimeout(timeMillis, block)

override fun run() {
Expand Down
22 changes: 15 additions & 7 deletions kotlinx-coroutines-core/jvm/src/Executors.kt
Expand Up @@ -82,6 +82,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
executor.execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
cancelJobOnRejection(context, e)
DefaultExecutor.enqueue(block)
}
}
Expand All @@ -93,7 +94,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis)
} else {
null
}
Expand All @@ -106,24 +107,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
scheduleBlock(block, context, timeMillis)
} else {
null
}

return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
return when {
future != null -> DisposableFutureHandle(future)
else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context)
}
}

private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
(executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS)
} catch (e: RejectedExecutionException) {
cancelJobOnRejection(context, e)
null
}
}

private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) {
context[Job]?.cancel(CancellationException("The task was rejected", exception))
}

override fun close() {
(executor as? ExecutorService)?.shutdown()
}
Expand Down
9 changes: 3 additions & 6 deletions kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt
Expand Up @@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher(

override val immediate: MainCoroutineDispatcher get() = this

override fun isDispatchNeeded(context: CoroutineContext): Boolean {
override fun isDispatchNeeded(context: CoroutineContext): Boolean =
missing()
}

override suspend fun delay(time: Long) {
override suspend fun delay(time: Long) =
missing()
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
missing()
}

override fun dispatch(context: CoroutineContext, block: Runnable) =
missing()
Expand Down
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Expand Up @@ -60,13 +60,17 @@ public open class ExperimentalCoroutineDispatcher(
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatch(context, block)
}

override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block, tailDispatch = true)
} catch (e: RejectedExecutionException) {
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
DefaultExecutor.dispatchYield(context, block)
}

Expand Down Expand Up @@ -105,7 +109,9 @@ public open class ExperimentalCoroutineDispatcher(
try {
coroutineScheduler.dispatch(block, context, tailDispatch)
} catch (e: RejectedExecutionException) {
// Context shouldn't be lost here to properly invoke before/after task
// CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
// for testing purposes, so we don't have to worry about cancelling the affected Job here.
// TaskContext shouldn't be lost here to properly invoke before/after task
DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
}
}
Expand Down
Expand Up @@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC
}, timeMillis)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val node = postDelayed(block, timeMillis)
return object : DisposableHandle {
override fun dispose() {
Expand Down
119 changes: 119 additions & 0 deletions kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt
@@ -0,0 +1,119 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

class RejectedExecutionTest : TestBase() {
private val threadName = "RejectedExecutionTest"
private val executor = RejectingExecutor()

@After
fun tearDown() {
executor.shutdown()
executor.awaitTermination(10, TimeUnit.SECONDS)
}

@Test
fun testRejectOnLaunch() = runTest {
expect(1)
val job = launch(executor.asCoroutineDispatcher()) {
expectUnreached()
}
assertEquals(1, executor.submittedTasks)
assertTrue(job.isCancelled)
finish(2)
}

@Test
fun testRejectOnLaunchAtomic() = runTest {
expect(1)
val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) {
expect(2)
assertEquals(true, coroutineContext[Job]?.isCancelled)
assertNotSame(threadName, Thread.currentThread().name) // should have got dispatched on the DefaultExecutor
}
assertEquals(1, executor.submittedTasks)
job.join()
finish(3)
}

@Test
fun testRejectOnWithContext() = runTest {
expect(1)
assertFailsWith<CancellationException> {
withContext(executor.asCoroutineDispatcher()) {
expectUnreached()
}
}
assertEquals(1, executor.submittedTasks)
finish(2)
}

@Test
fun testRejectOnResumeInContext() = runTest {
expect(1)
executor.acceptTasks = 1 // accept one task
assertFailsWith<CancellationException> {
withContext(executor.asCoroutineDispatcher()) {
expect(2)
withContext(Dispatchers.Default) {
expect(3)
}
// cancelled on resume back
expectUnreached()
}
}
assertEquals(2, executor.submittedTasks)
finish(4)
}

@Test
fun testRejectOnDelay() = runTest {
expect(1)
executor.acceptTasks = 1 // accept one task
assertFailsWith<CancellationException> {
withContext(executor.asCoroutineDispatcher()) {
expect(2)
delay(10) // cancelled
expectUnreached()
}
}
assertEquals(2, executor.submittedTasks)
finish(3)
}

@Test
fun testRejectWithTimeout() = runTest {
expect(1)
executor.acceptTasks = 1 // accept one task
assertFailsWith<CancellationException> {
withContext(executor.asCoroutineDispatcher()) {
expect(2)
withTimeout(1000) {
expect(3) // atomic entry into the block (legacy behavior, it seem to be Ok with way)
assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled
}
expectUnreached()
}
}
assertEquals(2, executor.submittedTasks)
finish(4)
}

private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) {
var acceptTasks = 0
var submittedTasks = 0

override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
submittedTasks++
if (submittedTasks > acceptTasks) throw RejectedExecutionException()
return super.schedule(command, delay, unit)
}
}
}
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/native/src/CoroutineContext.kt
Expand Up @@ -16,8 +16,8 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay {
takeEventLoop().dispatch(context, block)
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) =
takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation)
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
takeEventLoop().invokeOnTimeout(timeMillis, block)
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
takeEventLoop().invokeOnTimeout(timeMillis, block, context)

actual fun enqueue(task: Runnable): Unit = loopWasShutDown()
}
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-test/api/kotlinx-coroutines-test.api
Expand Up @@ -25,7 +25,7 @@ public final class kotlinx/coroutines/test/TestCoroutineDispatcher : kotlinx/cor
public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V
public fun getCurrentTime ()J
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public fun pauseDispatcher ()V
public fun pauseDispatcher (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public fun resumeDispatcher ()V
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt
Expand Up @@ -65,7 +65,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl
}

/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val node = postDelayed(block, timeMillis)
return object : DisposableHandle {
override fun dispose() {
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt
Expand Up @@ -46,8 +46,8 @@ internal class TestMainDispatcher(private val mainFactory: MainDispatcherFactory
delay.delay(time)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
return delay.invokeOnTimeout(timeMillis, block)
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
return delay.invokeOnTimeout(timeMillis, block, context)
}

public fun setDispatcher(dispatcher: CoroutineDispatcher) {
Expand Down
Expand Up @@ -47,7 +47,7 @@ public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kot
public fun equals (Ljava/lang/Object;)Z
public final fun getScheduler ()Lreactor/core/scheduler/Scheduler;
public fun hashCode ()I
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
public fun toString ()Ljava/lang/String;
}
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-reactor/src/Scheduler.kt
Expand Up @@ -39,7 +39,7 @@ public class SchedulerCoroutineDispatcher(
}

/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle()

/** @suppress */
Expand Down
Expand Up @@ -76,7 +76,7 @@ public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx
public fun equals (Ljava/lang/Object;)Z
public final fun getScheduler ()Lio/reactivex/Scheduler;
public fun hashCode ()I
public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle;
public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle;
public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V
public fun toString ()Ljava/lang/String;
}
Expand Down
2 changes: 1 addition & 1 deletion reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Expand Up @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher(
}

/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS)
return DisposableHandle { disposable.dispose() }
}
Expand Down

0 comments on commit 861e425

Please sign in to comment.