Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PublisherAsFlow ignores the CoroutineContext passed in from flowOn? #1765

Closed
bennyhuo opened this issue Jan 18, 2020 · 6 comments
Closed

PublisherAsFlow ignores the CoroutineContext passed in from flowOn? #1765

bennyhuo opened this issue Jan 18, 2020 · 6 comments
Assignees

Comments

@bennyhuo
Copy link

bennyhuo commented Jan 18, 2020

kotlin version: 1.3.61
kotlinx.coroutines version: 1.3.3

Hi, I found that PublisherAsFlow ignores the CoroutineContext when created. It always uses an EmptyCoroutineContext instead.

private class PublisherAsFlow<T : Any>(
    private val publisher: Publisher<T>,
    capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
    // Just ignore the context, use EmptyCoroutineContext for ChannelFlow
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
        PublisherAsFlow(publisher, capacity)
    ...
}

So if I call flowOn on it, nothing happens.

Flowable.create<Int>({
        ... // run on ui thread, io thread expected!!!
    }, ...)
    .asFlow()
    .flowOn(Dispatchers.IO) // will be ignored!!!
    .launchIn(mainScope)
@zach-klippenstein
Copy link
Contributor

zach-klippenstein commented Jan 18, 2020

I don't think it's possible for the flow dispatcher to be used to affect the thread that an Rx stream is subscribed on, at least not as a reasonable default. PublisherAsFlow just subscribes to a stream by forwarding all emissions to a channel. The only way to do what you're trying to do is to use Rx APIs to configure the scheduler.

If you really need to, you could write an operator that wraps asFlow and creates a Scheduler that delegates to the coroutine dispatcher, but that feels dangerously "magical" and it would be much simpler to just call subscribeOn explicitly.

@bennyhuo
Copy link
Author

bennyhuo commented Jan 18, 2020

I can use rx schedulers to switch threads. But if I get a flow from a API which converted from a Flowable inside, and just want to make it flowOn a specific dispatcher, it won't act as expected.

If it is forbidden to call on flowOn on a PublisherAsFlow instance to switch threads, just throw a IllegalStateException to make it clear.

@zach-klippenstein
Copy link
Contributor

How would the library detect that? The implementation of flowOn would have to check if the type of the upstream Flow is PublisherAsFlow, but that type is defined in a different module that the core library doesn't depend on. And even if you could check the type, it would only catch the simplest case where you call flowOn immediately after asFlow(), and would allow intermediate operators (e.g. .asFlow().map { it.toString() }.flowOn()).

That said, this isn't really a problem with the reactive integration – the same thing comes up if you call flowOn() twice in a row since the second one won't do anything.

I see your point though, it just seems like a really hard problem to solve in a general way. I would expect good libraries to allow you to configure the schedulers being used if it's hiding the Rx types from you.

A simple implementation of an operator that sets the subscription scheduler based on the context might look like this:

@UseExperimental(ExperimentalCoroutinesApi::class)
fun <T : Any> Flowable<T>.asFlowWithScheduler(): Flow<T> = flow {
  val flowable = this@asFlowWithScheduler
  val upstream = (coroutineContext[ContinuationInterceptor] as? CoroutineDispatcher)
      // RxJava analog to Unconfined is to just not specify a scheduler.
      ?.takeUnless { it == Dispatchers.Unconfined }
      ?.let { dispatcher -> Schedulers.from(dispatcher.asExecutor()) }
      ?.let { scheduler -> flowable.subscribeOn(scheduler) }
      ?: flowable
  emitAll(upstream.asFlow())
}

@bennyhuo
Copy link
Author

Rx stuffs are called in the upstream of PublisherAsFlow, so the change of the calling upsteam will make effects to the called Rx stuffs, right?

I just wonder why PublisherAsFlow ignores the context passed in from flowOn? If it accept that context, it can affect the upstream by changing the thread of subscribing the upstream.

But it just ignore the context and use EmptyCoroutineContext instead. See the source code here.

private class PublisherAsFlow<T : Any>(
    private val publisher: Publisher<T>,
    capacity: Int
) : ChannelFlow<T>(EmptyCoroutineContext, capacity) {
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
        PublisherAsFlow(publisher, capacity)
    ...
}

When we call flowOn, it will invoke create to simply create a new Flow instance without update the context. That's weird. It is really confusing to call flowOn on a PublisherAsFlow.

@bennyhuo
Copy link
Author

The solution seems to simply add a parameter context: CoroutineContext to PublisherAsFlow,

  1. use this context to initialize the super class ChannelFlow
  2. when create called, pass the context into the constructor of PublisherAsFlow
private class PublisherAsFlow<T : Any>(
    context: CoroutineContext, 
    private val publisher: Publisher<T>,
    capacity: Int
) : ChannelFlow<T>(context, capacity) {
    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
        PublisherAsFlow(context, publisher, capacity)
    ...
}

Maybe I could have a try later on.

@elizarov
Copy link
Contributor

Good catch. The fix would not so trivial, though. It is way more complicated than that.

As a work-around, you can use the following trick to convert Dispatchers.IO (or any other dispatcher for that matter) to Rx Scheduler: Schedulers.fromExecutor(Dispatchers.IO.asExecutor()), so then you can write your code in the following way:

Flowable.create<Int>({
        ... // runs on IO thread now!
    }, ...)
    .subsribeOn(Schedulers.fromExecutor(Dispatchers.IO.asExecutor())) // <--- !!!
    .asFlow()
    .launchIn(mainScope)

@elizarov elizarov self-assigned this Jan 20, 2020
elizarov added a commit that referenced this issue Jan 21, 2020
* When using asFlow().flowOn(...) context is now properly tracked and taken into account for both execution context of the reactive subscription and for injection into Reactor context.
* Publisher.asFlow slow-path implementation is simplified. It does not sure specialized openSubscription anymore, but always uses the same flow request logic.

Fixes #1765
elizarov added a commit that referenced this issue Jan 24, 2020
* When using asFlow().flowOn(...) context is now properly tracked and taken into account for both execution context of the reactive subscription and for injection into Reactor context.
* Publisher.asFlow slow-path implementation is simplified. It does not sure specialized openSubscription anymore, but always uses the same flow request logic.

Fixes #1765
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants