Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consistently handle exceptions in reactive streams #2646

Merged
merged 7 commits into from Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions reactive/kotlinx-coroutines-jdk9/test/FlowAsPublisherTest.kt
Expand Up @@ -57,7 +57,7 @@ class FlowAsPublisherTest : TestBase() {
private lateinit var subscription: JFlow.Subscription

override fun onComplete() {
expect(3)
expectUnreached()
}

override fun onSubscribe(s: JFlow.Subscription?) {
Expand All @@ -74,7 +74,7 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
finish(4)
finish(3)
}

@Test
Expand Down
27 changes: 11 additions & 16 deletions reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
Expand Up @@ -13,6 +13,7 @@ import kotlinx.coroutines.intrinsics.*
import org.reactivestreams.*
import java.util.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*

/**
* Transforms the given reactive [Publisher] into [Flow].
Expand Down Expand Up @@ -204,17 +205,12 @@ public class FlowSubscription<T>(
}

private suspend fun flowProcessing() {
val consumeSucceeded = try {
try {
consumeFlow()
true
} catch (cause: Throwable) {
/* TODO: The part after "||" is a hack needed due to [cause] having travelled over a coroutine boundary and
being changed from the result of [getCancellationException()]. */
if (cancellationRequested && !isActive && (cause === getCancellationException() || cause.cause === getCancellationException() && cause.message == getCancellationException().message)) {
/* TODO: This is incorrect, as [Subscriber.onComplete] denotes the end of the stream and not just any
non-erroneous terminal state. */
subscriber.onComplete()
} else {
@Suppress("INVISIBLE_MEMBER")
val unwrappedCause = unwrap(cause)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#2551 filed a use-case

if (!cancellationRequested || isActive || unwrappedCause !== getCancellationException()) {
try {
subscriber.onError(cause)
} catch (e: Throwable) {
Expand All @@ -223,14 +219,13 @@ public class FlowSubscription<T>(
handleCoroutineException(coroutineContext, cause)
}
}
false
return
}
if (consumeSucceeded) {
try {
subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(coroutineContext, e)
}
// We only call this if `consumeFlow()` finished successfully
try {
subscriber.onComplete()
} catch (e: Throwable) {
handleCoroutineException(coroutineContext, e)
}
}

Expand Down
Expand Up @@ -57,7 +57,7 @@ class FlowAsPublisherTest : TestBase() {
private lateinit var subscription: Subscription

override fun onComplete() {
expect(3)
expectUnreached()
}

override fun onSubscribe(s: Subscription?) {
Expand All @@ -74,7 +74,7 @@ class FlowAsPublisherTest : TestBase() {
expectUnreached()
}
})
finish(4)
finish(3)
}

@Test
Expand Down
Expand Up @@ -65,7 +65,6 @@ class PublisherRequestStressTest : TestBase() {
fun testRequestStress() {
val expectedValue = AtomicLong(0)
val requestedTill = AtomicLong(0)
val completionLatch = CountDownLatch(1)
val callingOnNext = AtomicInteger()

val publisher = mtFlow().asPublisher()
Expand All @@ -75,7 +74,7 @@ class PublisherRequestStressTest : TestBase() {
private var demand = 0L // only updated from reqPool

override fun onComplete() {
completionLatch.countDown()
expectUnreached()
}

override fun onSubscribe(sub: Subscription) {
Expand Down Expand Up @@ -124,7 +123,9 @@ class PublisherRequestStressTest : TestBase() {
}
if (!error) {
subscription.cancel()
completionLatch.await()
runBlocking {
(subscription as AbstractCoroutine<*>).join()
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion reactive/kotlinx-coroutines-reactor/src/Mono.kt
Expand Up @@ -13,6 +13,7 @@ import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.*
import kotlinx.coroutines.internal.*

/**
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
Expand Down Expand Up @@ -59,7 +60,8 @@ private class MonoCoroutine<in T>(

override fun onCancelled(cause: Throwable, handled: Boolean) {
/** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
if (getCancellationException() !== cause || !disposed) {
val unwrappedCause = unwrap(cause)
if (getCancellationException() !== unwrappedCause || !disposed) {
try {
/** If [sink] turns out to already be in a terminal state, this exception will be passed through the
* [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */
Expand Down
9 changes: 6 additions & 3 deletions reactive/kotlinx-coroutines-rx2/src/RxObservable.kt
Expand Up @@ -9,6 +9,7 @@ import io.reactivex.exceptions.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import java.lang.RuntimeException
Expand Down Expand Up @@ -149,18 +150,20 @@ private class RxObservableCoroutine<T : Any>(
if (_signal.value == SIGNALLED)
return
_signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
if (cause == null) {
@Suppress("INVISIBLE_MEMBER")
val unwrappedCause = cause?.let { unwrap(it) }
if (unwrappedCause == null) {
try {
subscriber.onComplete()
} catch (e: Exception) {
handleUndeliverableException(e, context)
}
} else if (cause is UndeliverableException && !handled) {
} else if (unwrappedCause is UndeliverableException && !handled) {
/** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
* exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
* cancelled. */
handleUndeliverableException(cause, context)
} else if (cause !== getCancellationException() || !subscriber.isDisposed) {
} else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
try {
/** If the subscriber is already in a terminal state, the error will be signalled to
* `RxJavaPlugins.onError`. */
Expand Down
9 changes: 6 additions & 3 deletions reactive/kotlinx-coroutines-rx3/src/RxObservable.kt
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import kotlinx.coroutines.sync.*
import kotlin.coroutines.*
import kotlinx.coroutines.internal.*

/**
* Creates cold [observable][Observable] that will run a given [block] in a coroutine.
Expand Down Expand Up @@ -148,18 +149,20 @@ private class RxObservableCoroutine<T : Any>(
if (_signal.value == SIGNALLED)
return
_signal.value = SIGNALLED // we'll signal onError/onCompleted (that the final state -- no CAS needed)
if (cause == null) {
@Suppress("INVISIBLE_MEMBER")
val unwrappedCause = cause?.let { unwrap(it) }
if (unwrappedCause == null) {
try {
subscriber.onComplete()
} catch (e: Exception) {
handleUndeliverableException(e, context)
}
} else if (cause is UndeliverableException && !handled) {
} else if (unwrappedCause is UndeliverableException && !handled) {
/** Such exceptions are not reported to `onError`, as, according to the reactive specifications,
* exceptions thrown from the Subscriber methods must be treated as if the Subscriber was already
* cancelled. */
handleUndeliverableException(cause, context)
} else if (cause !== getCancellationException() || !subscriber.isDisposed) {
} else if (unwrappedCause !== getCancellationException() || !subscriber.isDisposed) {
try {
/** If the subscriber is already in a terminal state, the error will be signalled to
* `RxJavaPlugins.onError`. */
Expand Down