Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

java.lang.NullPointerException: subscribeActual failed #27

Closed
koral-- opened this issue Sep 28, 2018 · 6 comments
Closed

java.lang.NullPointerException: subscribeActual failed #27

koral-- opened this issue Sep 28, 2018 · 6 comments

Comments

@koral--
Copy link

koral-- commented Sep 28, 2018

Not sure if this a bug in scarlet, in RxJava or just unsupported use case.
Repro steps unknown it is anonymous crash report.

Scarlet usage related to cancelation looks like this:

job = launch {
        fooService.receiveFoo().consumeEach { foo -> //...
  }
}

job may be canceled at anytime.

Scarlet version: 0.1.4
Stacktrace:

java.lang.NullPointerException: subscribeActual failed
       at io.reactivex.Maybe.empty(SourceFile:3732)
       at io.reactivex.internal.operators.maybe.MaybeMap.subscribeActual(SourceFile:40)
       at io.reactivex.Maybe.empty(SourceFile:3727)
       at io.reactivex.internal.operators.maybe.MaybeFilter.subscribeActual(SourceFile:39)
       at io.reactivex.Maybe.empty(SourceFile:3727)
       at io.reactivex.internal.operators.maybe.MaybeMap.subscribeActual(SourceFile:40)
       at io.reactivex.Maybe.empty(SourceFile:3727)
       at io.reactivex.internal.operators.maybe.MaybeFilter.subscribeActual(SourceFile:39)
       at io.reactivex.Maybe.empty(SourceFile:3727)
       at io.reactivex.internal.operators.maybe.MaybeMap.subscribeActual(SourceFile:40)
       at io.reactivex.Maybe.empty(SourceFile:3727)
       at io.reactivex.internal.operators.flowable.FlowableFlatMapMaybe$FlatMapMaybeSubscriber.getOrCreateQueue(SourceFile:132)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnSubscriber.runAsync(SourceFile:400)
       at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(SourceFile:176)
       at io.reactivex.internal.schedulers.ScheduledRunnable.run(SourceFile:61)
       at io.reactivex.internal.schedulers.ScheduledRunnable.call(SourceFile:52)
       at java.util.concurrent.FutureTask.run(FutureTask.java:237)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
       at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
       at java.lang.Thread.run(Thread.java:818)

Caused by kotlinx.coroutines.experimental.JobCancellationException: Child job was cancelled because of parent failure


Caused by kotlinx.coroutines.experimental.JobCancellationException: Job was cancelled normally
@mitsinsar
Copy link

I get the same exception sometimes. Have you found the solution?

@koral--
Copy link
Author

koral-- commented Sep 9, 2019

Unfortunately, no.

@lambdatamer
Copy link

lambdatamer commented Dec 30, 2019

I get the same exception sometimes. Have you found the solution?

Unfortunately, no.

Try this
FlowStreamAdapter.kt

class FlowStreamAdapter<T> : StreamAdapter<T, Flow<T>> {
    override fun adapt(stream: Stream<T>): Flow<T> {
        var subscription: Subscription? = null
        return callbackFlow {
            stream.subscribe(object : Subscriber<T> {
                override fun onSubscribe(s: Subscription?) {
                    subscription = s
                }

                override fun onNext(t: T) {
                    if (!isClosedForSend) offer(t)
                }

                override fun onComplete() {
                    close()
                }

                override fun onError(t: Throwable?) {
                    close(cause = t)
                }
            })
        }.onCompletion { subscription?.cancel() }
    }
}

CoroutinesFlowStreamAdapterFactory.kt

class CoroutinesFlowStreamAdapterFactory : StreamAdapter.Factory {
    override fun create(type: Type): StreamAdapter<Any, Any> {
        return when (type.getRawType()) {
            Flow::class.java -> FlowStreamAdapter()
            else -> throw IllegalArgumentException()
        }
    }
}

Add this stream adapter factory to Scarlet builder and replace ReceiveChannel<T> to Flow<T> in your services

@mhernand40
Copy link
Contributor

@koral-- in your code snippet, I'm going to assume that fooService.receiveFoo() returns an Rx stream? Is this an Observable or a Flowable? I am curious if this is related to Kotlin/kotlinx.coroutines#2104?

@koral--
Copy link
Author

koral-- commented Oct 8, 2020

AFAIR Flowable, as stracktrace suggests: io.reactivex.internal.operators.flowable.FlowableFlatMapMaybe

@mhernand40
Copy link
Contributor

mhernand40 commented Oct 9, 2020

Ah yes, I should have looked at the stack trace more carefully. I believe that Kotlin/kotlinx.coroutines#2104 may not necessarily be specific to Observable, but is probably less likely to occur with Flowable depending on the back pressure strategy.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants