[DRAFT] Rx Observable as Flow #1768

Closed

Conversation

@mareklangiewicz (Contributor, Author)

No description provided.

@elizarov (Contributor)

How is this conversion supposed to deal with backpressure, which Rx Observable does not support?

@mareklangiewicz (Contributor, Author)

The user can select a backpressure strategy with Flow.buffer(...)
(as in ObservableAsFlowTest.testProduce).
I just use callbackFlow, so backpressure is handled in exactly the same way
as described in the callbackFlow KDoc and its example.
Our Observable is just a typical callbackBasedApi from that KDoc:
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/callback-flow.html

By the way, this ObservableSource.asFlow operator is really useful in the typical Android case, where the very popular RxBinding library (https://github.com/JakeWharton/RxBinding) lets us observe all user actions (clicks, etc.) as Observable streams. In that case, dropping some user actions when the app is busy can be desirable, so we can write button.clicks().asFlow().buffer(0) without any Flowable involved, which is big, complicated (Rx backpressure strategies), and unnecessary in such cases.
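
Roughly, the callbackFlow-based conversion described above looks like this (a sketch, not the PR's actual code: asFlowSketch is an illustrative name, offer matches the channel API used in this thread, and the Disposable handling is deliberately naive, a point that comes up later in the review):

```kotlin
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun <T : Any> ObservableSource<T>.asFlowSketch(): Flow<T> = callbackFlow {
    var disposable: Disposable? = null
    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) { disposable = d }
        // Forward each Rx event into the flow's channel; offer drops the event
        // when the channel buffer is full (the behavior questioned below).
        override fun onNext(t: T) { offer(t) }
        override fun onError(e: Throwable) { close(e) }
        override fun onComplete() { close() }
    })
    awaitClose { disposable?.dispose() }
}
```

Downstream operators such as buffer(0) or conflate then decide what happens between this channel and the collector.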

@elizarov (Contributor) commented Jan 22, 2020

I'm worried that the default behavior is the same as what callbackFlow provides: a buffer that starts dropping events when it is full. This might be totally unexpected to a typical Rx user, who might assume that, by default, events are never dropped. So, consider the following test with Rx:

assertEquals(10_000L, Observable.range(1, 10_000).count().blockingGet())

It passes.

Now, let's use asFlow() from this PR:

assertEquals(10_000, runBlocking { Observable.range(1, 10_000).asFlow().count() })

It fails. Not good.

@JakeWharton (Contributor)

Rx itself forces you to pick the strategy at the point of conversion when going from Observable to Flowable. Perhaps the same technique could be employed here.
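
To picture that suggestion (hypothetical API shape, reusing the asFlowSketch helper from the sketch above; neither name is part of this PR):

```kotlin
import io.reactivex.Observable
import io.reactivex.ObservableSource
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer

// Hypothetical: force the caller to state the channel capacity at the conversion point,
// mirroring how Rx forces a BackpressureStrategy in Observable.toFlowable(...).
fun <T : Any> ObservableSource<T>.asFlowWithCapacity(capacity: Int): Flow<T> =
    asFlowSketch().buffer(capacity)

// Usage: the buffering decision is visible at the call site.
val numbers: Flow<Int> = Observable.range(1, 10_000).asFlowWithCapacity(Channel.UNLIMITED)
```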

@mareklangiewicz (Contributor, Author)

@elizarov @JakeWharton
I thought about adding a mandatory buffer-capacity parameter, but decided to propose the version without it, to be consistent with other operators (and Flow builders) that use a Channel: they promote adjusting Channel properties by applying separate operators to the already created Flow (for example flowOn, conflate, buffer, produceIn, etc.).

But it's true that in this case the default buffer size can be confusing, especially if the user just writes myObservable.asFlow() without even realizing that there is a Channel involved.
So I just committed a new version with a mandatory explicit buffer size and a more explicit name: asChannelFlow (and I marked it as ExperimentalCoroutinesApi).
Please let me know what you think.

Side note about the Observable.range(1, 10_000).asFlow().count() example:
the Observable here is pretty special in that it is totally synchronous and emits all 10_000 items immediately, so we can say it runs in the "context" of the collector. Usually Observable sources are asynchronous and can use their own "contexts" (schedulers/threads) to emit items, and our ObservableSource.asFlow has to work correctly in such cases too.
Were it not for that, I could probably implement an Observable.asFlow that would work with this special example and would not use any Channel in between
(if I knew how to call the suspendable emit inside the non-suspendable onNext, knowing that the emit lambda in the count operator does not actually suspend). ;-)

@elizarov (Contributor) commented Jan 23, 2020

There is a solution for Observable.range(1, 10_000).asFlow().count().

To understand it, you need to understand how observables manage the situation of a fast producer and a slow consumer. An Rx observable does not have any kind of explicit asynchronous backpressure control, but it has implicit "backpressure control" via blocking. A slow observable subscriber takes a long time in its onNext calls and thus does not let the observable emit all its events "immediately". That is why Observable.range(1, 10_000).subscribe { slowComputation(it) } works and does not lose any emitted items. The producing observable is automatically throttled by a slow subscriber due to the synchronous nature of the observable pipeline.

You should leverage this mechanism by replacing fun onNext(t: T) { offer(t) } in your code with fun onNext(t: T) { sendBlocking(t) }. This makes the tests I've given work out of the box without any surprises and will ensure that, by default, no messages are lost when you integrate any kind of event-emitting observable with a flow.

Moreover, since callbackFlow is buffered by default, it will automatically provide a cushion for cases when a small number of events are slow to consume. The source observable will get blocked only when multiple events are emitted in a row and the consumer is consistently slow. If that is not what users desire, they can always solve it with additional operators like Flow.conflate() or Flow.buffer(UNLIMITED) that ensure never-blocking behavior (at the cost of conflating back-to-back events or a potential OOM due to unlimited buffering).

P.S. Please add corresponding tests for no loss of events.
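
A minimal sketch of the sendBlocking variant described above, using the same illustrative shape as the earlier sketch (again with naive Disposable handling):

```kotlin
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun <T : Any> ObservableSource<T>.asFlowBlockingSketch(): Flow<T> = callbackFlow {
    var disposable: Disposable? = null
    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) { disposable = d }
        // Block the emitting thread when the channel buffer is full instead of dropping,
        // which throttles a fast synchronous source just like a slow Rx subscriber would.
        override fun onNext(t: T) { sendBlocking(t) }
        override fun onError(e: Throwable) { close(e) }
        override fun onComplete() { close() }
    })
    awaitClose { disposable?.dispose() }
}
```

With this change, the Observable.range(1, 10_000) count test above passes, because the synchronous source is throttled whenever the flow's channel buffer fills up.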

@mareklangiewicz (Contributor, Author)

I added another proposal (in the last commit above) for opt-in dropping behavior.
@elizarov I'm happy to revert it if it's too convoluted for users.
Rationale:
The sendBlocking solution is great (correct and never drops events),
but there is at least one use case where we need different behavior:
when our Observable represents UI events like clicks, touches, swipe-to-refresh, etc., we usually don't want to buffer any of these actions (we don't want delayed execution) and we don't want to block the UI thread either. We'd like to write something like:

val clickS = refreshButton.clicks().asFlow(dropWhenFull = true).buffer(0)
// ...
clickS.collect { doSomeSuspendingIORefresh() }

I don't see any other simple solution for users (like adding operators on the Rx side or the Flow side) that would produce this behavior (no blocking and no buffering).
Flow.conflate() does not help because it still buffers one item and will trigger another, delayed doSomeSuspendingIORefresh() if the user clicks refreshButton during the refresh.

Another solution I can think of is to implement a new type of Channel, similar to RENDEZVOUS, except that it would always return true from offer ("pretending" to accept the item) and would simply drop items when no receiver is waiting.
But I'm sure that solution would be much worse than the first one.

@mareklangiewicz (Contributor, Author)

By the way, it's a fun exercise to analyze the interactions between three different "communication protocols" (Rx Observables / Channels / Flows) in such a small piece of code. :-)

@elizarov (Contributor)

Unfortunately, asFlow(dropWhenFull = true).buffer(0) would not work reliably. The reason is that buffer(0) uses a rendezvous channel, which can only send/receive when both sides are ready. When you call offer on a rendezvous channel, it might lose events for totally trivial reasons. For example, if the receiving side has not started yet, you might see messages lost seemingly at random on application startup.

Moreover, if we have this problem with UI events, then we will have it not only in Observable.asFlow. What if events are streamed directly into a flow? So this problem should be solved with a separate flow operator. It is not right to provide it only as a part of Observable.asFlow.

Now, let's discuss the use case in a bit more detail:

"When our Observable represents UI events like clicks, touches, swipe-to-refresh, etc., we usually don't want to buffer any of these actions (we don't want delayed execution) and we don't want to block the UI thread either."

It looks to me like we already have a solution for it: Observable.asFlow().conflate() gives no blocking and no buffering. Only the most recent event is intermittently held until the processing of the previous one finishes, which makes it quite reliable and impervious to small hiccups. What do you think?
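
For the refresh-button example, the conflate-based approach would look roughly like this (bindRefresh and its parameters are illustrative; clicks would come from something like refreshButton.clicks().asFlow()):

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.launch

fun CoroutineScope.bindRefresh(clicks: Flow<Unit>, refresh: suspend () -> Unit) = launch {
    // conflate() keeps at most the newest click while refresh() is still running:
    // the UI thread is never blocked and stale clicks do not queue up.
    clicks.conflate().collect { refresh() }
}
```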

@mareklangiewicz (Contributor, Author) commented Jan 26, 2020

OK, so I removed the dropWhenFull parameter.
I don't think the example of losing messages at application startup is right (when the receiving side has not started yet), because the whole chain should be cold, so it is the receiving side that starts the whole machinery.
But I agree that .conflate() is a good solution for almost all cases, and if it's not enough then we should have a new operator (instead of a dropWhenFull flag for the Observable conversion only).
By the way, users can always use callbackFlow directly if they have special needs, so maybe it's not worth adding too many operators for uncommon cases to kotlinx.coroutines itself.

@elizarov (Contributor) left a review comment

Also, please squash this PR to a single commit and rebase it onto the latest develop branch.

reactive/kotlinx-coroutines-rx2/src/RxConvert.kt (review comment, outdated):
atomicfu on disposable is probably needed to make it 100% correct.
@mareklangiewicz changed the title from "Rx Observable as Flow" to "WIP: Rx Observable as Flow" on Jan 30, 2020.
@mareklangiewicz (Contributor, Author)

I'm sorry for adding more complexity to this PR, but the more I analyze potential solutions, the less confident I become. :-)
Generally, I try to avoid dropping down to low-level Java concurrency and try to leverage Kotlin utilities instead.

I added a few propositions in the last commit:
- asFlow_AtomicFu: an attempt to use AtomicRef
- asFlow_ReentrantLock: just adds locking to the solution from the previous commit
- asFlow_JobIsActive: an attempt to leverage Job and Channel thread safety
- asFlow_Mutex: uses a Mutex to wait for the source's onSubscribe call
- asFlow_Deferred: uses a CompletableDeferred to wait for the disposable from the onSubscribe call

I like the last version because it is more sequential/simple and high-level (relying on Deferred); a rough sketch follows below.
There are more details in the comments in the code.

(I will clean up, squash, and rebase once a solution is accepted.)
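
A rough sketch of the Deferred-based idea from the list above (illustrative only, using the same naive callbackFlow shape as the earlier sketches):

```kotlin
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun <T : Any> ObservableSource<T>.asFlowDeferredSketch(): Flow<T> = callbackFlow {
    // Wait (suspend) until the source hands over its Disposable, so that the
    // awaitClose block can dispose it without racing against onSubscribe.
    val disposableDeferred = CompletableDeferred<Disposable>()
    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) { disposableDeferred.complete(d) }
        override fun onNext(t: T) { sendBlocking(t) }
        override fun onError(e: Throwable) { close(e) }
        override fun onComplete() { close() }
    })
    val disposable = disposableDeferred.await()
    awaitClose { disposable.dispose() }
}
```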

@elizarov (Contributor)

All lock-based solutions and all solutions that wait (by any means) are potentially prone to deadlocks and should not be used. Here we have a good case where you do not need to invent anything new, because a sufficiently good solution already exists and is proven to work inside Rx itself. Just use java.util.concurrent.AtomicReference to write a wait-free implementation.

P.S. You cannot use atomicfu directly, because it is not supported in lambdas, only in classes. Of course, you could define a class to contain your atomic field, but there's no point in doing that. Since you are writing JVM-only code, you can directly use the JVM's AtomicReference class and just implement the same logic as Rx is using.
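
A minimal sketch of what an AtomicReference-based, wait-free version might look like (illustrative: the dispose-on-race handling only approximates the logic Rx uses internally, and the code that eventually landed may differ):

```kotlin
import io.reactivex.ObservableSource
import io.reactivex.Observer
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.Disposables
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.sendBlocking
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Terminal marker: once the reference holds DISPOSED, any Disposable that arrives
// later (from onSubscribe) is disposed immediately instead of being stored.
private val DISPOSED: Disposable = Disposables.disposed()

fun <T : Any> ObservableSource<T>.asFlowAtomicSketch(): Flow<T> = callbackFlow {
    val ref = AtomicReference<Disposable?>(null)
    subscribe(object : Observer<T> {
        override fun onSubscribe(d: Disposable) {
            // If awaitClose already ran, the slot holds DISPOSED and the CAS fails.
            if (!ref.compareAndSet(null, d)) d.dispose()
        }
        override fun onNext(t: T) { sendBlocking(t) }
        override fun onError(e: Throwable) { close(e) }
        override fun onComplete() { close() }
    })
    awaitClose {
        // Wait-free: no locks, no suspension; whatever Disposable is stored gets disposed.
        ref.getAndSet(DISPOSED)?.dispose()
    }
}
```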

@elizarov (Contributor)

I don't think you can actually use DisposableHelper, since it is a class that is internal to Rx and can change at any moment without warning, even in a minor update. You should copy the corresponding logic instead.

@mareklangiewicz changed the title from "WIP: Rx Observable as Flow" to "[DRAFT] Rx Observable as Flow" on Feb 23, 2020.
@mareklangiewicz (Contributor, Author)

Cleaned-up version of this PR:
#1826

@elizarov (Contributor) commented Mar 4, 2020

I'm closing this one, as it is superseded by #1826

@elizarov closed this on Mar 4, 2020.
qwwdfsad pushed and added several commits that referenced this pull request on Mar 4, 2020.
@mareklangiewicz deleted the rx-observable-as-flow branch on March 8, 2020.