diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index 4dd89ee4bf..6351d4a89b 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -330,13 +330,20 @@ private fun CoroutineScope.launchSharingDeferred( result: CompletableDeferred> ) { launch(context) { - var state: MutableStateFlow? = null - upstream.collect { value -> - state?.let { it.value = value } ?: run { - state = MutableStateFlow(value).also { - result.complete(it.asStateFlow()) + try { + var state: MutableStateFlow? = null + upstream.collect { value -> + state?.let { it.value = value } ?: run { + state = MutableStateFlow(value).also { + result.complete(it.asStateFlow()) + } } } + } catch (e: Throwable) { + // Notify the waiter that the flow has failed + result.completeExceptionally(e) + // But still cancel the scope where state was (not) produced + throw e } } } diff --git a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt index 2a613afaf7..c90626deb8 100644 --- a/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/sharing/StateInTest.kt @@ -30,21 +30,21 @@ class StateInTest : TestBase() { @Test fun testUpstreamCompletedNoInitialValue() = - testUpstreamCompletedOrFailedReset(failed = false, iv = false) + testUpstreamCompletedOrFailedReset(failed = false, withInitialValue = false) @Test fun testUpstreamFailedNoInitialValue() = - testUpstreamCompletedOrFailedReset(failed = true, iv = false) + testUpstreamCompletedOrFailedReset(failed = true, withInitialValue = false) @Test fun testUpstreamCompletedWithInitialValue() = - testUpstreamCompletedOrFailedReset(failed = false, iv = true) + testUpstreamCompletedOrFailedReset(failed = false, withInitialValue = true) @Test fun testUpstreamFailedWithInitialValue() = - testUpstreamCompletedOrFailedReset(failed = true, iv = true) + testUpstreamCompletedOrFailedReset(failed = true, withInitialValue = true) - private fun testUpstreamCompletedOrFailedReset(failed: Boolean, iv: Boolean) = runTest { + private fun testUpstreamCompletedOrFailedReset(failed: Boolean, withInitialValue: Boolean) = runTest { val emitted = Job() val terminate = Job() val sharingJob = CompletableDeferred() @@ -56,7 +56,7 @@ class StateInTest : TestBase() { } val scope = this + sharingJob val shared: StateFlow - if (iv) { + if (withInitialValue) { shared = upstream.stateIn(scope, SharingStarted.Eagerly, null) assertEquals(null, shared.value) } else { @@ -75,4 +75,15 @@ class StateInTest : TestBase() { assertNull(sharingJob.getCompletionExceptionOrNull()) } } -} \ No newline at end of file + + @Test + fun testUpstreamFailedIMmediatelyWithInitialValue() = runTest { + val ceh = CoroutineExceptionHandler { _, _ -> expect(2) } + val flow = flow { + expect(1) + throw TestException() + } + assertFailsWith { flow.stateIn(CoroutineScope(ceh)) } + finish(3) + } +}