Skip to content

Commit

Permalink
Do not propagate cancellation to the upstream in Flow flatMapFirst op…
Browse files Browse the repository at this point in the history
…erators

(Related to Kotlin/kotlinx.coroutines#2964)
  • Loading branch information
hoc081098 committed Nov 23, 2021
1 parent 079e612 commit bb1c93a
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 9 deletions.
4 changes: 2 additions & 2 deletions build.gradle.kts
Expand Up @@ -11,7 +11,7 @@ plugins {
id("maven-publish")
id("com.vanniktech.maven.publish") version "0.18.0"
id("org.jetbrains.kotlinx.binary-compatibility-validator") version "0.8.0"
id("org.jetbrains.dokka") version "1.5.31"
id("org.jetbrains.dokka") version "1.6.0"
}

group = "io.github.hoc081098"
Expand All @@ -23,7 +23,7 @@ repositories {
gradlePluginPortal()
}

val kotlinCoroutinesVersion = "1.5.2"
val kotlinCoroutinesVersion = "1.6.0-RC"
val ktlintVersion = "0.43.0"

kotlin {
Expand Down
9 changes: 2 additions & 7 deletions src/commonMain/kotlin/com/hoc081098/flowext/flatMapFirst.kt
@@ -1,9 +1,7 @@
package com.hoc081098.flowext

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
Expand Down Expand Up @@ -35,19 +33,16 @@ public fun <T, R> Flow<T>.flatMapFirst(transform: suspend (value: T) -> Flow<R>)
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<Flow<T>>.flattenFirst(): Flow<T> = channelFlow {
val outerScope = this
val busy = AtomicBoolean(false)

collect { inner ->
if (busy.compareAndSet(expect = false, update = true)) {
// Do not pay for dispatch here, it's never necessary
launch(start = CoroutineStart.UNDISPATCHED) {
try {
inner.collect { outerScope.send(it) }
inner.collect { send(it) }
} finally {
busy.value = false
} catch (e: CancellationException) {
// cancel outer scope on cancellation exception, too
outerScope.cancel(e)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/commonMain/kotlin/com/hoc081098/flowext/neverFlow.kt
Expand Up @@ -6,4 +6,7 @@ import kotlinx.coroutines.flow.flow

private object NeverFlow : Flow<Nothing> by (flow { awaitCancellation() })

/**
* Returns a [Flow] that never emits any values.
*/
public fun neverFlow(): Flow<Nothing> = NeverFlow
25 changes: 25 additions & 0 deletions src/commonTest/kotlin/com/hoc081098/flowext/FlatMapFirstTest.kt
@@ -1,5 +1,6 @@
package com.hoc081098.flowext

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -110,4 +111,28 @@ class FlatMapFirstTest {
assertIs<RuntimeException>(it[1].errorOrThrow())
}
}

@Test
fun testCancellation() = suspendTest {
flow {
repeat(5) {
emit(
flow {
if (it == 2) throw CancellationException("")
emit(1)
}
)
}
}
.flattenFirst()
.test(
listOf(
Event.Value(1),
Event.Value(1),
Event.Value(1),
Event.Value(1),
Event.Complete,
)
)
}
}
28 changes: 28 additions & 0 deletions src/commonTest/kotlin/com/hoc081098/flowext/WithLatestFromTest.kt
@@ -1,9 +1,11 @@
package com.hoc081098.flowext

import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.InternalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
Expand Down Expand Up @@ -75,4 +77,30 @@ class WithLatestFromTest {
.collect()
}
}

@Test
fun testWithLatestFrom_cancellation() = suspendTest {
assertFailsWith<CancellationException> {
flow {
emit(1)
throw CancellationException("")
}
.withLatestFrom(emptyFlow<Nothing>())
.collect()
}

flowOf(1)
.withLatestFrom(
flow {
emit(2)
throw CancellationException("")
}
)
.test(
listOf(
Event.Value(1 to 2),
Event.Complete
)
)
}
}

0 comments on commit bb1c93a

Please sign in to comment.