From d28c635cda470b9c312909c4525c5b0f3a59c554 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 12 May 2020 11:56:39 +0300 Subject: [PATCH] Cancel current Job on RejectedExecutionException When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and so the executing current Job should be cancelled to terminate it as soon as possible. This way RejectedExecutionException works as a rate-limiter just like it serves this purpose in executor-based Java code. Fixes #2003 --- .../api/kotlinx-coroutines-core.api | 4 +- kotlinx-coroutines-core/common/src/Delay.kt | 4 +- kotlinx-coroutines-core/common/src/Timeout.kt | 2 +- .../common/src/selects/Select.kt | 2 +- .../common/test/flow/VirtualTime.kt | 4 +- kotlinx-coroutines-core/jvm/src/CommonPool.kt | 2 + .../jvm/src/DefaultExecutor.kt | 3 +- kotlinx-coroutines-core/jvm/src/Executors.kt | 22 ++-- .../jvm/src/internal/MainDispatchers.kt | 9 +- .../jvm/src/scheduling/Dispatcher.kt | 8 +- .../jvm/src/test_/TestCoroutineContext.kt | 2 +- .../jvm/test/RejectedExecutionTest.kt | 119 ++++++++++++++++++ .../native/src/CoroutineContext.kt | 4 +- .../api/kotlinx-coroutines-test.api | 2 +- .../src/TestCoroutineDispatcher.kt | 2 +- .../src/internal/MainTestDispatcher.kt | 4 +- .../api/kotlinx-coroutines-reactor.api | 2 +- .../src/Scheduler.kt | 2 +- .../api/kotlinx-coroutines-rx2.api | 2 +- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 2 +- .../api/kotlinx-coroutines-rx3.api | 2 +- .../kotlinx-coroutines-rx3/src/RxScheduler.kt | 2 +- .../api/kotlinx-coroutines-android.api | 2 +- .../src/HandlerDispatcher.kt | 2 +- .../api/kotlinx-coroutines-javafx.api | 2 +- .../src/JavaFxDispatcher.kt | 2 +- .../api/kotlinx-coroutines-swing.api | 2 +- .../src/SwingDispatcher.kt | 2 +- 28 files changed, 175 insertions(+), 42 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 36cbdb6960..85b9c5b0e2 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -257,13 +257,13 @@ public final class kotlinx/coroutines/Deferred$DefaultImpls { public abstract interface class kotlinx/coroutines/Delay { public abstract fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public abstract fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public abstract fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } public final class kotlinx/coroutines/Delay$DefaultImpls { public static fun delay (Lkotlinx/coroutines/Delay;JLkotlin/coroutines/Continuation;)Ljava/lang/Object; - public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public static fun invokeOnTimeout (Lkotlinx/coroutines/Delay;JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; } public final class kotlinx/coroutines/DelayKt { diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index ab80912269..70d94cc05f 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -54,8 +54,8 @@ public interface Delay { * * This implementation uses a built-in single-threaded scheduled executor service. */ - public fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = - DefaultDelay.invokeOnTimeout(timeMillis, block) + public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + DefaultDelay.invokeOnTimeout(timeMillis, block, context) } /** diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index c8e4455c92..103d236937 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -114,7 +114,7 @@ private fun setupTimeout( // schedule cancellation of this coroutine on time val cont = coroutine.uCont val context = cont.context - coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine)) + coroutine.disposeOnCompletion(context.delay.invokeOnTimeout(coroutine.time, coroutine, coroutine.context)) // restart the block using a new coroutine with a new job, // however, start it undispatched, because we already are in the proper context return coroutine.startUndispatchedOrReturnIgnoreTimeout(coroutine, block) diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index e744a0c724..628d6f7aa5 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -655,7 +655,7 @@ internal class SelectBuilderImpl( if (trySelect()) block.startCoroutineCancellable(completion) // shall be cancellable while waits for dispatch } - disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action)) + disposeOnSelect(context.delay.invokeOnTimeout(timeMillis, action, context)) } private class DisposeNode( diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 9b257d933e..a607a4722a 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -50,7 +50,7 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD @ExperimentalCoroutinesApi override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val task = TimedTask(block, currentTime + timeMillis) heap += task return task diff --git a/kotlinx-coroutines-core/jvm/src/CommonPool.kt b/kotlinx-coroutines-core/jvm/src/CommonPool.kt index 60f30cfe14..2203313120 100644 --- a/kotlinx-coroutines-core/jvm/src/CommonPool.kt +++ b/kotlinx-coroutines-core/jvm/src/CommonPool.kt @@ -103,6 +103,8 @@ internal object CommonPool : ExecutorCoroutineDispatcher() { (pool ?: getOrCreatePoolSync()).execute(wrapTask(block)) } catch (e: RejectedExecutionException) { unTrackTask() + // CommonPool only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.enqueue(block) } } diff --git a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt index ed84f55e74..787cbf9c44 100644 --- a/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt +++ b/kotlinx-coroutines-core/jvm/src/DefaultExecutor.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines import java.util.concurrent.* +import kotlin.coroutines.* internal actual val DefaultDelay: Delay = DefaultExecutor @@ -54,7 +55,7 @@ internal actual object DefaultExecutor : EventLoopImplBase(), Runnable { * Livelock is possible only if `runBlocking` is called on internal default executed (which is used by default [delay]), * but it's not exposed as public API. */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduleInvokeOnTimeout(timeMillis, block) override fun run() { diff --git a/kotlinx-coroutines-core/jvm/src/Executors.kt b/kotlinx-coroutines-core/jvm/src/Executors.kt index 8c36bfa14f..840cc46b2b 100644 --- a/kotlinx-coroutines-core/jvm/src/Executors.kt +++ b/kotlinx-coroutines-core/jvm/src/Executors.kt @@ -82,6 +82,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa executor.execute(wrapTask(block)) } catch (e: RejectedExecutionException) { unTrackTask() + cancelJobOnRejection(context, e) DefaultExecutor.enqueue(block) } } @@ -93,7 +94,7 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa */ override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { val future = if (removesFutureOnCancellation) { - scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS) + scheduleBlock(ResumeUndispatchedRunnable(this, continuation), continuation.context, timeMillis) } else { null } @@ -106,24 +107,31 @@ internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispa DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val future = if (removesFutureOnCancellation) { - scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS) + scheduleBlock(block, context, timeMillis) } else { null } - - return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block) + return when { + future != null -> DisposableFutureHandle(future) + else -> DefaultExecutor.invokeOnTimeout(timeMillis, block, context) + } } - private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? { + private fun scheduleBlock(block: Runnable, context: CoroutineContext, timeMillis: Long): ScheduledFuture<*>? { return try { - (executor as? ScheduledExecutorService)?.schedule(block, time, unit) + (executor as? ScheduledExecutorService)?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) } catch (e: RejectedExecutionException) { + cancelJobOnRejection(context, e) null } } + private fun cancelJobOnRejection(context: CoroutineContext, exception: RejectedExecutionException) { + context[Job]?.cancel(CancellationException("The task was rejected", exception)) + } + override fun close() { (executor as? ExecutorService)?.shutdown() } diff --git a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt index ddfcdbb142..5b2b9ff68c 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/MainDispatchers.kt @@ -87,17 +87,14 @@ private class MissingMainCoroutineDispatcher( override val immediate: MainCoroutineDispatcher get() = this - override fun isDispatchNeeded(context: CoroutineContext): Boolean { + override fun isDispatchNeeded(context: CoroutineContext): Boolean = missing() - } - override suspend fun delay(time: Long) { + override suspend fun delay(time: Long) = missing() - } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = missing() - } override fun dispatch(context: CoroutineContext, block: Runnable) = missing() diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt index e0890eff66..202c6e1d06 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt @@ -65,6 +65,8 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block) } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatch(context, block) } @@ -72,6 +74,8 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block, tailDispatch = true) } catch (e: RejectedExecutionException) { + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. DefaultExecutor.dispatchYield(context, block) } @@ -110,7 +114,9 @@ public open class ExperimentalCoroutineDispatcher( try { coroutineScheduler.dispatch(block, context, tailDispatch) } catch (e: RejectedExecutionException) { - // Context shouldn't be lost here to properly invoke before/after task + // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved + // for testing purposes, so we don't have to worry about cancelling the affected Job here. + // TaskContext shouldn't be lost here to properly invoke before/after task DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context)) } } diff --git a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt index e7c8b6b671..649c95375d 100644 --- a/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt +++ b/kotlinx-coroutines-core/jvm/src/test_/TestCoroutineContext.kt @@ -230,7 +230,7 @@ public class TestCoroutineContext(private val name: String? = null) : CoroutineC }, timeMillis) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val node = postDelayed(block, timeMillis) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt new file mode 100644 index 0000000000..27eb2afeb1 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -0,0 +1,119 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class RejectedExecutionTest : TestBase() { + private val threadName = "RejectedExecutionTest" + private val executor = RejectingExecutor() + + @After + fun tearDown() { + executor.shutdown() + executor.awaitTermination(10, TimeUnit.SECONDS) + } + + @Test + fun testRejectOnLaunch() = runTest { + expect(1) + val job = launch(executor.asCoroutineDispatcher()) { + expectUnreached() + } + assertEquals(1, executor.submittedTasks) + assertTrue(job.isCancelled) + finish(2) + } + + @Test + fun testRejectOnLaunchAtomic() = runTest { + expect(1) + val job = launch(executor.asCoroutineDispatcher(), start = CoroutineStart.ATOMIC) { + expect(2) + assertEquals(true, coroutineContext[Job]?.isCancelled) + assertNotSame(threadName, Thread.currentThread().name) // should have got dispatched on the DefaultExecutor + } + assertEquals(1, executor.submittedTasks) + job.join() + finish(3) + } + + @Test + fun testRejectOnWithContext() = runTest { + expect(1) + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expectUnreached() + } + } + assertEquals(1, executor.submittedTasks) + finish(2) + } + + @Test + fun testRejectOnResumeInContext() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + withContext(Dispatchers.Default) { + expect(3) + } + // cancelled on resume back + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(4) + } + + @Test + fun testRejectOnDelay() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + delay(10) // cancelled + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(3) + } + + @Test + fun testRejectWithTimeout() = runTest { + expect(1) + executor.acceptTasks = 1 // accept one task + assertFailsWith { + withContext(executor.asCoroutineDispatcher()) { + expect(2) + withTimeout(1000) { + expect(3) // atomic entry into the block (legacy behavior, it seem to be Ok with way) + assertEquals(true, coroutineContext[Job]?.isCancelled) // but the job is already cancelled + } + expectUnreached() + } + } + assertEquals(2, executor.submittedTasks) + finish(4) + } + + private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) { + var acceptTasks = 0 + var submittedTasks = 0 + + override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { + submittedTasks++ + if (submittedTasks > acceptTasks) throw RejectedExecutionException() + return super.schedule(command, delay, unit) + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/native/src/CoroutineContext.kt b/kotlinx-coroutines-core/native/src/CoroutineContext.kt index bcc7f48963..4ec1289ee7 100644 --- a/kotlinx-coroutines-core/native/src/CoroutineContext.kt +++ b/kotlinx-coroutines-core/native/src/CoroutineContext.kt @@ -16,8 +16,8 @@ internal actual object DefaultExecutor : CoroutineDispatcher(), Delay { takeEventLoop().dispatch(context, block) override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) = takeEventLoop().scheduleResumeAfterDelay(timeMillis, continuation) - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = - takeEventLoop().invokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = + takeEventLoop().invokeOnTimeout(timeMillis, block, context) actual fun enqueue(task: Runnable): Unit = loopWasShutDown() } diff --git a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api index e3b1f73e4f..c99ec5cbf1 100644 --- a/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api +++ b/kotlinx-coroutines-test/api/kotlinx-coroutines-test.api @@ -25,7 +25,7 @@ public final class kotlinx/coroutines/test/TestCoroutineDispatcher : kotlinx/cor public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun dispatchYield (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V public fun getCurrentTime ()J - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun pauseDispatcher ()V public fun pauseDispatcher (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun resumeDispatcher ()V diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index 4706d627ef..cad2636f97 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -65,7 +65,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val node = postDelayed(block, timeMillis) return object : DisposableHandle { override fun dispose() { diff --git a/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt b/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt index c18e4108bb..baa1aa5fd2 100644 --- a/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt +++ b/kotlinx-coroutines-test/src/internal/MainTestDispatcher.kt @@ -46,8 +46,8 @@ internal class TestMainDispatcher(private val mainFactory: MainDispatcherFactory delay.delay(time) } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { - return delay.invokeOnTimeout(timeMillis, block) + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { + return delay.invokeOnTimeout(timeMillis, block, context) } public fun setDispatcher(dispatcher: CoroutineDispatcher) { diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 3b5c6b9522..b46fe338e5 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -49,7 +49,7 @@ public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kot public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lreactor/core/scheduler/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt index e176c07bb9..4fb5514322 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Scheduler.kt @@ -39,7 +39,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle = + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle = scheduler.schedule(block, timeMillis, TimeUnit.MILLISECONDS).asDisposableHandle() /** @suppress */ diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 06ddb68e9c..84c337446c 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -80,7 +80,7 @@ public final class kotlinx/coroutines/rx2/SchedulerCoroutineDispatcher : kotlinx public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 3ddb67649e..9952eb91a0 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 4f15eda7d4..5ab6d7336d 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -67,7 +67,7 @@ public final class kotlinx/coroutines/rx3/SchedulerCoroutineDispatcher : kotlinx public fun equals (Ljava/lang/Object;)Z public final fun getScheduler ()Lio/reactivex/rxjava3/core/Scheduler; public fun hashCode ()I - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V public fun toString ()Ljava/lang/String; } diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 6e91aeea85..a426aea6ba 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -38,7 +38,7 @@ public class SchedulerCoroutineDispatcher( } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val disposable = scheduler.scheduleDirect(block, timeMillis, TimeUnit.MILLISECONDS) return DisposableHandle { disposable.dispose() } } diff --git a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api index b97d8462c3..090c14e09c 100644 --- a/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api +++ b/ui/kotlinx-coroutines-android/api/kotlinx-coroutines-android.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/android/HandlerDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getImmediate ()Lkotlinx/coroutines/android/HandlerDispatcher; - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; } public final class kotlinx/coroutines/android/HandlerDispatcherKt { diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt index 1693409875..af79da7c97 100644 --- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt +++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt @@ -140,7 +140,7 @@ internal class HandlerContext private constructor( continuation.invokeOnCancellation { handler.removeCallbacks(block) } } - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) return object : DisposableHandle { override fun dispose() { diff --git a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api index 620e904612..e2c3b8f326 100644 --- a/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api +++ b/ui/kotlinx-coroutines-javafx/api/kotlinx-coroutines-javafx.api @@ -5,7 +5,7 @@ public final class kotlinx/coroutines/javafx/JavaFxConvertKt { public abstract class kotlinx/coroutines/javafx/JavaFxDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } diff --git a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt index a13a68368e..c3069d636f 100644 --- a/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt +++ b/ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt @@ -42,7 +42,7 @@ public sealed class JavaFxDispatcher : MainCoroutineDispatcher(), Delay { } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val timeline = schedule(timeMillis, TimeUnit.MILLISECONDS, EventHandler { block.run() }) diff --git a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api index 09556e807f..d33191fd96 100644 --- a/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api +++ b/ui/kotlinx-coroutines-swing/api/kotlinx-coroutines-swing.api @@ -1,7 +1,7 @@ public abstract class kotlinx/coroutines/swing/SwingDispatcher : kotlinx/coroutines/MainCoroutineDispatcher, kotlinx/coroutines/Delay { public fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun dispatch (Lkotlin/coroutines/CoroutineContext;Ljava/lang/Runnable;)V - public fun invokeOnTimeout (JLjava/lang/Runnable;)Lkotlinx/coroutines/DisposableHandle; + public fun invokeOnTimeout (JLjava/lang/Runnable;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/DisposableHandle; public fun scheduleResumeAfterDelay (JLkotlinx/coroutines/CancellableContinuation;)V } diff --git a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt index 77f109df91..054ed1f60e 100644 --- a/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt +++ b/ui/kotlinx-coroutines-swing/src/SwingDispatcher.kt @@ -36,7 +36,7 @@ public sealed class SwingDispatcher : MainCoroutineDispatcher(), Delay { } /** @suppress */ - override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle { + override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { val timer = schedule(timeMillis, TimeUnit.MILLISECONDS, ActionListener { block.run() })