Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChildCancelledException when using Rx Observable.asFlow() extension within combine() #2104

Closed
mhernand40 opened this issue Jun 24, 2020 · 6 comments
Assignees
Labels

Comments

@mhernand40
Copy link

I apologize but I do not have a reduced repro for this at the moment. I am encountering a race condition that occurs in an Android app I am working on where we are slowly migrating from RxJava 2 to Coroutines. Essentially we have an upstream MutableStateFlow that we debounce and then combine with another MutableStateFlow and emit a Pair. From there, we apply flatMapLatest which involves a downstream Flow that is a combine against several Flows where some of these Flows are Rx Observables that are converted to Flows using the Observable.asFlow() extension, which btw, is still experimental.

The logic/pseudocode is as follows:

searchQueryFlow // MutableStateFlow
    .debounce(SEARCH_QUERY_DEBOUNCE_IN_MILLIS)
    .combine(isLoadingFlow /* MutableStateFlow */) { query, isLoading ->
        Pair(query, isLoading)
    }
    .flatMapLatest { (searchQuery, isLoading) ->
        combine(
            rxObservable1.asFlow().flowOn(Dispatchers.IO),
            rxObservable2.asFlow().flowOn(Dispatchers.IO),
            anActualFlow,
            rxObservable3.asFlow().flowOn(Dispatchers.IO)
        ) {
            ... // Process and build a list of items
        }.flowOn(Dispatchers.Default)
    }

Here is the stack trace I am seeing:

2020-06-23 18:23:11.885 ********: Undeliverable RxJava exception
    kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
2020-06-23 18:23:11.965 28425-28425/******* E/TimberLogger: error observing match list items provider
    java.lang.NullPointerException: Actually not, but can't throw other exceptions due to RS
        at io.reactivex.Observable.subscribe(Observable.java:12276)
        at io.reactivex.internal.operators.observable.ObservableConcatMap.subscribeActual(ObservableConcatMap.java:55)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest$LatestCoordinator.subscribe(ObservableCombineLatest.java:117)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest.subscribeActual(ObservableCombineLatest.java:71)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged.subscribeActual(ObservableDistinctUntilChanged.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest$LatestCoordinator.subscribe(ObservableCombineLatest.java:117)
        at io.reactivex.internal.operators.observable.ObservableCombineLatest.subscribeActual(ObservableCombineLatest.java:71)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged.subscribeActual(ObservableDistinctUntilChanged.java:35)
        at io.reactivex.Observable.subscribe(Observable.java:12267)
        at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invokeSuspend(RxConvert.kt:102)
        at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invoke(Unknown Source:10)
        at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(Builders.kt:323)
        at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(Unknown Source:0)
        at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(Builders.kt:336)
        at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:53)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:56)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:738)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
     Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
@mhernand40
Copy link
Author

As a mitigation, I've switched from using Observable.asFlow() to Observable.toFlowable(BackPressureStrategy.LATEST).asFlow() which ultimately ends up using the stable Publisher.asFlow() extension instead. I am unable to reproduce this stack trace with this approach.

@damianw
Copy link

damianw commented Aug 26, 2020

We ran into this as well... if I understand the problem correctly, it's more general than just ChildCancelledException as a number of internal coroutines exceptions seem to get accidentally propagated to the Observable which ends up in RxJava's global error handler.

I have created a custom asFlow using @mhernand40's suggestion above (although changing to use BackpressureStrategy.BUFFER) and added a test to our codebase something like the following:

import com.example.rx2.asFlow // via Publisher
import kotlinx.coroutines.rx2.asFlow as _asFlow

class ToFlowConversionsTest {

    private val uncaughtRxExceptions = CopyOnWriteArrayList<Throwable>()

    // Purposefully do not customize any schedulers - the tests will use Thread.sleep.
    // I can't seem to reproduce the bug any other way.
    @[JvmField Rule] val rxJavaPluginsRule = RxJavaPluginsRule(
        errorHandler = { uncaughtRxExceptions += it }
    )

    @[JvmField Rule] val coroutineRule = TestCoroutineRule {
        TestProvidedCoroutineScope(context = TestCoroutineExceptionHandler())
    }

    @Test
    fun testThatWorkaroundFixesTheProblem() {
        doTest { asFlow() }
        assertEquals<List<Throwable>>(emptyList(), uncaughtRxExceptions)
    }

    @Test
    fun testThatBuiltInObservableAsFlowIsStillBroken() {
        doTest { _asFlow() }
        val failureMessage = uncaughtRxExceptions.toString()
        // Sometimes there can be more than one!
        assertNotEquals(0, uncaughtRxExceptions.size, failureMessage)
        val expectedPrefix = "kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled"
        for (thrown in uncaughtRxExceptions) {
            assertTrue(thrown.toString().startsWith(expectedPrefix), failureMessage)
        }
    }

    private fun doTest(convert: Observable<Long>.() -> Flow<Long>) {
        val observable = Observable.interval(1L, TimeUnit.MILLISECONDS)
        val flow = observable.convert()
        val items = mutableListOf<Long>()
        val job = flow
            .onEach { items += it }
            .launchIn(coroutineRule)
        Thread.sleep(20L)
        coroutineRule.advanceUntilIdle()
        job.cancel()
    }
}

Although I was initially getting pretty good results from testThatBuiltInObservableAsFlowIsStillBroken, it turned out that it was flakier than I initially thought and ended up disabling it. YMMV if that test passes or fails when run multiple times.

Regardless, it seems like none of these exceptions should be propagating to Rx's global error handling.

@mhernand40
Copy link
Author

Seeing another crash with a slightly different stack trace that seems related:

Fatal Exception: java.lang.NullPointerException: subscribeActual failed
       at io.reactivex.Maybe.subscribe(Maybe.java:4295)
       at io.reactivex.internal.operators.maybe.MaybeToSingle.subscribeActual(MaybeToSingle.java:46)
       at io.reactivex.Single.subscribe(Single.java:3666)
       at io.reactivex.internal.operators.single.SingleToObservable.subscribeActual(SingleToObservable.java:35)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver.drain(ObservableConcatMap.java:475)
       at io.reactivex.internal.operators.observable.ObservableConcatMap$ConcatMapDelayErrorObserver.onSubscribe(ObservableConcatMap.java:330)
       at io.reactivex.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:31)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableConcatMap.subscribeActual(ObservableConcatMap.java:55)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invokeSuspend(RxConvertKt.java:102)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invoke(RxConvertKt.java:10)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(ChannelFlowBuilder.java:328)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(ChannelFlowBuilder.java)
       at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(CallbackFlowBuilder.java:341)
       at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.java:53)
       at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(BaseContinuationImpl.java:33)
       at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.java:56)
       at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.java:571)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.java:738)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.java:678)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.java:665)
Caused by kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled

Unfortunately, given the depth of this stack trace, it is difficult to trace it down to the root source.

@mhernand40
Copy link
Author

Here is another one:

Fatal Exception: java.lang.NullPointerException: Actually not, but can't throw other exceptions due to RS
       at io.reactivex.Observable.subscribe(Observable.java:12293)
       at io.reactivex.internal.operators.observable.ObservableRepeatWhen$RepeatWhenObserver.subscribeNext(ObservableRepeatWhen.java:151)
       at io.reactivex.internal.operators.observable.ObservableRepeatWhen.subscribeActual(ObservableRepeatWhen.java:60)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged.subscribeActual(ObservableDistinctUntilChanged.java:35)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at io.reactivex.internal.operators.observable.ObservableMap.subscribeActual(ObservableMap.java:32)
       at io.reactivex.Observable.subscribe(Observable.java:12284)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invokeSuspend(RxConvertKt.java:102)
       at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1.invoke(RxConvertKt.java:10)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo$suspendImpl(ChannelFlowBuilder.java:328)
       at kotlinx.coroutines.flow.ChannelFlowBuilder.collectTo(ChannelFlowBuilder.java)
       at kotlinx.coroutines.flow.CallbackFlowBuilder.collectTo(CallbackFlowBuilder.java:341)
       at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.java:53)
       at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(BaseContinuationImpl.java:33)
       at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.java:56)
       at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.java:571)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.java:738)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.java:678)
       at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.java:665)
Caused by kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled

This concerns me because RxJava 2 is used heavily in our code base and recently we have encouraged developers to prefer Coroutines instead. However the Rx2 <-> Coroutines interop is a must in order for us to move forward with Coroutines for new code.

@drinkthestars
Copy link

drinkthestars commented Oct 10, 2020

On a similar trace, turning on kotlinx.coroutines.DEBUG_PROPERTY_NAME yielded some more details:

Caused by: kotlinx.coroutines.flow.internal.ChildCancelledException: Child of the scoped flow was cancelled
 	(Coroutine boundary)
 	at kotlinx.coroutines.channels.AbstractSendChannel.offer(AbstractChannel.kt:156)
 	at kotlinx.coroutines.channels.ChannelCoroutine.offer(Unknown Source:2)
 	at kotlinx.coroutines.channels.ChannelsKt__ChannelsKt.sendBlocking(Channels.kt:21)
 	at kotlinx.coroutines.channels.ChannelsKt.sendBlocking(Unknown Source:1)
 	at kotlinx.coroutines.rx2.RxConvertKt$asFlow$1$observer$1.onNext(RxConvert.kt:98)

Where AbstractSendChannel.offer(AbstractChannel.kt:156) looks to be:

result is Closed<*> -> throw recoverStackTrace(helpCloseAndGetSendException(result))

This might suggest that the underlying issue is actually about offering to a closed Channel #974. Looks like it either might already have been or will be fixed in 1.4+.

@drinkthestars
Copy link

drinkthestars commented Oct 14, 2020

The documentation in callbackFlow demonstrates a try/catch surrounding a sendBlocking:

* override fun onNextValue(value: T) {
* // To avoid blocking you can configure channel capacity using
* // either buffer(Channel.CONFLATED) or buffer(Channel.UNLIMITED) to avoid overfill
* try {
* sendBlocking(value)
* } catch (e: Exception) {
* // Handle exception from the channel: failure in flow or premature closing
* }
* }

Looks like the implementation of ObservableSource.asFlow() violates that:

@ExperimentalCoroutinesApi
public fun <T: Any> ObservableSource<T>.asFlow(): Flow<T> = callbackFlow {
val disposableRef = AtomicReference<Disposable>()
val observer = object : Observer<T> {
override fun onComplete() { close() }
override fun onSubscribe(d: Disposable) { if (!disposableRef.compareAndSet(null, d)) d.dispose() }
override fun onNext(t: T) { sendBlocking(t) }
override fun onError(e: Throwable) { close(e) }
}
subscribe(observer)
awaitClose { disposableRef.getAndSet(Disposables.disposed())?.dispose() }
}

Within the callbackFlow, the awaitClose that disposes the Disposable only runs after the resulting Flow collection is cancelled. If there is an onNext happening before awaitClose can run, then it will try to call sendBlocking on a closed Channel.

The reason Publisher.asFlow() is stable in this regard is because:

  • The Publisher pattern involves a request + onNext cadence for sending items
  • The ReactiveSubscriber that gets created for this extension uses a regular Channel internally, and it is closed only on Subscriber.onComplete/Subscriber.onError

So, this is related to both #1826, and #974.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants