From bb1c93a5c8d26c51ab4649afc8b78df74ddd77be Mon Sep 17 00:00:00 2001 From: Petrus Nguyen Thai Hoc Date: Tue, 23 Nov 2021 20:08:22 +0700 Subject: [PATCH] Do not propagate cancellation to the upstream in Flow flatMapFirst operators (Related to https://github.com/Kotlin/kotlinx.coroutines/pull/2964) --- build.gradle.kts | 4 +-- .../com/hoc081098/flowext/flatMapFirst.kt | 9 ++---- .../kotlin/com/hoc081098/flowext/neverFlow.kt | 3 ++ .../com/hoc081098/flowext/FlatMapFirstTest.kt | 25 +++++++++++++++++ .../hoc081098/flowext/WithLatestFromTest.kt | 28 +++++++++++++++++++ 5 files changed, 60 insertions(+), 9 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 0d2686e7..bcaba8ac 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ plugins { id("maven-publish") id("com.vanniktech.maven.publish") version "0.18.0" id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.8.0" - id("org.jetbrains.dokka") version "1.5.31" + id("org.jetbrains.dokka") version "1.6.0" } group = "io.github.hoc081098" @@ -23,7 +23,7 @@ repositories { gradlePluginPortal() } -val kotlinCoroutinesVersion = "1.5.2" +val kotlinCoroutinesVersion = "1.6.0-RC" val ktlintVersion = "0.43.0" kotlin { diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/flatMapFirst.kt b/src/commonMain/kotlin/com/hoc081098/flowext/flatMapFirst.kt index e0f5d473..29128163 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/flatMapFirst.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/flatMapFirst.kt @@ -1,9 +1,7 @@ package com.hoc081098.flowext -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.cancel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.channelFlow @@ -35,7 +33,6 @@ public fun Flow.flatMapFirst(transform: suspend (value: T) -> Flow) */ @ExperimentalCoroutinesApi public fun Flow>.flattenFirst(): Flow = channelFlow { - val outerScope = this val busy = AtomicBoolean(false) collect { inner -> @@ -43,11 +40,9 @@ public fun Flow>.flattenFirst(): Flow = channelFlow { // Do not pay for dispatch here, it's never necessary launch(start = CoroutineStart.UNDISPATCHED) { try { - inner.collect { outerScope.send(it) } + inner.collect { send(it) } + } finally { busy.value = false - } catch (e: CancellationException) { - // cancel outer scope on cancellation exception, too - outerScope.cancel(e) } } } diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/neverFlow.kt b/src/commonMain/kotlin/com/hoc081098/flowext/neverFlow.kt index d65a4bf9..09bb8ea6 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/neverFlow.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/neverFlow.kt @@ -6,4 +6,7 @@ import kotlinx.coroutines.flow.flow private object NeverFlow : Flow by (flow { awaitCancellation() }) +/** + * Returns a [Flow] that never emits any values. + */ public fun neverFlow(): Flow = NeverFlow diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/FlatMapFirstTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/FlatMapFirstTest.kt index 39f30791..9800fc6c 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/FlatMapFirstTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/FlatMapFirstTest.kt @@ -1,5 +1,6 @@ package com.hoc081098.flowext +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.delay @@ -110,4 +111,28 @@ class FlatMapFirstTest { assertIs(it[1].errorOrThrow()) } } + + @Test + fun testCancellation() = suspendTest { + flow { + repeat(5) { + emit( + flow { + if (it == 2) throw CancellationException("") + emit(1) + } + ) + } + } + .flattenFirst() + .test( + listOf( + Event.Value(1), + Event.Value(1), + Event.Value(1), + Event.Value(1), + Event.Complete, + ) + ) + } } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt index 88a88e83..ffe395be 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt @@ -1,9 +1,11 @@ package com.hoc081098.flowext +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.InternalCoroutinesApi import kotlinx.coroutines.delay import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.onEach @@ -75,4 +77,30 @@ class WithLatestFromTest { .collect() } } + + @Test + fun testWithLatestFrom_cancellation() = suspendTest { + assertFailsWith { + flow { + emit(1) + throw CancellationException("") + } + .withLatestFrom(emptyFlow()) + .collect() + } + + flowOf(1) + .withLatestFrom( + flow { + emit(2) + throw CancellationException("") + } + ) + .test( + listOf( + Event.Value(1 to 2), + Event.Complete + ) + ) + } }