Skip to content

Commit

Permalink
Propagate exception from stateIn to the caller when the upstream fail…
Browse files Browse the repository at this point in the history
…ed to produce initial value (#2329)
  • Loading branch information
qwwdfsad committed Oct 22, 2020
1 parent 6843648 commit 3275d22
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
17 changes: 12 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Share.kt
Expand Up @@ -330,13 +330,20 @@ private fun <T> CoroutineScope.launchSharingDeferred(
result: CompletableDeferred<StateFlow<T>>
) {
launch(context) {
var state: MutableStateFlow<T>? = null
upstream.collect { value ->
state?.let { it.value = value } ?: run {
state = MutableStateFlow(value).also {
result.complete(it.asStateFlow())
try {
var state: MutableStateFlow<T>? = null
upstream.collect { value ->
state?.let { it.value = value } ?: run {
state = MutableStateFlow(value).also {
result.complete(it.asStateFlow())
}
}
}
} catch (e: Throwable) {
// Notify the waiter that the flow has failed
result.completeExceptionally(e)
// But still cancel the scope where state was (not) produced
throw e
}
}
}
Expand Down
25 changes: 18 additions & 7 deletions kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt
Expand Up @@ -30,21 +30,21 @@ class StateInTest : TestBase() {

@Test
fun testUpstreamCompletedNoInitialValue() =
testUpstreamCompletedOrFailedReset(failed = false, iv = false)
testUpstreamCompletedOrFailedReset(failed = false, withInitialValue = false)

@Test
fun testUpstreamFailedNoInitialValue() =
testUpstreamCompletedOrFailedReset(failed = true, iv = false)
testUpstreamCompletedOrFailedReset(failed = true, withInitialValue = false)

@Test
fun testUpstreamCompletedWithInitialValue() =
testUpstreamCompletedOrFailedReset(failed = false, iv = true)
testUpstreamCompletedOrFailedReset(failed = false, withInitialValue = true)

@Test
fun testUpstreamFailedWithInitialValue() =
testUpstreamCompletedOrFailedReset(failed = true, iv = true)
testUpstreamCompletedOrFailedReset(failed = true, withInitialValue = true)

private fun testUpstreamCompletedOrFailedReset(failed: Boolean, iv: Boolean) = runTest {
private fun testUpstreamCompletedOrFailedReset(failed: Boolean, withInitialValue: Boolean) = runTest {
val emitted = Job()
val terminate = Job()
val sharingJob = CompletableDeferred<Unit>()
Expand All @@ -56,7 +56,7 @@ class StateInTest : TestBase() {
}
val scope = this + sharingJob
val shared: StateFlow<String?>
if (iv) {
if (withInitialValue) {
shared = upstream.stateIn(scope, SharingStarted.Eagerly, null)
assertEquals(null, shared.value)
} else {
Expand All @@ -75,4 +75,15 @@ class StateInTest : TestBase() {
assertNull(sharingJob.getCompletionExceptionOrNull())
}
}
}

@Test
fun testUpstreamFailedIMmediatelyWithInitialValue() = runTest {
val ceh = CoroutineExceptionHandler { _, _ -> expect(2) }
val flow = flow<Int> {
expect(1)
throw TestException()
}
assertFailsWith<TestException> { flow.stateIn(CoroutineScope(ceh)) }
finish(3)
}
}

0 comments on commit 3275d22

Please sign in to comment.