From 5cbd971be389aa0feb8f3f46425f3ab4d590d6f0 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 28 Jun 2021 11:59:01 +0300 Subject: [PATCH] =?UTF-8?q?Cancel=20dispatched=20coroutine=20on=20Dispatch?= =?UTF-8?q?ers.IO=20when=20the=20underlying=20Han=E2=80=A6=20(#2784)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cancel dispatched coroutine on Dispatchers.IO when the underlying Handler is closed in Handler.asCoroutineDispatcher() Fixes #2778 --- .../src/HandlerDispatcher.kt | 38 +++++++---- .../test/DisabledHandlerTest.kt | 64 +++++++++++++++++++ .../test/HandlerDispatcherTest.kt | 10 +-- 3 files changed, 96 insertions(+), 16 deletions(-) create mode 100644 ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt diff --git a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt index d693e2bc25..b1ec8047dd 100644 --- a/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt +++ b/ui/kotlinx-coroutines-android/src/HandlerDispatcher.kt @@ -7,8 +7,8 @@ package kotlinx.coroutines.android import android.os.* -import androidx.annotation.* import android.view.* +import androidx.annotation.* import kotlinx.coroutines.* import kotlinx.coroutines.internal.* import java.lang.reflect.* @@ -54,15 +54,22 @@ internal class AndroidDispatcherFactory : MainDispatcherFactory { override fun createDispatcher(allFactories: List) = HandlerContext(Looper.getMainLooper().asHandler(async = true)) - override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used" + override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used" override val loadPriority: Int get() = Int.MAX_VALUE / 2 } /** - * Represents an arbitrary [Handler] as a implementation of [CoroutineDispatcher] + * Represents an arbitrary [Handler] as an implementation of [CoroutineDispatcher] * with an optional [name] for nicer debugging + * + * ## Rejected execution + * + * If the underlying handler is closed and its message-scheduling methods start to return `false` on + * an attempt to submit a continuation task to the resulting dispatcher, + * then the [Job] of the affected task is [cancelled][Job.cancel] and the task is submitted to the + * [Dispatchers.IO], so that the affected coroutine can cleanup its resources and promptly complete. */ @JvmName("from") // this is for a nice Java API, see issue #255 @JvmOverloads @@ -129,24 +136,33 @@ internal class HandlerContext private constructor( } override fun dispatch(context: CoroutineContext, block: Runnable) { - handler.post(block) + if (!handler.post(block)) { + cancelOnRejection(context, block) + } } override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { val block = Runnable { with(continuation) { resumeUndispatched(Unit) } } - handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) - continuation.invokeOnCancellation { handler.removeCallbacks(block) } + if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { + continuation.invokeOnCancellation { handler.removeCallbacks(block) } + } else { + cancelOnRejection(continuation.context, block) + } } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { - handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY)) - return object : DisposableHandle { - override fun dispose() { - handler.removeCallbacks(block) - } + if (handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))) { + return DisposableHandle { handler.removeCallbacks(block) } } + cancelOnRejection(context, block) + return NonDisposableHandle + } + + private fun cancelOnRejection(context: CoroutineContext, block: Runnable) { + context.cancel(CancellationException("The task was rejected, the handler underlying the dispatcher '${toString()}' was closed")) + Dispatchers.IO.dispatch(context, block) } override fun toString(): String = toStringInternalImpl() ?: run { diff --git a/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt b/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt new file mode 100644 index 0000000000..a1f0a03d4a --- /dev/null +++ b/ui/kotlinx-coroutines-android/test/DisabledHandlerTest.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.android + +import android.os.* +import kotlinx.coroutines.* +import org.junit.* +import org.junit.runner.* +import org.robolectric.* +import org.robolectric.annotation.* + +@RunWith(RobolectricTestRunner::class) +@Config(manifest = Config.NONE, sdk = [28]) +class DisabledHandlerTest : TestBase() { + + private var delegateToSuper = false + private val disabledDispatcher = object : Handler() { + override fun sendMessageAtTime(msg: Message?, uptimeMillis: Long): Boolean { + if (delegateToSuper) return super.sendMessageAtTime(msg, uptimeMillis) + return false + } + }.asCoroutineDispatcher() + + @Test + fun testRunBlocking() { + expect(1) + try { + runBlocking(disabledDispatcher) { + expectUnreached() + } + expectUnreached() + } catch (e: CancellationException) { + finish(2) + } + } + + @Test + fun testInvokeOnCancellation() = runTest { + val job = launch(disabledDispatcher, start = CoroutineStart.LAZY) { expectUnreached() } + job.invokeOnCompletion { if (it != null) expect(2) } + yield() + expect(1) + job.join() + finish(3) + } + + @Test + fun testWithTimeout() = runTest { + delegateToSuper = true + try { + withContext(disabledDispatcher) { + expect(1) + delegateToSuper = false + delay(Long.MAX_VALUE - 1) + expectUnreached() + } + expectUnreached() + } catch (e: CancellationException) { + finish(2) + } + } +} diff --git a/ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt b/ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt index 55decde61b..5128a74caf 100644 --- a/ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt +++ b/ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt @@ -29,7 +29,7 @@ class HandlerDispatcherTest : TestBase() { fun mainIsAsync() = runTest { ReflectionHelpers.setStaticField(Build.VERSION::class.java, "SDK_INT", 28) - val mainLooper = ShadowLooper.getShadowMainLooper() + val mainLooper = shadowOf(Looper.getMainLooper()) mainLooper.pause() val mainMessageQueue = shadowOf(Looper.getMainLooper().queue) @@ -48,7 +48,7 @@ class HandlerDispatcherTest : TestBase() { val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher() - val mainLooper = ShadowLooper.getShadowMainLooper() + val mainLooper = shadowOf(Looper.getMainLooper()) mainLooper.pause() val mainMessageQueue = shadowOf(Looper.getMainLooper().queue) @@ -67,7 +67,7 @@ class HandlerDispatcherTest : TestBase() { val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher() - val mainLooper = ShadowLooper.getShadowMainLooper() + val mainLooper = shadowOf(Looper.getMainLooper()) mainLooper.pause() val mainMessageQueue = shadowOf(Looper.getMainLooper().queue) @@ -86,7 +86,7 @@ class HandlerDispatcherTest : TestBase() { val main = Looper.getMainLooper().asHandler(async = true).asCoroutineDispatcher() - val mainLooper = ShadowLooper.getShadowMainLooper() + val mainLooper = shadowOf(Looper.getMainLooper()) mainLooper.pause() val mainMessageQueue = shadowOf(Looper.getMainLooper().queue) @@ -105,7 +105,7 @@ class HandlerDispatcherTest : TestBase() { val main = Looper.getMainLooper().asHandler(async = false).asCoroutineDispatcher() - val mainLooper = ShadowLooper.getShadowMainLooper() + val mainLooper = shadowOf(Looper.getMainLooper()) mainLooper.pause() val mainMessageQueue = shadowOf(Looper.getMainLooper().queue)