Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ObservableSource.asFlow() extension should at least surround sendBlocking() call with try/catch block #2299

Closed
mhernand40 opened this issue Oct 14, 2020 · 1 comment

Comments

@mhernand40
Copy link

I work out of an Android code base that is mostly Rx-based. However, we have started writing new code using Coroutines. The kotlinx-coroutines-rx2 library is crucial for interfacing between the two libraries. However, ObservableSource.asFlow() has proven to be dangerous and has introduced crashes for us in production.

Please see #2104. I have shared several stack traces in that issue we have been seeing. The root cause seems to correspond with #974. However, it seems that effort is going to take some careful redesign. In the meantime, can we consider wrapping the call to sendBlocking() with a try/catch block? I'm thinking the catch block can specifically catch CancellationException or the appropriate subtypes, e.g., JobCancellationException and ChildCancelledException to avoid catching all Exceptions.

@drinkthestars
Copy link

This issue would also exist in kotlinx-coroutines-rx3/RxConvert because of the same callbackFlow + sendBlocking structure:

@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) { sendBlocking(t) }
override fun onError(e: Throwable) { close(e) }
}
subscribe(observer)
awaitClose { disposableRef.getAndSet(Disposable.disposed())?.dispose() }
}

These implementations don't adhere to the callbackFlow documentation example that surrounds try/catch w/ sendBlocking:

* override fun onNextValue(value: T) {
* // To avoid blocking you can configure channel capacity using
* // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
* try {
* sendBlocking(value)
* } catch (e: Exception) {
* // Handle exception from the channel: failure in flow or premature closing
* }
* }

If only to avoid repeating code with try/catch in both places, we might even need something like this (insp.):

fun <E> SendChannel<E>.safeSendBlocking/sendBlockingCatching(value: E) {
    return runCatching { sendBlocking(value) }
}

// OR
fun <E> SendChannel<E>.safeSendBlocking/sendBlockingCatching(value: E) {
    return try {
        sendBlocking(value)
    } catch (ex: ClosedSendChannelException) { // or other exceptions
        // ...
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants