From 20341f2c849ada43de15d2c11d97dac9581cc4f4 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 9 Oct 2020 16:40:15 +0300 Subject: [PATCH] Cancel current Job on RejectedExecutionException (#2012) When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it had reached its capacity and so the executing current Job should be canceled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code. Fixes #2003 --- .../api/kotlinx-coroutines-core.api | 4 +- kotlinx-coroutines-core/common/src/Delay.kt | 4 +- kotlinx-coroutines-core/common/src/Timeout.kt | 2 +- .../common/src/selects/Select.kt | 2 +- .../common/test/flow/VirtualTime.kt | 4 +- .../js/src/JSDispatcher.kt | 4 +- kotlinx-coroutines-core/jvm/src/CommonPool.kt | 2 + .../jvm/src/DefaultExecutor.kt | 3 +- kotlinx-coroutines-core/jvm/src/Executors.kt | 36 +++- .../jvm/src/ThreadPoolDispatcher.kt | 10 ++ .../jvm/src/internal/MainDispatchers.kt | 9 +- .../jvm/src/scheduling/Dispatcher.kt | 8 +- .../jvm/src/test_/TestCoroutineContext.kt | 2 +- .../jvm/test/ExecutorsTest.kt | 6 +- .../test/FailingCoroutinesMachineryTest.kt | 39 ++-- .../jvm/test/RejectedExecutionTest.kt | 168 ++++++++++++++++++ .../native/src/CoroutineContext.kt | 4 +- .../native/src/EventLoop.kt | 3 +- .../api/kotlinx-coroutines-test.api | 2 +- .../src/TestCoroutineDispatcher.kt | 2 +- .../src/internal/MainTestDispatcher.kt | 4 +- .../api/kotlinx-coroutines-reactor.api | 2 +- .../src/Scheduler.kt | 2 +- .../api/kotlinx-coroutines-rx2.api | 2 +- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 2 +- .../api/kotlinx-coroutines-rx3.api | 2 +- .../kotlinx-coroutines-rx3/src/RxScheduler.kt | 2 +- .../api/kotlinx-coroutines-android.api | 2 +- .../src/HandlerDispatcher.kt | 2 +- .../api/kotlinx-coroutines-javafx.api | 2 +- .../src/JavaFxDispatcher.kt | 2 +- .../api/kotlinx-coroutines-swing.api | 2 +- .../src/SwingDispatcher.kt | 2 +- 33 files changed, 282 insertions(+), 60 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 6c44e14b44..6611995361 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -257,13 +257,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls { public abstract interface class kotlinx/coroutines/Delay { public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } public final class kotlinx/coroutines/Delay$DefaultImpls { public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; } public final class kotlinx/coroutines/DelayKt { diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index ff0a6fc73b..ee28ec3b0e 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -54,8 +54,8 @@ public interface Delay { * * This implementation uses a built-in single-threaded scheduled executor service. */ - public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block) + public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeMillis, block, context) } /** diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 8547358b78..4bfff118e8 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -142,7 +142,7 @@ private fun setupTimeout( // schedule cancellation of this coroutine on time val cont = coroutine.uCont val context = cont.context - coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine)) + coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context)) // restart the block using a new coroutine with a new job, // however, start it undispatched, because we already are in the proper context return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block) diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index e744a0c724..628d6f7aa5 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -655,7 +655,7 @@ internal class SelectBuilderImpl( if (trySelect()) block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch } - disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action)) + disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context)) } private class DisposeNode( diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 9b257d933e..a607a4722a 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD @ExperimentalCoroutinesApi override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val task = TimedTask(block, currentTime + timeMillis) heap += task return task diff --git a/kotlinx-coroutines-core/js/src/JSDispatcher.kt b/kotlinx-coroutines-core/js/src/JSDispatcher.kt index a0dfcba2b7..e1b3dcd7a9 100644 --- a/kotlinx-coroutines-core/js/src/JSDispatcher.kt +++ b/kotlinx-coroutines-core/js/src/JSDispatcher.kt @@ -35,7 +35,7 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay { messageQueue.enqueue(block) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val handle = setTimeout({ block.run() }, delayToInt(timeMillis)) return ClearTimeout(handle) } @@ -81,7 +81,7 @@ internal class WindowDispatcher(private val window: Window) : CoroutineDispatche window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis)) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-core/jvm/src/CommonPool.kt b/kotlinx-coroutines-core/jvm/src/CommonPool.kt index 60f30cfe14..2203313120 100644 --- a/kotlinx-coroutines-core/jvm/src/CommonPool.kt +++ b/kotlinx-coroutines-core/jvm/src/CommonPool.kt @@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { (pool ?: getOrCreatePoolSync()).execute(wrapTask(block)) } catch (e: RejectedExecutionException) { unTrackTask() + // CommonPool only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.enqueue(block) } } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index ed84f55e74..787cbf9c44 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines import java.util.concurrent.* +import kotlin.coroutines.* internal actual val DefaultDelay: Delay = DefaultExecutor @@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), * but it's not exposed as public API. */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduleInvokeOnTimeout(timeMillis, block) override fun run() { diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 8c36bfa14f..8ffc22d8bb 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -38,6 +38,12 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. + * + * If the underlying executor throws [RejectedExecutionException] on + * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the + * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher = @@ -45,6 +51,12 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. + * + * If the underlying executor throws [RejectedExecutionException] on + * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the + * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher = @@ -82,7 +94,8 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa executor.execute(wrapTask(block)) } catch (e: RejectedExecutionException) { unTrackTask() - DefaultExecutor.enqueue(block) + cancelJobOnRejection(context, e) + Dispatchers.IO.dispatch(context, block) } } @@ -93,7 +106,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS) + scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) } else { null } @@ -106,24 +119,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val future = if (removesFutureOnCancellation) { - scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS) + scheduleBlock(block, context, timeMillis) } else { null } - - return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block) + return when { + future != null -> DisposableFutureHandle(future) + else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + } } - private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? { + private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, time, unit) + (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { + cancelJobOnRejection(context, e) null } } + private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) { + context.cancel(CancellationException("The task was rejected", exception)) + } + override fun close() { (executor as? ExecutorService)?.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index a0e1ffa2c7..aa18cd38d6 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -14,6 +14,11 @@ import kotlin.coroutines.* * **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its thread). * Resources are reclaimed by [ExecutorCoroutineDispatcher.close].** * + * If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and + * attempt to submit a continuation task is made, + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. + * * **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools * that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed * will be provided, thus avoiding potential thread leaks and also significantly improving performance, due @@ -35,6 +40,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = * **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads). * Resources are reclaimed by [ExecutorCoroutineDispatcher.close].** * + * If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and + * attempt to submit a continuation task is made, + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. + * * **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools * that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed * will be provided, thus avoiding potential thread leaks and also significantly improving performance, due diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index ddfcdbb142..5b2b9ff68c 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher( override val immediate: MainCoroutineDispatcher get() = this - override fun isDispatchNeeded(context: CoroutineContext): Boolean { + override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() - } - override suspend fun delay(time: Long) { + override suspend fun delay(time: Long) = missing() - } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = missing() - } override fun dispatch(context: CoroutineContext, block: Runnable) = missing() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index e0890eff66..202c6e1d06 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -65,6 +65,8 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatch(context, block) } @@ -72,6 +74,8 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block, tailDispatch = true) } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatchYield(context, block) } @@ -110,7 +114,9 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block, context, tailDispatch) } catch (e: RejectedExecutionException) { - // Context shouldn't be lost here to properly invoke before/after task + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. + // TaskContext shouldn't be lost here to properly invoke before/after task DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) } } diff --git a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt index e7c8b6b671..649c95375d 100644 --- a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt @@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC }, timeMillis) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val node = postDelayed(block, timeMillis) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt index 033b9b7bc9..ebf08a03d0 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -29,6 +29,8 @@ class ExecutorsTest : TestBase() { val context = newFixedThreadPoolContext(2, "TestPool") runBlocking(context) { checkThreadName("TestPool") + delay(10) + checkThreadName("TestPool") // should dispatch on the right thread } context.close() } @@ -38,6 +40,8 @@ class ExecutorsTest : TestBase() { val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") } runBlocking(executor.asCoroutineDispatcher()) { checkThreadName("TestExecutor") + delay(10) + checkThreadName("TestExecutor") // should dispatch on the right thread } executor.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt index 9bf8ffad85..c9f722a5b8 100644 --- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -15,8 +15,21 @@ import kotlin.test.* @RunWith(Parameterized::class) class FailingCoroutinesMachineryTest( private val element: CoroutineContext.Element, - private val dispatcher: CoroutineDispatcher + private val dispatcher: TestDispatcher ) : TestBase() { + class TestDispatcher(val name: String, val block: () -> CoroutineDispatcher) { + private var _value: CoroutineDispatcher? = null + + val value: CoroutineDispatcher + get() = _value ?: block().also { _value = it } + + override fun toString(): String = name + + fun reset() { + runCatching { (_value as? ExecutorCoroutineDispatcher)?.close() } + _value = null + } + } private var caught: Throwable? = null private val latch = CountDownLatch(1) @@ -75,7 +88,7 @@ class FailingCoroutinesMachineryTest( @After fun tearDown() { - runCatching { (dispatcher as? ExecutorCoroutineDispatcher)?.close() } + dispatcher.reset() if (lazyOuterDispatcher.isInitialized()) lazyOuterDispatcher.value.close() } @@ -84,14 +97,14 @@ class FailingCoroutinesMachineryTest( @Parameterized.Parameters(name = "Element: {0}, dispatcher: {1}") fun dispatchers(): List> { val elements = listOf(FailingRestore, FailingUpdate) - val dispatchers = listOf( - Dispatchers.Unconfined, - Dispatchers.Default, - Executors.newFixedThreadPool(1).asCoroutineDispatcher(), - Executors.newScheduledThreadPool(1).asCoroutineDispatcher(), - ThrowingDispatcher, ThrowingDispatcher2 + val dispatchers = listOf( + TestDispatcher("Dispatchers.Unconfined") { Dispatchers.Unconfined }, + TestDispatcher("Dispatchers.Default") { Dispatchers.Default }, + TestDispatcher("Executors.newFixedThreadPool(1)") { Executors.newFixedThreadPool(1).asCoroutineDispatcher() }, + TestDispatcher("Executors.newScheduledThreadPool(1)") { Executors.newScheduledThreadPool(1).asCoroutineDispatcher() }, + TestDispatcher("ThrowingDispatcher") { ThrowingDispatcher }, + TestDispatcher("ThrowingDispatcher2") { ThrowingDispatcher2 } ) - return elements.flatMap { element -> dispatchers.map { dispatcher -> arrayOf(element, dispatcher) @@ -102,13 +115,13 @@ class FailingCoroutinesMachineryTest( @Test fun testElement() = runTest { - launch(NonCancellable + dispatcher + exceptionHandler + element) {} + launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} checkException() } @Test fun testNestedElement() = runTest { - launch(NonCancellable + dispatcher + exceptionHandler) { + launch(NonCancellable + dispatcher.value + exceptionHandler) { launch(element) { } } checkException() @@ -117,7 +130,7 @@ class FailingCoroutinesMachineryTest( @Test fun testNestedDispatcherAndElement() = runTest { launch(lazyOuterDispatcher.value + NonCancellable + exceptionHandler) { - launch(element + dispatcher) { } + launch(element + dispatcher.value) { } } checkException() } diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt new file mode 100644 index 0000000000..a6f4dd6b18 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -0,0 +1,168 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.scheduling.* +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class RejectedExecutionTest : TestBase() { + private val threadName = "RejectedExecutionTest" + private val executor = RejectingExecutor() + + @After + fun tearDown() { + executor.shutdown() + executor.awaitTermination(10, TimeUnit.SECONDS) + } + + @Test + fun testRejectOnLaunch() = runTest { + expect(1) + val job = launch(executor.asCoroutineDispatcher()) { + expectUnreached() + } + assertEquals(1, executor.submittedTasks) + assertTrue(job.isCancelled) + finish(2) + } + + @Test + fun testRejectOnLaunchAtomic() = runTest { + expect(1) + val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) { + expect(2) + assertEquals(true, coroutineContext[Job]?.isCancelled) + assertIoThread() // was rejected on start, but start was atomic + } + assertEquals(1, executor.submittedTasks) + job.join() + finish(3) + } + + @Test + fun testRejectOnWithContext() = runTest { + expect(1) + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expectUnreached() + } + } + assertEquals(1, executor.submittedTasks) + finish(2) + } + + @Test + fun testRejectOnResumeInContext() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + assertExecutorThread() + try { + withContext(Dispatchers.Default) { + expect(3) + assertDefaultDispatcherThread() + // We have to wait until caller executor thread had already suspended (if not running task), + // so that we resume back to it a new task is posted + executor.awaitNotRunningTask() + expect(4) + assertDefaultDispatcherThread() + } + // cancelled on resume back + } finally { + expect(5) + assertIoThread() + } + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(6) + } + + @Test + fun testRejectOnDelay() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + assertExecutorThread() + try { + delay(10) // cancelled + } finally { + // Since it was cancelled on attempt to delay, it still stays on the same thread + assertExecutorThread() + } + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(3) + } + + @Test + fun testRejectWithTimeout() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + assertExecutorThread() + withTimeout(1000) { + expect(3) // atomic entry into the block (legacy behavior, it seem to be Ok with way) + assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled + } + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(4) + } + + private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) { + var acceptTasks = 0 + var submittedTasks = 0 + val runningTask = MutableStateFlow(false) + + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + submittedTasks++ + if (submittedTasks > acceptTasks) throw RejectedExecutionException() + val wrapper = Runnable { + runningTask.value = true + try { + command.run() + } finally { + runningTask.value = false + } + } + return super.schedule(wrapper, delay, unit) + } + + suspend fun awaitNotRunningTask() = runningTask.first { !it } + } + + private fun assertExecutorThread() { + val thread = Thread.currentThread() + if (!thread.name.startsWith(threadName)) error("Not an executor thread: $thread") + } + + private fun assertDefaultDispatcherThread() { + val thread = Thread.currentThread() + if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.Default: $thread") + assertEquals(CoroutineScheduler.WorkerState.CPU_ACQUIRED, thread.state) + } + + private fun assertIoThread() { + val thread = Thread.currentThread() + if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread") + assertEquals(CoroutineScheduler.WorkerState.BLOCKING, thread.state) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index bcc7f48963..4ec1289ee7 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -16,8 +16,8 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { takeEventLoop().dispatch(context, block) override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = - takeEventLoop().invokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + takeEventLoop().invokeOnTimeout(timeMillis, block, context) actual fun enqueue(task: Runnable): Unit = loopWasShutDown() } diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index d6c6525504..b397d6f182 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlin.coroutines.* import kotlin.system.* internal actual abstract class EventLoopImplPlatform: EventLoop() { @@ -13,7 +14,7 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() { } internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduleInvokeOnTimeout(timeMillis, block) } diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api index e3b1f73e4f..c99ec5cbf1 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api @@ -25,7 +25,7 @@ public final class kotlinx/coroutines/test/TestCoroutineDispatcher : kotlinx/cor public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun getCurrentTime ()J - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun pauseDispatcher ()V public fun pauseDispatcher (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun resumeDispatcher ()V diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index 4706d627ef..cad2636f97 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -65,7 +65,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val node = postDelayed(block, timeMillis) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt b/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt index c18e4108bb..baa1aa5fd2 100644 --- a/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt +++ b/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt @@ -46,8 +46,8 @@ internal class TestMainDispatcher(private val mainFactory: MainDispatcherFactory delay.delay(time) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - return delay.invokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + return delay.invokeOnTimeout(timeMillis, block, context) } public fun setDispatcher(dispatcher: CoroutineDispatcher) { diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 3b5c6b9522..b46fe338e5 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -49,7 +49,7 @@ public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kot public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lreactor/core/scheduler/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt index e176c07bb9..4fb5514322 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt @@ -39,7 +39,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle() /** @suppress */ diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 2cf7bc815b..4370325f58 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -84,7 +84,7 @@ public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 3ddb67649e..9952eb91a0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index eb92fd3285..6d2dd63d2c 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -71,7 +71,7 @@ public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 6e91aeea85..a426aea6ba 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api index b97d8462c3..090c14e09c 100644 --- a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api +++ b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/android/HandlerDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getImmediate ()Lkotlinx/coroutines/android/HandlerDispatcher; - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; } public final class kotlinx/coroutines/android/HandlerDispatcherKt { diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt index 1693409875..af79da7c97 100644 --- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt +++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt @@ -140,7 +140,7 @@ internal class HandlerContext private constructor( continuation.invokeOnCancellation { handler.removeCallbacks(block) } } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) return object : DisposableHandle { override fun dispose() { diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api index 620e904612..e2c3b8f326 100644 --- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api +++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api @@ -5,7 +5,7 @@ public final class kotlinx/coroutines/javafx/JavaFxConvertKt { public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt index a13a68368e..c3069d636f 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt @@ -42,7 +42,7 @@ public sealed class JavaFxDispatcher : MainCoroutineDispatcher(), Delay { } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val timeline = schedule(timeMillis, TimeUnit.MILLISECONDS, EventHandler { block.run() }) diff --git a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api index 09556e807f..d33191fd96 100644 --- a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api +++ b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/swing/SwingDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } diff --git a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt index 77f109df91..054ed1f60e 100644 --- a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt +++ b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt @@ -36,7 +36,7 @@ public sealed class SwingDispatcher : MainCoroutineDispatcher(), Delay { } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener { block.run() })