From d28c635cda470b9c312909c4525c5b0f3a59c554 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 12 May 2020 11:56:39 +0300 Subject: [PATCH 01/11] Cancel current Job on RejectedExecutionException When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and so the executing current Job should be cancelled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code. Fixes #2003 --- .../api/kotlinx-coroutines-core.api | 4 +- kotlinx-coroutines-core/common/src/Delay.kt | 4 +- kotlinx-coroutines-core/common/src/Timeout.kt | 2 +- .../common/src/selects/Select.kt | 2 +- .../common/test/flow/VirtualTime.kt | 4 +- kotlinx-coroutines-core/jvm/src/CommonPool.kt | 2 + .../jvm/src/DefaultExecutor.kt | 3 +- kotlinx-coroutines-core/jvm/src/Executors.kt | 22 ++-- .../jvm/src/internal/MainDispatchers.kt | 9 +- .../jvm/src/scheduling/Dispatcher.kt | 8 +- .../jvm/src/test_/TestCoroutineContext.kt | 2 +- .../jvm/test/RejectedExecutionTest.kt | 119 ++++++++++++++++++ .../native/src/CoroutineContext.kt | 4 +- .../api/kotlinx-coroutines-test.api | 2 +- .../src/TestCoroutineDispatcher.kt | 2 +- .../src/internal/MainTestDispatcher.kt | 4 +- .../api/kotlinx-coroutines-reactor.api | 2 +- .../src/Scheduler.kt | 2 +- .../api/kotlinx-coroutines-rx2.api | 2 +- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 2 +- .../api/kotlinx-coroutines-rx3.api | 2 +- .../kotlinx-coroutines-rx3/src/RxScheduler.kt | 2 +- .../api/kotlinx-coroutines-android.api | 2 +- .../src/HandlerDispatcher.kt | 2 +- .../api/kotlinx-coroutines-javafx.api | 2 +- .../src/JavaFxDispatcher.kt | 2 +- .../api/kotlinx-coroutines-swing.api | 2 +- .../src/SwingDispatcher.kt | 2 +- 28 files changed, 175 insertions(+), 42 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 36cbdb6960..85b9c5b0e2 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -257,13 +257,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls { public abstract interface class kotlinx/coroutines/Delay { public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } public final class kotlinx/coroutines/Delay$DefaultImpls { public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; } public final class kotlinx/coroutines/DelayKt { diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index ab80912269..70d94cc05f 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -54,8 +54,8 @@ public interface Delay { * * This implementation uses a built-in single-threaded scheduled executor service. */ - public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block) + public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeMillis, block, context) } /** diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index c8e4455c92..103d236937 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -114,7 +114,7 @@ private fun setupTimeout( // schedule cancellation of this coroutine on time val cont = coroutine.uCont val context = cont.context - coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine)) + coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context)) // restart the block using a new coroutine with a new job, // however, start it undispatched, because we already are in the proper context return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block) diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index e744a0c724..628d6f7aa5 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -655,7 +655,7 @@ internal class SelectBuilderImpl( if (trySelect()) block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch } - disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action)) + disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context)) } private class DisposeNode( diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 9b257d933e..a607a4722a 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD @ExperimentalCoroutinesApi override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val task = TimedTask(block, currentTime + timeMillis) heap += task return task diff --git a/kotlinx-coroutines-core/jvm/src/CommonPool.kt b/kotlinx-coroutines-core/jvm/src/CommonPool.kt index 60f30cfe14..2203313120 100644 --- a/kotlinx-coroutines-core/jvm/src/CommonPool.kt +++ b/kotlinx-coroutines-core/jvm/src/CommonPool.kt @@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { (pool ?: getOrCreatePoolSync()).execute(wrapTask(block)) } catch (e: RejectedExecutionException) { unTrackTask() + // CommonPool only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.enqueue(block) } } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index ed84f55e74..787cbf9c44 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines import java.util.concurrent.* +import kotlin.coroutines.* internal actual val DefaultDelay: Delay = DefaultExecutor @@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), * but it's not exposed as public API. */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduleInvokeOnTimeout(timeMillis, block) override fun run() { diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 8c36bfa14f..840cc46b2b 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -82,6 +82,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa executor.execute(wrapTask(block)) } catch (e: RejectedExecutionException) { unTrackTask() + cancelJobOnRejection(context, e) DefaultExecutor.enqueue(block) } } @@ -93,7 +94,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS) + scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) } else { null } @@ -106,24 +107,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val future = if (removesFutureOnCancellation) { - scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS) + scheduleBlock(block, context, timeMillis) } else { null } - - return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block) + return when { + future != null -> DisposableFutureHandle(future) + else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + } } - private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? { + private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, time, unit) + (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { + cancelJobOnRejection(context, e) null } } + private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) { + context[Job]?.cancel(CancellationException("The task was rejected", exception)) + } + override fun close() { (executor as? ExecutorService)?.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index ddfcdbb142..5b2b9ff68c 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher( override val immediate: MainCoroutineDispatcher get() = this - override fun isDispatchNeeded(context: CoroutineContext): Boolean { + override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() - } - override suspend fun delay(time: Long) { + override suspend fun delay(time: Long) = missing() - } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = missing() - } override fun dispatch(context: CoroutineContext, block: Runnable) = missing() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index e0890eff66..202c6e1d06 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -65,6 +65,8 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatch(context, block) } @@ -72,6 +74,8 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block, tailDispatch = true) } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatchYield(context, block) } @@ -110,7 +114,9 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block, context, tailDispatch) } catch (e: RejectedExecutionException) { - // Context shouldn't be lost here to properly invoke before/after task + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. + // TaskContext shouldn't be lost here to properly invoke before/after task DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) } } diff --git a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt index e7c8b6b671..649c95375d 100644 --- a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt @@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC }, timeMillis) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val node = postDelayed(block, timeMillis) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt new file mode 100644 index 0000000000..27eb2afeb1 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -0,0 +1,119 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class RejectedExecutionTest : TestBase() { + private val threadName = "RejectedExecutionTest" + private val executor = RejectingExecutor() + + @After + fun tearDown() { + executor.shutdown() + executor.awaitTermination(10, TimeUnit.SECONDS) + } + + @Test + fun testRejectOnLaunch() = runTest { + expect(1) + val job = launch(executor.asCoroutineDispatcher()) { + expectUnreached() + } + assertEquals(1, executor.submittedTasks) + assertTrue(job.isCancelled) + finish(2) + } + + @Test + fun testRejectOnLaunchAtomic() = runTest { + expect(1) + val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) { + expect(2) + assertEquals(true, coroutineContext[Job]?.isCancelled) + assertNotSame(threadName, Thread.currentThread().name) // should have got dispatched on the DefaultExecutor + } + assertEquals(1, executor.submittedTasks) + job.join() + finish(3) + } + + @Test + fun testRejectOnWithContext() = runTest { + expect(1) + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expectUnreached() + } + } + assertEquals(1, executor.submittedTasks) + finish(2) + } + + @Test + fun testRejectOnResumeInContext() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + withContext(Dispatchers.Default) { + expect(3) + } + // cancelled on resume back + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(4) + } + + @Test + fun testRejectOnDelay() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + delay(10) // cancelled + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(3) + } + + @Test + fun testRejectWithTimeout() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + withTimeout(1000) { + expect(3) // atomic entry into the block (legacy behavior, it seem to be Ok with way) + assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled + } + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(4) + } + + private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) { + var acceptTasks = 0 + var submittedTasks = 0 + + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + submittedTasks++ + if (submittedTasks > acceptTasks) throw RejectedExecutionException() + return super.schedule(command, delay, unit) + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index bcc7f48963..4ec1289ee7 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -16,8 +16,8 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { takeEventLoop().dispatch(context, block) override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = - takeEventLoop().invokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + takeEventLoop().invokeOnTimeout(timeMillis, block, context) actual fun enqueue(task: Runnable): Unit = loopWasShutDown() } diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api index e3b1f73e4f..c99ec5cbf1 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api @@ -25,7 +25,7 @@ public final class kotlinx/coroutines/test/TestCoroutineDispatcher : kotlinx/cor public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun getCurrentTime ()J - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun pauseDispatcher ()V public fun pauseDispatcher (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun resumeDispatcher ()V diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index 4706d627ef..cad2636f97 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -65,7 +65,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val node = postDelayed(block, timeMillis) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt b/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt index c18e4108bb..baa1aa5fd2 100644 --- a/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt +++ b/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt @@ -46,8 +46,8 @@ internal class TestMainDispatcher(private val mainFactory: MainDispatcherFactory delay.delay(time) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - return delay.invokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + return delay.invokeOnTimeout(timeMillis, block, context) } public fun setDispatcher(dispatcher: CoroutineDispatcher) { diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 3b5c6b9522..b46fe338e5 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -49,7 +49,7 @@ public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kot public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lreactor/core/scheduler/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt index e176c07bb9..4fb5514322 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt @@ -39,7 +39,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle() /** @suppress */ diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 06ddb68e9c..84c337446c 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -80,7 +80,7 @@ public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 3ddb67649e..9952eb91a0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 4f15eda7d4..5ab6d7336d 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -67,7 +67,7 @@ public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 6e91aeea85..a426aea6ba 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api index b97d8462c3..090c14e09c 100644 --- a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api +++ b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/android/HandlerDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getImmediate ()Lkotlinx/coroutines/android/HandlerDispatcher; - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; } public final class kotlinx/coroutines/android/HandlerDispatcherKt { diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt index 1693409875..af79da7c97 100644 --- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt +++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt @@ -140,7 +140,7 @@ internal class HandlerContext private constructor( continuation.invokeOnCancellation { handler.removeCallbacks(block) } } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) return object : DisposableHandle { override fun dispose() { diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api index 620e904612..e2c3b8f326 100644 --- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api +++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api @@ -5,7 +5,7 @@ public final class kotlinx/coroutines/javafx/JavaFxConvertKt { public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt index a13a68368e..c3069d636f 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt @@ -42,7 +42,7 @@ public sealed class JavaFxDispatcher : MainCoroutineDispatcher(), Delay { } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val timeline = schedule(timeMillis, TimeUnit.MILLISECONDS, EventHandler { block.run() }) diff --git a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api index 09556e807f..d33191fd96 100644 --- a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api +++ b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/swing/SwingDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } diff --git a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt index 77f109df91..054ed1f60e 100644 --- a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt +++ b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt @@ -36,7 +36,7 @@ public sealed class SwingDispatcher : MainCoroutineDispatcher(), Delay { } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener { block.run() }) From f9936d1a7d9d81cf443c28669b6ce1d2c7fa7879 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 12 May 2020 20:32:37 +0300 Subject: [PATCH 02/11] ~ Fix JS and Native code --- kotlinx-coroutines-core/js/src/JSDispatcher.kt | 4 ++-- kotlinx-coroutines-core/native/src/EventLoop.kt | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/js/src/JSDispatcher.kt b/kotlinx-coroutines-core/js/src/JSDispatcher.kt index a0dfcba2b7..e1b3dcd7a9 100644 --- a/kotlinx-coroutines-core/js/src/JSDispatcher.kt +++ b/kotlinx-coroutines-core/js/src/JSDispatcher.kt @@ -35,7 +35,7 @@ internal sealed class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay { messageQueue.enqueue(block) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val handle = setTimeout({ block.run() }, delayToInt(timeMillis)) return ClearTimeout(handle) } @@ -81,7 +81,7 @@ internal class WindowDispatcher(private val window: Window) : CoroutineDispatche window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis)) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-core/native/src/EventLoop.kt b/kotlinx-coroutines-core/native/src/EventLoop.kt index d6c6525504..b397d6f182 100644 --- a/kotlinx-coroutines-core/native/src/EventLoop.kt +++ b/kotlinx-coroutines-core/native/src/EventLoop.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlin.coroutines.* import kotlin.system.* internal actual abstract class EventLoopImplPlatform: EventLoop() { @@ -13,7 +14,7 @@ internal actual abstract class EventLoopImplPlatform: EventLoop() { } internal class EventLoopImpl: EventLoopImplBase() { - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduleInvokeOnTimeout(timeMillis, block) } From aaa212f2982459f0d6644931c084c1f34f69b8b8 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 12 May 2020 20:36:40 +0300 Subject: [PATCH 03/11] ~ Updated asCoroutineDispatcher docs --- kotlinx-coroutines-core/jvm/src/Executors.kt | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 840cc46b2b..f2df9f9c00 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -38,6 +38,11 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. + * + * Note, that if the underlying executor uses limited queues and throws [RejectedExecutionException] on + * attempt to submit a task, then the [Job] of the affected task is [cancelled][Job.cancel] and the + * task is submitted to the default single-threaded executor, so that the affected coroutine can cleanup its + * resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher = @@ -45,6 +50,11 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. + * + * Note, that if the underlying executor uses limited queues and throws [RejectedExecutionException] on + * attempt to submit a task, then the [Job] of the affected task is [cancelled][Job.cancel] and the + * task is submitted to the default single-threaded executor, so that the affected coroutine can cleanup its + * resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher = From d2757f9075fafc8083a4641fc9ef4ac4f8877b65 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 12 May 2020 21:50:41 +0300 Subject: [PATCH 04/11] ~ Fix FailingCoroutinesMachineryTest Reset dispatchers after each test in FailingCoroutinesMachineryTest. Otherwise some tests were running normally, but others with dispatchers that were already shutdown. --- .../test/FailingCoroutinesMachineryTest.kt | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt index 9bf8ffad85..c9f722a5b8 100644 --- a/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt +++ b/kotlinx-coroutines-core/jvm/test/FailingCoroutinesMachineryTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -15,8 +15,21 @@ import kotlin.test.* @RunWith(Parameterized::class) class FailingCoroutinesMachineryTest( private val element: CoroutineContext.Element, - private val dispatcher: CoroutineDispatcher + private val dispatcher: TestDispatcher ) : TestBase() { + class TestDispatcher(val name: String, val block: () -> CoroutineDispatcher) { + private var _value: CoroutineDispatcher? = null + + val value: CoroutineDispatcher + get() = _value ?: block().also { _value = it } + + override fun toString(): String = name + + fun reset() { + runCatching { (_value as? ExecutorCoroutineDispatcher)?.close() } + _value = null + } + } private var caught: Throwable? = null private val latch = CountDownLatch(1) @@ -75,7 +88,7 @@ class FailingCoroutinesMachineryTest( @After fun tearDown() { - runCatching { (dispatcher as? ExecutorCoroutineDispatcher)?.close() } + dispatcher.reset() if (lazyOuterDispatcher.isInitialized()) lazyOuterDispatcher.value.close() } @@ -84,14 +97,14 @@ class FailingCoroutinesMachineryTest( @Parameterized.Parameters(name = "Element: {0}, dispatcher: {1}") fun dispatchers(): List> { val elements = listOf(FailingRestore, FailingUpdate) - val dispatchers = listOf( - Dispatchers.Unconfined, - Dispatchers.Default, - Executors.newFixedThreadPool(1).asCoroutineDispatcher(), - Executors.newScheduledThreadPool(1).asCoroutineDispatcher(), - ThrowingDispatcher, ThrowingDispatcher2 + val dispatchers = listOf( + TestDispatcher("Dispatchers.Unconfined") { Dispatchers.Unconfined }, + TestDispatcher("Dispatchers.Default") { Dispatchers.Default }, + TestDispatcher("Executors.newFixedThreadPool(1)") { Executors.newFixedThreadPool(1).asCoroutineDispatcher() }, + TestDispatcher("Executors.newScheduledThreadPool(1)") { Executors.newScheduledThreadPool(1).asCoroutineDispatcher() }, + TestDispatcher("ThrowingDispatcher") { ThrowingDispatcher }, + TestDispatcher("ThrowingDispatcher2") { ThrowingDispatcher2 } ) - return elements.flatMap { element -> dispatchers.map { dispatcher -> arrayOf(element, dispatcher) @@ -102,13 +115,13 @@ class FailingCoroutinesMachineryTest( @Test fun testElement() = runTest { - launch(NonCancellable + dispatcher + exceptionHandler + element) {} + launch(NonCancellable + dispatcher.value + exceptionHandler + element) {} checkException() } @Test fun testNestedElement() = runTest { - launch(NonCancellable + dispatcher + exceptionHandler) { + launch(NonCancellable + dispatcher.value + exceptionHandler) { launch(element) { } } checkException() @@ -117,7 +130,7 @@ class FailingCoroutinesMachineryTest( @Test fun testNestedDispatcherAndElement() = runTest { launch(lazyOuterDispatcher.value + NonCancellable + exceptionHandler) { - launch(element + dispatcher) { } + launch(element + dispatcher.value) { } } checkException() } From 05e3f24b438ca19915a48b078974782e71a8eb2f Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 12 May 2020 21:50:58 +0300 Subject: [PATCH 05/11] ~ Use CoroutineContext.cancel extension --- kotlinx-coroutines-core/jvm/src/Executors.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index f2df9f9c00..048e36fa4e 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -139,7 +139,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) { - context[Job]?.cancel(CancellationException("The task was rejected", exception)) + context.cancel(CancellationException("The task was rejected", exception)) } override fun close() { From b0d1c053fad23419a24d7fb220099432268377de Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 13 May 2020 15:19:03 +0300 Subject: [PATCH 06/11] ~ Chnaged fallback to Dispatchers.IO --- kotlinx-coroutines-core/jvm/src/Executors.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 048e36fa4e..733245e363 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -41,7 +41,7 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea * * Note, that if the underlying executor uses limited queues and throws [RejectedExecutionException] on * attempt to submit a task, then the [Job] of the affected task is [cancelled][Job.cancel] and the - * task is submitted to the default single-threaded executor, so that the affected coroutine can cleanup its + * task is submitted to the [Dispatchers.IO], so that the affected coroutine can cleanup its * resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 @@ -53,7 +53,7 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher * * Note, that if the underlying executor uses limited queues and throws [RejectedExecutionException] on * attempt to submit a task, then the [Job] of the affected task is [cancelled][Job.cancel] and the - * task is submitted to the default single-threaded executor, so that the affected coroutine can cleanup its + * task is submitted to the [Dispatchers.IO], so that the affected coroutine can cleanup its * resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 @@ -93,7 +93,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa } catch (e: RejectedExecutionException) { unTrackTask() cancelJobOnRejection(context, e) - DefaultExecutor.enqueue(block) + Dispatchers.IO.dispatch(context, block) } } From 5eee7a6f8567c8cc119f45ccc1d41a2269589f33 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Thu, 14 May 2020 10:04:41 +0300 Subject: [PATCH 07/11] ~ Rephrased docs a bit --- kotlinx-coroutines-core/jvm/src/Executors.kt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 733245e363..50761da6c1 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -39,10 +39,10 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * - * Note, that if the underlying executor uses limited queues and throws [RejectedExecutionException] on - * attempt to submit a task, then the [Job] of the affected task is [cancelled][Job.cancel] and the - * task is submitted to the [Dispatchers.IO], so that the affected coroutine can cleanup its - * resources and promptly complete. + * Note, that if the underlying executor throws [RejectedExecutionException] on + * attempt to submit a continuation task (it typically happens on executor shutdown or when it uses limited queues), + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher = @@ -51,10 +51,10 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * - * Note, that if the underlying executor uses limited queues and throws [RejectedExecutionException] on - * attempt to submit a task, then the [Job] of the affected task is [cancelled][Job.cancel] and the - * task is submitted to the [Dispatchers.IO], so that the affected coroutine can cleanup its - * resources and promptly complete. + * Note, that if the underlying executor throws [RejectedExecutionException] on + * attempt to submit a continuation task (it typically happens on executor shutdown or when it uses limited queues), + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 public fun Executor.asCoroutineDispatcher(): CoroutineDispatcher = From 4896990e4f46e8d7e848c1a1609bc347b3993406 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 15 May 2020 15:23:01 +0300 Subject: [PATCH 08/11] ~ Doc for newSingleThreadContext/newFixedThreadPoolContext --- kotlinx-coroutines-core/jvm/src/Executors.kt | 10 ++++++---- .../jvm/src/ThreadPoolDispatcher.kt | 10 ++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 50761da6c1..8ffc22d8bb 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -39,8 +39,9 @@ public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closea /** * Converts an instance of [ExecutorService] to an implementation of [ExecutorCoroutineDispatcher]. * - * Note, that if the underlying executor throws [RejectedExecutionException] on - * attempt to submit a continuation task (it typically happens on executor shutdown or when it uses limited queues), + * If the underlying executor throws [RejectedExecutionException] on + * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the + * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ @@ -51,8 +52,9 @@ public fun ExecutorService.asCoroutineDispatcher(): ExecutorCoroutineDispatcher /** * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher]. * - * Note, that if the underlying executor throws [RejectedExecutionException] on - * attempt to submit a continuation task (it typically happens on executor shutdown or when it uses limited queues), + * If the underlying executor throws [RejectedExecutionException] on + * attempt to submit a continuation task (it happens when [closing][ExecutorCoroutineDispatcher.close] the + * resulting dispatcher, on underlying executor [shutdown][ExecutorService.shutdown], or when it uses limited queues), * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ diff --git a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt index a0e1ffa2c7..aa18cd38d6 100644 --- a/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/ThreadPoolDispatcher.kt @@ -14,6 +14,11 @@ import kotlin.coroutines.* * **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its thread). * Resources are reclaimed by [ExecutorCoroutineDispatcher.close].** * + * If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and + * attempt to submit a continuation task is made, + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. + * * **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools * that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed * will be provided, thus avoiding potential thread leaks and also significantly improving performance, due @@ -35,6 +40,11 @@ public fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher = * **NOTE: The resulting [ExecutorCoroutineDispatcher] owns native resources (its threads). * Resources are reclaimed by [ExecutorCoroutineDispatcher.close].** * + * If the resulting dispatcher is [closed][ExecutorCoroutineDispatcher.close] and + * attempt to submit a continuation task is made, + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. + * * **NOTE: This API will be replaced in the future**. A different API to create thread-limited thread pools * that is based on a shared thread-pool and does not require the resulting dispatcher to be explicitly closed * will be provided, thus avoiding potential thread leaks and also significantly improving performance, due From f09fc9d5892301fc81243a2ec949246d09c8dc50 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Fri, 15 May 2020 16:00:36 +0300 Subject: [PATCH 09/11] ~ Better tests --- .../jvm/test/ExecutorsTest.kt | 6 ++- .../jvm/test/RejectedExecutionTest.kt | 42 +++++++++++++++---- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt index 033b9b7bc9..ebf08a03d0 100644 --- a/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ExecutorsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -29,6 +29,8 @@ class ExecutorsTest : TestBase() { val context = newFixedThreadPoolContext(2, "TestPool") runBlocking(context) { checkThreadName("TestPool") + delay(10) + checkThreadName("TestPool") // should dispatch on the right thread } context.close() } @@ -38,6 +40,8 @@ class ExecutorsTest : TestBase() { val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") } runBlocking(executor.asCoroutineDispatcher()) { checkThreadName("TestExecutor") + delay(10) + checkThreadName("TestExecutor") // should dispatch on the right thread } executor.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt index 27eb2afeb1..8e5fea9b36 100644 --- a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.scheduling.* import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -36,7 +37,7 @@ class RejectedExecutionTest : TestBase() { val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) { expect(2) assertEquals(true, coroutineContext[Job]?.isCancelled) - assertNotSame(threadName, Thread.currentThread().name) // should have got dispatched on the DefaultExecutor + assertIoThread() // was rejected on start, but start was atomic } assertEquals(1, executor.submittedTasks) job.join() @@ -60,14 +61,19 @@ class RejectedExecutionTest : TestBase() { expect(1) executor.acceptTasks = 1 // accept one task assertFailsWith { - withContext(executor.asCoroutineDispatcher()) { - expect(2) - withContext(Dispatchers.Default) { - expect(3) + withContext(executor.asCoroutineDispatcher()) { + expect(2) + assertExecutorThread() + try { + withContext(Dispatchers.Default) { + expect(3) + } + // cancelled on resume back + } finally { + assertIoThread() + } + expectUnreached() } - // cancelled on resume back - expectUnreached() - } } assertEquals(2, executor.submittedTasks) finish(4) @@ -80,7 +86,13 @@ class RejectedExecutionTest : TestBase() { assertFailsWith { withContext(executor.asCoroutineDispatcher()) { expect(2) - delay(10) // cancelled + assertExecutorThread() + try { + delay(10) // cancelled + } finally { + // Since it was cancelled on attempt to delay, it still stays on the same thread + assertExecutorThread() + } expectUnreached() } } @@ -95,6 +107,7 @@ class RejectedExecutionTest : TestBase() { assertFailsWith { withContext(executor.asCoroutineDispatcher()) { expect(2) + assertExecutorThread() withTimeout(1000) { expect(3) // atomic entry into the block (legacy behavior, it seem to be Ok with way) assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled @@ -116,4 +129,15 @@ class RejectedExecutionTest : TestBase() { return super.schedule(command, delay, unit) } } + + private fun assertExecutorThread() { + val thread = Thread.currentThread() + if (!thread.name.startsWith(threadName)) error("Not an executor thread: $thread") + } + + private fun assertIoThread() { + val thread = Thread.currentThread() + if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread") + assertEquals(CoroutineScheduler.WorkerState.BLOCKING, thread.state) + } } \ No newline at end of file From fda826c816e7a569cc376f52f2a91ec669b61d2e Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Mon, 25 May 2020 14:13:06 +0300 Subject: [PATCH 10/11] ~ Better checks in testRejectOnResumeInContext --- .../jvm/test/RejectedExecutionTest.kt | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt index 8e5fea9b36..67ed740767 100644 --- a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -67,16 +67,18 @@ class RejectedExecutionTest : TestBase() { try { withContext(Dispatchers.Default) { expect(3) + assertDefaultDispatcherThread() } // cancelled on resume back } finally { + expect(4) assertIoThread() } expectUnreached() } } assertEquals(2, executor.submittedTasks) - finish(4) + finish(5) } @Test @@ -135,6 +137,12 @@ class RejectedExecutionTest : TestBase() { if (!thread.name.startsWith(threadName)) error("Not an executor thread: $thread") } + private fun assertDefaultDispatcherThread() { + val thread = Thread.currentThread() + if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.Default: $thread") + assertEquals(CoroutineScheduler.WorkerState.CPU_ACQUIRED, thread.state) + } + private fun assertIoThread() { val thread = Thread.currentThread() if (thread !is CoroutineScheduler.Worker) error("Not a thread from Dispatchers.IO: $thread") From f86636f7b7eeda8c576192c99690daa0e74915b8 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Mon, 25 May 2020 15:38:12 +0300 Subject: [PATCH 11/11] ~ Fixed race in testRejectOnResumeInContext --- .../jvm/test/RejectedExecutionTest.kt | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt index 67ed740767..a6f4dd6b18 100644 --- a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.scheduling.* import org.junit.* import org.junit.Test @@ -68,17 +69,22 @@ class RejectedExecutionTest : TestBase() { withContext(Dispatchers.Default) { expect(3) assertDefaultDispatcherThread() + // We have to wait until caller executor thread had already suspended (if not running task), + // so that we resume back to it a new task is posted + executor.awaitNotRunningTask() + expect(4) + assertDefaultDispatcherThread() } // cancelled on resume back } finally { - expect(4) + expect(5) assertIoThread() } expectUnreached() } } assertEquals(2, executor.submittedTasks) - finish(5) + finish(6) } @Test @@ -124,12 +130,23 @@ class RejectedExecutionTest : TestBase() { private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) { var acceptTasks = 0 var submittedTasks = 0 + val runningTask = MutableStateFlow(false) override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { submittedTasks++ if (submittedTasks > acceptTasks) throw RejectedExecutionException() - return super.schedule(command, delay, unit) + val wrapper = Runnable { + runningTask.value = true + try { + command.run() + } finally { + runningTask.value = false + } + } + return super.schedule(wrapper, delay, unit) } + + suspend fun awaitNotRunningTask() = runningTask.first { !it } } private fun assertExecutorThread() {