combine(Iterable<Flow>) is very slow #2296

Closed
fluidsonic opened this issue Oct 11, 2020 · 12 comments
fluidsonic commented Oct 11, 2020

The combine(…) function becomes very slow depending on the number of Flows.

Flat-mapping a 40k List<List<…>> (one element per list) takes approx. 11 ms.
Flat-mapping a 40k List<Flow<…>> (one element per Flow) takes approx. 1 minute.
And that's with a very simple, non-suspending test case.

If combine is expected to have such low performance, it should be documented; otherwise developers like me treat it as a simple Flow-version of List<List<…>>.flatten().
If it's not expected then there is a bug.

My use case

I have ~40k objects in memory. For each object there is a Flow and a coroutine that periodically refreshes the object from the server when it's expired and emits that. At some point I have to continuously merge the latest version of all 40k objects into a single list for further processing. For that I use combine(listOfObjectFlows) { it.toList() }.

Unfortunately that takes somewhere north of 15 minutes already. I have no time to let it finish to see the total…

I've written my own implementation now as a workaround.

Test code

import kotlin.time.*
import kotlinx.coroutines.flow.*

@OptIn(ExperimentalTime::class)
suspend fun main() {
    val flowList = (1..40_000).map { flowOf(it) }
    val start = TimeSource.Monotonic.markNow()
    val listFlow = combine(flowList) { it.toList() }

    listFlow.collect {
        println("Took: ${start.elapsedNow()}") // Took: 66.5s
    }
}
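
For reference, the synchronous baseline from the numbers above can be reproduced with plain list flattening (a minimal sketch; the 40_000 size matches the report, the timing code is illustrative):

```kotlin
// Baseline: flatten a 40k List<List<Int>> with one element per inner list,
// as described in the report. Plain synchronous code, no coroutines involved.
fun main() {
    val nested: List<List<Int>> = (1..40_000).map { listOf(it) }
    val start = System.nanoTime()
    val flat = nested.flatten()
    val elapsedMs = (System.nanoTime() - start) / 1_000_000
    println("Flattened ${flat.size} elements in $elapsedMs ms")
}
```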
@qwwdfsad qwwdfsad self-assigned this Oct 12, 2020
qwwdfsad added a commit that referenced this issue Oct 12, 2020
qwwdfsad (Member) commented Oct 12, 2020

Thanks for the report!
We indeed have some room for improvement, e.g. with simple tweaking I've achieved a ±40% throughput improvement, and there are opportunities for even bigger gains (SPSC channels, select replacements, etc.).

If combine is expected to have such a low performance it should be documented, otherwise developers like me treat is as a simple Flow-version of List<List<…>>.flatten().

Could you please elaborate on what exactly you would like to see in the documentation?
Any operation over a stream of flows is, by design, slower than its collection-based/non-suspending/sequential counterpart, because all the flows have to be collected asynchronously (-> at least one coroutine/channel per flow), and we do not know whether an arbitrary instance of Flow is suspending or not.
The slowdown factor is operator-, usage-, GC- and JVM-specific, so it's impossible to pinpoint real numbers either.

I've written my own implementation now as a workaround.

It's okay if it suits you, but please beware that it doesn't match the semantics of our combine operator. For example, the following snippet:

val flow = flowOf("a", "b", "c")
val flow2 = flowOf("1", "2", "3")
println(flow.combine(flow2) { i, j -> i + j }.toList())
println(listOf(flow, flow2).yourCombine { list -> list[0] + list[1] }.toList())

will print

[a1, b1, b2, c2, c3]
[c1, c2, c3]

which probably is not expected.

fluidsonic (Author) commented Oct 12, 2020

@qwwdfsad true. I'm aware that Flows are slower than Lists in general, and I think and hope that most developers are. These 40k objects of mine spawn several coroutines each, so I probably have several hundred thousand coroutines. Yet all operations finish in a matter of seconds to milliseconds.

In this specific case I was looking for a Flow-version of List<List<…>>.flatten(). While for 40k elements it's quite fast with List, it's somewhere in the realm of 30+ minutes with combine (each Flow emitted exactly one value). That's way more than just asynchronous overhead. And that leads to your next point.

My implementation does indeed yield different results. That is in part because the current documentation is not clear on how exactly combine combines the two Flows.

Returns a Flow whose values are generated with transform function by combining the most recently emitted values by each flow.

My implementation also combines the most recently emitted values of each flow. It just happens to ignore intermediary values where newer ones are available. That's okay for me because I'm only interested in the most recent combination and not in intermediary results.

The fact that combine has to do some heavy lifting in order to account for all intermediary values is something I'd also mention in the docs. Being somewhat slower because it's asynchronous is very different from being an O(something high) operation because it has to fulfill some special requirements. At least that wasn't clear to me.

In addition to improving the documentation, maybe it makes sense to add a version like mine as an alternative operator. One that is super fast because it explicitly only cares about the most recent state and skips intermediate values when newer ones are available. It can probably be optimized even further than my simple implementation.

When I was looking for a solution to turn my List<Flow<…>> into a Flow<List<…>>, I scanned the documentation of all operators up and down multiple times for a suitable one. combine seemed like the only fit. With two options available I would have looked closely at the difference between them.
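
A latest-only operator of the kind proposed above could be sketched roughly like this (a hypothetical implementation, not the issue author's actual code; the name combineLatest, the Mutex-guarded snapshot array, and the conflation strategy are all assumptions):

```kotlin
import kotlinx.coroutines.launch
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

// Hypothetical latest-only combine: keeps the most recent value of every flow
// in a shared snapshot and emits the snapshot whenever any flow produces a
// value, once every flow has emitted at least once. The trailing conflate()
// drops intermediate snapshots a slow collector would never see anyway.
fun <T> Iterable<Flow<T>>.combineLatest(): Flow<List<T>> = channelFlow {
    val flows = toList()
    val latest = MutableList<T?>(flows.size) { null }
    var remaining = flows.size // flows that have not emitted yet
    val mutex = Mutex()
    flows.forEachIndexed { index, flow ->
        launch {
            var first = true
            flow.collect { value ->
                mutex.withLock {
                    latest[index] = value
                    if (first) { first = false; remaining-- }
                    if (remaining == 0) {
                        @Suppress("UNCHECKED_CAST")
                        send(latest.toList() as List<T>)
                    }
                }
            }
        }
    }
}.conflate()
```

Unlike combine, this makes no fairness guarantees and deliberately skips intermediary combinations, which is exactly the difference discussed in this thread.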

fvasco (Contributor) commented Oct 12, 2020

Hi, @qwwdfsad,
please consider that the output of

import kotlinx.coroutines.flow.*

suspend fun main() {
    val flow = flowOf("a", "b", "c")
    val flow2 = flowOf("1", "2", "3")
    println(flow.combine(flow2) { i, j -> i + j }.toList())
}

on my AMD A8-3870 can be

[a3, b3, c3]
[c1, c2, c3]
[a1, a2, a3, b3, c3]
[a1, b1, c1, c2, c3]

hrach (Contributor) commented Oct 12, 2020

@qwwdfsad I was writing exactly the same as @fvasco. The same applies to the online playground: https://pl.kotl.in/3qubtJaOh

qwwdfsad (Member) commented:

Yes, with suspend fun main there is no dispatcher, so the multithreaded Dispatchers.Default is used and adds non-determinism.
I was implying the deterministic case where combine is called from within runBlocking or on Dispatchers.Main.

fvasco (Contributor) commented Oct 12, 2020

So [c1, c2, c3] is an expected result; all suspending functions should be dispatcher-agnostic.

qwwdfsad (Member) commented:

The expected result should be [a1, b1, b2, c2, c3] in a single-threaded environment.
In fact, it tries to somehow imitate "fairness", giving each flow an opportunity to emit its values even in the single-thread dispatcher. For multithreaded dispatchers, it's obviously not the case and is timing-dependent.

Regarding the proposed operator, we cannot accept it for multiple reasons:

  1. If it's named combine, then suddenly various overloads will behave differently for the same flows.
    It's not the best idea to have f1.combine(f2) and listOf(f1, f2).combine producing different results in the same environment.
  2. If it's named differently, then we end up in a situation where we have "very similar but slightly different operators and it's not clear which one to choose".

The only resolution for this issue is to improve the performance of the combine operator without changing its behaviour or semantics.
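
The single-threaded case described above can be reproduced by wrapping fvasco's snippet in runBlocking (a sketch; per the comment above, the output should then be the fair interleaving rather than a timing-dependent one):

```kotlin
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.flow.*

// Running inside runBlocking pins everything to a single-threaded event loop,
// removing the scheduling non-determinism of Dispatchers.Default.
fun main() = runBlocking {
    val flow = flowOf("a", "b", "c")
    val flow2 = flowOf("1", "2", "3")
    println(flow.combine(flow2) { i, j -> i + j }.toList())
}
```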

fluidsonic (Author) commented Oct 13, 2020

Great, that explanation should be part of the documentation 👍

I guess it's not possible to optimize the multi-threaded case by making it fast instead of fair?

Regarding my problem I don't see how optimizing combine would help here anyway.

  • Even with a 40% gain the timing would still be a different ballpark than what I expect (minutes instead of seconds).
  • A lot of unnecessary intermediary combinations would be created that I never actually need. I'd drop many of them immediately afterwards, potentially using debounce() or expensive operators like distinctUntilChanged() or stateIn() (due to calling equals for large data sets).

Regarding a new operator:

  1. I've only named it combine because it was unclear what the current implementation does. It clearly doesn't make sense to use that name if it does something different.
  2. Regarding a different operator name there are two properties to the kind of operator that I propose:
    • It combines multiple flows into one using a transform.
    • It only cares about the latest combination and not about intermediary combinations.

So what I need would be perfectly described by a combineLatest operator. It combines just like combine but doesn't need to be fair because it's only interested in the latest value.

There's a deprecation in place for combineLatest, which maps to combine. I don't know the history of that operator or how it was defined, but at least by its name it would fit.

fvasco (Contributor) commented Oct 13, 2020

Hi @fluidsonic,
is f1.combineLatest(f2) equal to f1.conflate().combine(f2.conflate())?

qwwdfsad added a commit that referenced this issue Oct 13, 2020
fluidsonic (Author) commented Oct 13, 2020

I had to change my implementation to use collectLatest instead of collect to avoid more intermediary values.

@fvasco I ran a quick test with 5k Flows of 2 values each. All Flows are static, i.e. flowOf(Int, Int). No expensive operations.
conflate() is somewhat beneficial only if there is a high number of Flows.

Using Dispatchers.Default:

248ms - flows.combineLatest() { it }                          // number of emitted values is low but varies
3.67s - combine(flows.map { it.conflate() }) { it.toList() }  // number of emitted values is low but varies
2.38s - combine(flows) { it.toList() }                        // number of emitted values is low but varies

Using newFixedThreadPoolContext(1, "test"):

362ms - flows.combineLatest()                                 // emits single value
11.0s - combine(flows.map { it.conflate() }) { it.toList() }  // emits all intermediary values
10.9s - combine(flows) { it.toList() }                        // emits all intermediary values

Just for fun - Dispatchers.Default with 50k Flows (my app has about 40k):

886ms - flows.combineLatest()
283s  - combine(flows.map { it.conflate() }) { it.toList() }  // oh my
648s  - combine(flows) { it.toList() }                        // oh my

qwwdfsad added a commit that referenced this issue Oct 15, 2020
    * Get rid of two code paths
    * Get rid of accidental O(N^2) where N is the number of flows
    * Get rid of select that hits performance hard

Fixes #2296
qwwdfsad (Member) commented:

Even with a 40% gain the timing would still be a different ballpark than what I expect (minutes instead of seconds).

The problem was pretty simple: an accidental and well-hidden O(N^2) asymptotic where N is the number of flows.
I've fixed it; the fix will be available in 1.4.0 and will be even faster (by a significant margin) than the proposed implementation.
In general, for a large number of flows, combine became faster by orders of magnitude, and for two flows (the basic case) it became precisely two times faster.

Thanks for pointing it out!

fluidsonic (Author) commented:

Awesome and thanks @qwwdfsad. I'll check it out.
