Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel flow processing #1147

Open
altavir opened this issue Apr 26, 2019 · 50 comments
Open

Parallel flow processing #1147

altavir opened this issue Apr 26, 2019 · 50 comments

Comments

@altavir
Copy link

altavir commented Apr 26, 2019

It makes sense that Flow and all map operations are sequential by default, but it makes sense to allow some heavy operations to be performed in parallel on explicitly provided dispatcher.

I've implemented the feature based on existing flowOn implementation:

@FlowPreview
fun <T, R> Flow<T>.mapParallel(scope: CoroutineScope, bufferSize: Int = 16, transform: suspend (T) -> R) =
    flow {
        val currentContext = coroutineContext.minusKey(Job) // Jobs are ignored

        coroutineScope {
            val channel = produce(currentContext, capacity = bufferSize) {
                collect { value ->
                    send(scope.async { transform(value) })
                }
            }

            (channel as Job).invokeOnCompletion { if (it is CancellationException && it.cause == null) cancel() }
            for (element in channel) {
                emit(element.await())
            }
//
//            val producer = channel as Job
//            if (producer.isCancelled) {
//                producer.join()
//                throw producer.getCancellationException()
//            }
        }
    }

It seems to be working fine, but I can't use internal kotlinx,coroutines methods. I think that this use-case require a separate function in the library.

@elizarov
Copy link
Contributor

elizarov commented Apr 26, 2019

We do plan a set of separate functions for that and to close #172 with them. However, the key challenge here is to introduce a composable set of concurrency operators as opposed to specific operators like concurrentMap, which is hard to scale to other operators (concurrentFilter, concurrentFold, etc). Tentatively, it is going to look like this:

flow.concurrent(n) { 
    // inside here we can have a sequence of flow operations,
    // each doing its own thing concurrently
    // this: Flow<T> here similarly to `flowWith` operator
    filter { ... }.map { ... }.filter { ... } // -> Flow<R>
} // -> Flow<Flow<R>> 

Alternatively you can call concurrent without a lambda. This way you get Flow<Flow<T>> which you can map manually (so concurrent with lambda is just integrated with map):

// different way to write the same code as above
flow.concurrent(n).map { 
    // it: Flow<T> here
    it.filter { ... }.map { ... }.filter { ... } // -> Flow<R>
} // -> Flow<Flow<R>> 

Note, that the result is going to be Flow<Flow<R>>. Right now the only useful thing you can do on it is flattenMerge() to get Flow<R>. But what if you want to reduce/fold the result somehow -- that will require a sequence of invocations and will not parallelize properly. You can repeat reduction inside and outside, but that does not look pretty. A classical concurrent map-reduce pipeline would be written, for example, like this:

flow.concurrent(n) { // this: Flow<Input>
    map(transform) // -> Flow<Mapped>
   .reduce(reducer) // -> R
} // -> Flow<R>
.reduce(reducer) // we need to reduce again to get -> R 

So that part of design for concurrent terminal operators is still TBD with map-reduce being the primary use-case.

The similar design problem we have with terminal concurrent collect:

flow.concurrent(n) {
    collect { ... } // collect in `n` coroutines concurrently 
}.collect {} // but must write an empty collect here to activate it all.

@altavir
Copy link
Author

altavir commented Apr 26, 2019

I see the idea, but parallel is not necessary concurrent. On the contrary, in my example, there is no concurrency. The key feature needed for parallel map is an ability to provide a place-holder ahead of time so when you actually call this place-holder, you trigger calculation, do not wait for result and move further. Now I understand that you use inner Flow in your examples as a placeholder. In my example it is eager Deferred.

Maybe we can modify your structure by replacing inner Flow by lazy Deferred? it would remove a lot of confusion and would allow more fine grained control of results. Then parallelMap will transform Flow<T> into Flow<Deferred<R>> and we can add extensions for that type or something like that. I will try to think about it later.

As for initial example, I work a lot with parallel data processing and map/reduce workflows. And basically the only operation that could be parallelized is map (safe for group-based reduce which could be represented as parallel map + non-parallel reduce). So it covers most of the use-cases.

@elizarov
Copy link
Contributor

@altavir Actually, it is the converse with the respect to the traditional distinction between concurrency and parallelism (it is quite a recent distinction, < 20 years old, but quite established by now).

Concurrent is not necessarily parallel. A mapParallel operator is concurrent, because different calls to transform happen concurrently with each other. Which means, for example, that if you update any kind of shared mutable state from inside of transform you'd run into a data race.

However, even if you take concurrent operator, it does not necessarily mean that it is going to actually run different calls to transform in parallel to each other. For example, if you run it on JS you'll get no parallelism whatsoever (since there is only a single thread), yet you can still have a lot of concurrency even on JS.

On JVM you can run your version mapParallel in a single-threaded dispatcher and while it provides concurrency between different calls to transform, there will no parallelism in a single-threaded dispatcher. You can get parallelism on JVM, though, by plugging a multi-threaded dispatcher into the context.

@altavir
Copy link
Author

altavir commented Apr 27, 2019

I won't argue with you about the terminology, especially remembering that significant fraction of my knowledge of it comes from your articles. Still, it does not change the fact that all we need for parallel processing is a way to start multiple computations ahead of time. I am trying to implement it via transforming a Flow into Flow of Deferred and batch starting those Deferred on collect. The code looks good from API point of view, but for some reason does not work right now. This function blocks on channel production stage and I am trying to figure out why.

@altavir
Copy link
Author

altavir commented Apr 28, 2019

The problem was in the conflict of lazy deferred with structured concurrency. I've fixed it by replacing deferred by my imitation. The working code is available here.

In theory, it should be possible to make concurrent collect without calling async in advance. One just have to create a number of placeholders (for example CompletableDeferred) and then fill and pop them to collector as soon as they come. I will think about it a little more. Maybe channel + switch or one channel for placeholders and the second for results.

@elizarov
Copy link
Contributor

Let's hold off a discussion on implementation strategy a bit. There are lots of ways to implement it. What are you trying to achieve in the end? You've started this thread with a proposal for mapParallel, but what is your end goal? What are you trying to build?

@altavir
Copy link
Author

altavir commented Apr 29, 2019

The particular case is a streaming mathematical processor with ability to parallel computations. In the version before the Flow I had Producers - generators, which could lazily create elements of some type (suspend receive) and Consumers working similar to actors (with suspend send). And I had Processors which implemented both interfaces. Also I had a mechanism to lock a Consumer on Producer forcing this specific Producer to send all its elements to Consumer (and no one else).

The typical application of such mechanism is the analysis of time series, when you have a very long block of numbers and you want to continuously run it through some chain of mathematical operations (like moving window, Fourier transform etc) and drop result somewhere without loading the whole block in the memory. Some of those operations could be parallelized. For example, one could split the incoming data into fixed chunks and then apply FFT to each chunk in parallel. Basically it is the same old map-reduce technology. And very similar to Java Stream processing (I would use Java Streams, but I want to implement it on MPP). Flow seems to be good replacement for my initial construct since next Flow always consumes previous one and the Flow does not start generating until final consumer is called. The internal sequential processing also probably allows to avoid context switching performance losses.

@qwwdfsad
Copy link
Member

Actually, concurrentMap can be expressed via flatMapMerge without using any private API:

private fun <T, R> Flow<T>.concurrentMap(dispatcher: CoroutineDispatcher, concurrencyLevel: Int, bufferSize: Int, transform: suspend (T) -> R): Flow<R> {
    return flatMapMerge(concurrencyLevel, bufferSize) { value ->
        flow { emit(transform(value)) }
    }.flowOn(dispatcher)
}

@salamanders
Copy link

I think I'm debating the same need over on https://discuss.kotlinlang.org/t/must-go-fast-sequences-flows-bounded-channels/12494 - but with less knowledge of the various options than this thread, so I'm going to sit quietly and watch here instead. :)

Why I think my need is the same use case:

  1. Have a source. (In my case a video file that produces BufferedImages). The source is fast enough to not be the bottleneck if it is given a dedicated thread.
  2. Need to do a lot of math, and the math can be done in parallel. (distilling chunks of images down to a single frame)
  3. But too many in parallel "in flight" is bad (doesn't make sense to be grinding on more than I have cores, and I could run out of memory)
  4. The destination (encoding to a MP4) could slow things down, so again, don't have too many in-flight. (which brings up back-pressure)
  5. Even if we do some work in parallel, the order matters.
  6. Desire to have more steps as it gets more interesting.

And I have too many options and not enough knowledge if there is one that is "use this until we make the Kotlin canonical lib"

  • Java Streams
  • Kotlin Sequences
  • Kotlin Flows
  • Java Capacity-Bounded Channels (that block)
  • Kotlin Capacity-Bounded Channels (that suspend)
  • (any of the above) containing Deferred

@altavir
Copy link
Author

altavir commented Apr 29, 2019

@qwwdfsad Seems to be working. I looked into the source and it does the trick with two channels I was intending to do. I've changed signature a bit: fun <T, R> Flow<T>.map(concurrencyLevel: Int, dispatcher: CoroutineDispatcher = Dispatchers.Default, bufferSize: Int = concurrencyLevel, transform: suspend (T) -> R . Now it could be called instead of regular map replacing map by map(4).

I think it mostly solves my case. Few additional comments:

  • I think that async/collect syntax in case we need multiple subsequent heavy operations.
  • The bufferSize purpose is not clear from documentation. Back-pressure should start to work when the process is slow and limit the speed of incoming messages. In this case the speed is limited by suspension when incoming channel is full. This parameter does not seem to do anything.
  • I have not tested everything yet, but it would be nice to have subsequent operations to be run on the same thread to minimize context switching. My solution does not seem to work this way.

@sdeleuze
Copy link
Contributor

On Spring side, the use case is pretty common: you want to chain 2 remote webservices, the first will return a list of IDs and the second will allow to get the details associated with each IDs. For that you are typically using flatMap in Rx world and you will naturally convert it to map with Flow following the documentation which is IMO misleading:

Note that even though this operator looks very familiar, we discourage its usage in a regular application-specific flows. Most likely, suspending operation in map operator will be sufficient and linear transformations are much easier to reason about.

So developers will naturally use a sequential map thinking it handles naturally parallel processing (concurrency behavior is not specified in map operator documentation) since it supports suspending functions, but it will process each element sequentially which makes little sense to me.

I agree with @gildor when he says map is sequential in Rx (or Sequence) because it doesn't support async operation there by definition. Since map in Flow support suspending functions, it is critical to be explicit about concurrency in its documentation, and to provide parallel map capabilities that does not force developers to use single Flow element + flatMap*.

I would personally be in favor to support parallel processing by default in map because the suspending function parameter is a clear signal that may happen to me, and I see much more use cases for this behavior than sequential processing with suspending functions where use cases are unclear to me. @elizarov said he is not, point taken. But please at least update the documentation of map, flatMap* and provide a parallel map operator.

@elizarov
Copy link
Contributor

elizarov commented Apr 30, 2019

I'm all for adding concurrent APIs, too, but in an explicit way. There's a lot to design, to think about, though. I've already presented one way to provide explicit concurrent API in this thread. The other would be something like this:

flow.concurrent().map { ... }.merge() 

This is both explicit about concurrency and about what happens with all those concurrently mapped things -- they are all merged at the end (the other possible termination is to reduce all those concurrent flows, which nicely gives you the classical concurrent map/reduce paradigm in the same API).

Actually, I have one really radical idea and this thread is good place to share it. I'm thinking that we should deprecate flatMapMerge because it is implicitly concurrent. The whole concurrency parameter to flatMapMerge is a design smell. The replacement for flatMapMerge should be:

flow.concurrent().flatMap { ... }.merge()
//             ^ optional concurrency limit goes here if needed

At the same time flatMapConcat should be deprecated, too. A simple flow.flatMap should perform a sequential flatMapConcat just like it happens on a plain sequential sequence.

@sdeleuze
Copy link
Contributor

sdeleuze commented Apr 30, 2019

I like the simplification that leads to more consistency with only map and flatMap while providing both sequential and concurrent behavior, but I am puzzled by the requirement to add 2 additional steps for operations as simple as map or flatmap. What types would concurrent(), map { } and flatMap { } return?

@altavir
Copy link
Author

altavir commented Apr 30, 2019

I like the idea (the radical one) in general. The flatMapMerge is really vague. It is hard to understand what it does and how its parameters affect the result. The variant with flow.concurrent().map { ... }.merge() is what I ended up doing, I just alternative map(dispatcher) and collect instead of concurrent().map{} and collect(concurrency) instead of merge().collect()` (I think I like your variant better).

The question is how the intermediate flow looks from the type system point of view. I do not like Flow<Flow>> construct because it is hard to understand what it does. I also do not like using flatMap inside concurrent block instead of map because flatMap assumes that we actually have flow of flows which needs to be flattened, not a list of single element flows which are used just because we want to represent lazy values by them.

@altavir
Copy link
Author

altavir commented Apr 30, 2019

@sdeleuze You need to convert your flow to "promises" at some point and then another point to actualize those promises. In between those points you have concurrency. So you always have "in concurrency" and "out of concurrency" gates. The question is how you will describe what happens between them and will you allow to open one gate, pass the flow somewhere and close it there.

@elizarov
Copy link
Contributor

W.r.t. to types, I think that concurrent() shall return a special ConcurrentFlows<T> type which is not a flow (because flows are sequential). Only a limited set of operators will be defined for it, ones that make sense for a concurrent setting (filter, map, flatMap among them). So, to get back to the sequential Flow you'll have to terminate it with merge, reduce of something like this.

It is somewhat conceptually similar to how collection.groupingBy { ... } in stdlib works (see here). It returns Grouping type that is just an intermediate "DSL type" that should be converted back to the collection type.

@fvasco
Copy link
Contributor

fvasco commented Apr 30, 2019

A "concurrent block" can avoid to define a dedicated type, ie:

fun <T, R> Flow<T>.concurrent(concurrency: Int = 16, block: CoroutineScope.(Flow<T>) -> R): Flow<R>

or something similar.

Moreover it provides a not concurrent sandbox to use a not pure functions.

@sdeleuze
Copy link
Contributor

I understand why you are tempted by this design, but it would be pretty heavy to use and I am quite puzzled by the need to use 3 operators to do something as simple as a concurrent map where I see much more use cases for operation with latency like network ones than for sequential one. The original design was beautiful because simple, that looks like less clean to me.

Why not keeping simple map and flatMap with concurrency: Int = 1 by default, using the sequential/optimized implementation when concurrency == 1 and make it clear in the documentation that this optional parameter should be customized if concurrency is needed?

@elizarov
Copy link
Contributor

elizarov commented Apr 30, 2019

There are several design rules to be taken into account before we make a decision:

  • We want all kinds of concurrency to be as explicit as possible. Sequential code is way easier to understand and the principle of least surprise pushes us in the direction of consistently following the rule that "flow is sequential" .
  • We want to provide the library that gives the "building blocks" that end-users will combine in various ways to solve the problems they face. This makes us prefer general-purpose solutions to specialized ones. Kotlin is not Java. We don't have to provide all those operators RxJava has. If somebody uses concurrent().flatMap { ... }.merge() so often that they find it tedious to write, they can easily define their own extension with a short name that does it.
  • Most (99.9..%) of our users don't care about performance nor concurrency. The data sets they work with are simply too small to care. We don't want to encumber them with anything that is related to concurrency, so API surface dealing with concurrency should be separate from the general-purpose sequential APIs that most people need.

Having spelled this out, having an optionalconcurrency parameter with a default of 1 for some operators is an option we'll look at. Here is one particular use-case I have in mind. Assume that someone wants to do a concurrent map while preserving original order (I think @altavir had this use-case in mind). It is not what concurrent().map { ... }.merge() would be doing, since that operator chain reorders original flow just like flatMapMerge is doing now (merge emits a value as soon as it is computed). Writing this order-preserving map operator yourself is non-trivial and it might deserve an option parameter to map.

@altavir
Copy link
Author

altavir commented Apr 30, 2019

I think that concurrent() shall return a special ConcurrentFlows<T>

My thought exactly. This way it could even inherit Flow and be collected in a sequential way, though, I am not sure it is a good idea, because it could mix behaviors.

@altavir
Copy link
Author

altavir commented Apr 30, 2019

It is not what concurrent().map { ... }.merge() would be doing, since that operator chain reorders original flow just like flatMapMerge is doing now

I do not think it will be a problem. flatMapMerge could change the order only in case you have more than one element in merged flows. For a simple map, you will always have exactly one element in each of your flow, or any other lazy placeholders, so the order will be preserved (I've implemented both concurrent/merge and parallelMap solutions and got correct order in both cases). You have to iterate a channel somewhere inside in order to reduce parallel processing back to sequential one, and this channel basically guarantees correct order. Of course I am thinking about my flowOn-based implementation. It is probably possible to do something different which will mix the order.

The order, of course is important.

@qwwdfsad
Copy link
Member

qwwdfsad commented Apr 30, 2019

@altavir indeed, the buffer size parameter is useless when you are emitting flow with a single element, I wanted to show the general idea.
You can tune it in the way you like, e.g. you can make the target flow grouped/chunked under the hood first (Flow<T> -> Flow<List<T>>). In that case, bufferSize parameter makes sense.
For example, you can do the following (not tested):

fun <T, R> Flow<T>.concurrent(dispatcher: CoroutineDispatcher, concurrencyLevel: Int, bufferSize: Int, concurrentBuilder: (Flow<T>) -> Flow<R>): Flow<R> {
    val chunked: Flow<List<T>> =  chunked(size = 42) // Implementation of chunked is left as an exercise for the reader
    return chunked.flatMapMerge(concurrencyLevel, bufferSize) { value ->
        val batch = flow {
            chunked.collect { chunk -> chunk.forEach { emit(it) } }
        }
        concurrentBuilder(batch)
    }.flowOn(dispatcher)
}

Now you can use it in the following way:

f.concurrent(Dispatchers.Default, 16, 64) { batch ->
    batch
        .filter { /* heavy lifting */ } // Will be executed concurrently
        .map { /* heavy lifting */ } // Will be executed concurrently, no thread switching after filter
}.map { /* do regular stuff */ } // Will be executed sequentially for the whole flow

Buffer size controls backpressure of the consumer and concurrency controls the amount of in-flight batches. Note that even if we completely remove flatMapMerge, vague parameters will just move to the concurrent builder.

@qwwdfsad
Copy link
Member

qwwdfsad commented Apr 30, 2019

@sdeleuze

On Spring side, the use case is pretty common: you want to chain 2 remote webservices, the first will return a list of IDs and the second will allow to get the details associated with each IDs. For that you are typically using flatMap in Rx world and you will naturally convert it to map with Flow following the documentation which is IMO misleading:
...since it supports suspending functions, but it will process each element sequentially which makes little sense to me.

Could you please elaborate a bit?
For example, consider you have fun ids(): Flow<Long> and fun details(id: Long): Flow<Details>.

What we (may) expect a user to write:

ids().map { id -> details(id).single() }  // (1) Or even details should be suspend fun in the first place

What one may want to write in RxJava

ids().flatMap { id -> details(id) } // (2)
// or
ids().concatMap { id -> details(id) } // (3)

(1) and (3) are basically the same. The (2) option depends on the concurrency parameter (or its default value) that controls how many actual details request can be executed concurrently.

If I understood you correctly, you want (2) to be the default option in this scenario.
The question is why it should be the default? I see at least multiple downsides of that:

  • It introduces an implicit concurrency that leads unprepared users to the land of various concurrency bugs
  • Concurrent map may be faster for the one isolated query, but for the system under the load it will actually slow it down (similarly if you will append parallel() to every j.l.Stream operation)
  • With a default map(concurrency) we will end in a situation "where is my anotherOperator(concurrency)"

The follow-up question is how can we improve map documentation to be clear about its intentions? What exactly did you fin senseless? We definitely don't mind some help here as documenting the core ideas you were designing/discussing/developing for a while is a bit biased :)

@sdeleuze
Copy link
Contributor

sdeleuze commented May 1, 2019

@elizarov Please find bellow my feedback

We want all kinds of concurrency to be as explicit as possible. Sequential code is way easier to understand and the principle of least surprise pushes us in the direction of consistently following the rule that "flow is sequential" .

Kotlin coroutines are designed on that principle, and I agree that's a good idea to avoid too much surprise here, so my initial proposal to make map concurrent by default is likely not compatible with that design principle. What I try to express here is that the declarative nature of Flow API can provide an elegant way to achieve concurrency (explicitly), and it would be a missed opportunity to make it a 2nd class citizen.

We want to provide the library that gives the "building blocks" that end-users will combine in various ways to solve the problems they face. This makes us prefer general-purpose solutions to specialized ones. Kotlin is not Java. We don't have to provide all those operators RxJava has. If somebody uses concurrent().flatMap { ... }.merge() so often that they find it tedious to write, they can easily define their own extension with a short name that does it.

Be sure I try to provide feedback with Kotlin mindset in mind. Forcing developers to use 2 additional operators that will most of the time just surround a single one just to enable concurrency is IMO nor elegant or pragmatic. Of course anybody can define custom operators, but here I think we are talking about one of the most common use cases.

Most (99.9..%) of our users don't care about performance nor concurrency.

Based on my experience of working on server-side reactive stuff for a few years, I strongly disagree with that statement about concurrency when applied to Flow use cases. And I really think this has nothing to do with Spring or Java.

The data sets they work with are simply too small to care. We don't want to encumber them with anything that is related to concurrency, so API surface dealing with concurrency should be separate from the general-purpose sequential APIs that most people need.

The need for concurrency does not come from data set size, but from latency. I am going to try to elaborate more as asked by @qwwdfsad.

A company is developing micro-services (with Spring or Ktor) that needs to request external slow REST webservices that are sadly not cachable. Locally I only have a Flow<Long> of let's say 10 elements that I get via fun ids(): Flow<Long>. To get these details I have typically call suspend fun awaitDetails(id: Long): Details. Latency of the remote webservice is 1s (not uncommon in real life).

The first thing that developers will do that after reading that Flow awesome map operator supports suspending functions is ids().map { awaitDetails(it) }. Coroutines are cheap, flatMap* documentation push to use map, and map documentation says nothing about concurrency. It is not silly to think hat most people will think that this will execute in 1 sec, sadly it will take 10 exactly like if I would have used a blocking stack where creating new threads is avoided because costly.

After bad feedback saying that the application is slow, the developer updates its code and find that she/he has to wrap the suspending function artificially in a Flow which is pretty ugly:
ids().flatMapMerge { flow { awaitDetails(it) } }.

Let's see what are the options to improve this for coroutines 1.3.

  • ids().concurrent().map { awaitDetails(it) }.merge() is very verbose for such common need
  • ids().mapParallel { awaitDetails(it) } is better but is adding another operator that will not solve the flatMap* design issues
  • ids().map(concurrency = 4) { awaitDetails(it) } seems so far the best option to me, it is elegant, pragmatic, leverages optional named parameter with default value, and provide a solution where we can be back on just using map and flatMap which would solve the big concern I raised about this very common use case and would provide a way to solve the flatMap* design issue. Also it pushes developer to be aware of the number of maximum concurrent requests which is IMO a good thing.

@fvasco
Copy link
Contributor

fvasco commented May 1, 2019

I wish to expose a little issue regarding the using of concurrency attribute for Flow's operators instead having a ConcurrentFlow or the above proposal concurrent sandbox.

Example:
I am implementing a REST web service usingi Spring, for each id I have to limit the concurrent invocation to 4.
So I have to write ids().map(concurrency = 4) { awaitDetails(it) }.
Now I have to filter id to check the user authentication, so I really tempted to write:

ids().filter(concurrency = 4) { checkId(it) } .map(concurrency = 4) { awaitDetails(it) }

but concurrenty now is 4 + 4, so I should write something like

ids().filter(concurrency = 2) { checkId(it) } .map(concurrency = 2) { awaitDetails(it) }

tunig these parameters is not easy, to add more operators to chain become tricky.

See other solution

ids().concurrent(concurrency = 4).filter { checkId(it) }.map { awaitDetails(it) }.merge()

This looks better, but we have to consider to implement all operators for the new type, not only a limited set.
So, if I have to insert an operator not available for ConcurrentFlow then I have to write:

ids()
.concurrent(concurrency = 4).filter { checkId(it) }.merge()
.someOperator()
.concurrent(concurrency = 4).map { awaitDetails(it) }.merge()

The same issue, unless ConcurrentFlow extends Flow.

Lastly the sandbox proposal:

ids()
.concurrent(concurrency = 4) { flow ->
  flow
    .filter { checkId(it) }
    .someOperator()
    .map { awaitDetails(it) }
}

@sdeleuze
Copy link
Contributor

sdeleuze commented May 1, 2019

Good point, I agree that tuning these parameters is not easy.

On one side, specifying this concurrency parameter (with proper documentation) makes that more explicit so users will have a better idea of how many concurrent request their could be.

On the other side, Coroutines are cheap, so maybe we don't really care of the exact value for most cases since that adds a potentially annoying cognitive load to choose the proper value. Maybe users should just choose the concurrency mode they want (using a dedicated operator or a boolean or enum optional parameter) ...

@sdeleuze
Copy link
Contributor

sdeleuze commented May 1, 2019

What about using concurrent() to set the concurrency mode for downstream operators in order to avoid this "2 level API", so in my example that would be ids().concurrent().map { awaitDetails(it) }. If we need to bring back the sequential mode, we could use concurrent(concurrency = 1). That seems to solve the following points:

  • Explicit concurrency
  • No issues due to choosing the right value for concurrency parameter
  • No new type to introduce
  • 2 operators instead of 3

@elizarov
Copy link
Contributor

elizarov commented May 1, 2019

Let us explore this a bit. Assume that we have concurrent() operator that switches into concurrency at some "default level" (without forcing people to think about the exact numbers, which is a good thing), then you do some additional operators. This design is composable and pretty concise, but raises a number of questions. Regardless of what the return type is and how it is named, the key question is whether the result of concurrent is a Flow or not. I'd argue it cannot be a flow, and here's why.

If the result of concurrent() is a Flow, it means you can apply any flow operator to it. For example, you can do concurrent().collect { ... }. But since its "concurrent" one would naturally expect that the code in the collect operator is called concurrently. There is actually a use-case for exactly that. Here it is.

Assume that I have a flow of records and I want to store all of them to DB:

flowOfRecords.collect { storeToDB(it) } // sequential

But that does storing to DB sequentially. What if storeToDB takes a second and I want that storing to be performed concurrently to minimize latency? Armed with concurrent() operator at hand I, as a naive user, expect that this code is the solution:

flowOfRecords.concurrent().collect { storeToDB(it) } // concurrent

That's a really clear way to express my intent to concurrently store those record to DB!

But now, if a result of concurrent() is a Flow I can write: fun tricky(): Flow<Record> = flowOrRecord.concurrent(), and some unsuspecting poor user of tricky() function does tricky().collect { doSomething(it) } expecting that doSomething is called sequentially, because it is a flow, but boom -- it goes concurrent onto him.

The design rationale that Java used when designing their Stream.parallel() operators does not apply to the design of Flow, because streams a hot and one-shot and you are not supposed to write user-defined functions that return a stream, but flows are cold and multi-shot and we do expect people to write their functions with a Flow return type.

So what are the possible solutions to this conundrum in the context of our exploration of concurrent() operator?

  • If the result of concurrent() is a Flow, then it has to have a separate operator for "concurrent collect" and a regular collect has to stay sequential. You can easily extend this reasoning for all the other operators (map, filter, etc), but you'll and up with a distinctly named set of operators doing essentially the same thing, but with concurrency. I'd say it is a no-go as a design choice.

  • This leads us to a natural conclusion that concurrent() result cannot be a Flow. It has to be a separate type with its own set of operators. This way thoe concurrent operators can have the same name (collect, map, filter) and it would be Ok for them to behave differently.

But that means that we'll need operators to convert back to the sequential mode and it cannot be just concurrent(1), because that does not return a flow. It has to be a separate operator: sequential, merge, whatever.

That, in turn, means that if I want to do "concurrent map" and get a regular sequential flow as a result, then I have to write somewhat longer-looking: concurrent().map { ... }.merge(). I personally do not find that to be a show-stopper, though, but it might look shorter with "sandbox" proposal if we force it to merge the results back at the end into the single flow: concurrent { map { ... } }. The sandbox design proposal has its own open questions, thought.

So far I don't see a single satisfying proposal that meets all the use-cases, so we may end up implementing multiple different solutions for concurrency to meet different use-cases.

@elizarov
Copy link
Contributor

elizarov commented May 1, 2019

@altavir

The order, of course is important.

You are looking through a prism of your narrow use-case, since there are clearly many use-cases for which the order is not important (see, for example, an example of storing to DB above). First of all, let me note that none of implementations of concurrentMap presented so far preserve an order. Consider proposal by @qwwdfsad, for example:

private fun <T, R> Flow<T>.concurrentMap(dispatcher: CoroutineDispatcher, concurrencyLevel: Int, bufferSize: Int, transform: suspend (T) -> R): Flow<R> {
    return flatMapMerge(concurrencyLevel, bufferSize) { value ->
        flow { emit(transform(value)) }
    }.flowOn(dispatcher)
}

What happens here is that it creates a lot of inner flows and they are started to be collected concurrently. Now, the first one to emit, gets merged first. You can play with an example I wrote here to convince yourself that it does indeed reorder elements. Try rewriting so that it does preserve an order, yet remains concurrent.

Now, order-presenting "concurrentMap" is indeed a use-case. One of many. This use-case is an intricate one, though, because it is hard to design in a composable way. I can envision on order-preserving "concurrentMap" implementation as a separate operator, but it does not easily lend itself for a decomposition into simpler operators to combine it with "concurrentFilter" and others. It is one of the puzzles we have in the design of concurrent operators for flows.

@Dominaezzz
Copy link
Contributor

Just my two cents. In the very likely event that we get a concurrentMap, which will probably spawn coroutines to run the next bufferSize elements (with a cap of concurrencyLevel) and maintain the original iteration order.
I propose it should be called pipelinedMap, since it's sequential but pipelined.

concurrentMap { it } vs concurrent { map { it } } already reeks of confusion (At least to me).

@Dominaezzz
Copy link
Contributor

Here's an implementation of fun <T, R> Flow<T>.pipelinedMap(bufferSize: Int, concurrency: Int, block: suspend (T) -> R): Flow<R>.

@jivimberg
Copy link

Here's my take on the implementation inspired by @Dominaezzz's and flatMapMerge: parallelMap(). It uses a pool of workers instead of set of Deffered jobs. It doesn't guarantee ordering.

@akarnokd
Copy link

I have a design that specifies a separate type ParallelFlow and thus its own sub DSL. Unfortunately, I've run into a couple of conceptional problems with using FlowCollectors, such as what to do when a rail fails/needs to be cancelled, or how to have the rails do the reduction instead of the main collector coroutine.

@elizarov
Copy link
Contributor

elizarov commented Aug 2, 2019

I have a design that specifies a separate type ParallelFlow

@akarnokd That is one of the designs we have on the table for this issue. No decision yet.

Advantage of this "separate type" design is that it is a DSL that can be tailored to the concurrent processing, but the disadvantage of it is that is not quite orthogonal in nature. It requires duplication of many operators. All transform-based operators (like map and filter) can be "lifted" to parallel flows, but would still require a separate declaration that operates on ParallelFlow and returns a ParallelFlow, considerably increasing API surface. Reduction operators, however, as you have correctly noted, need a totally separate implementation that is thread-safe and has proper synchronization. We don't see any obvious way to reuse one implementation for both parallel and non-parallel reductions as synchronization would considerably slow-down sequential operators to the point of affecting performance on sequential benchmarks.

Note, that if we follow with this design it will be called ConcurrentFlow, since it is going to bring concurrency to the flow processing, but not necessarily parallelism (multiple flows can run concurrency in a single thread without any parallelism and this is still quite useful in async code, since concurrency lets you have multiple long-running network requests in flight).

Our working theory, if we follow with "separate type" design, is that we can provide only a few reduction operators out-of-the-box with merge() being the core operator to transform concurrent flows into a sequential Flow, where you can then apply the reduction that you need sequentially. The hope is that no one really needs advanced concurrent reduction. We don't have a goal of duplicating the full glory of parallel Java streams. Our goal is to provide concurrency for asynchronous processing and there concurrent version of map and flatMap operators are going to be the most commonly used ones.

Moreover, in this case, existing flapMapMerge(concurrency = n) { block } becomes a shorthand for concurrent(n).flatMap { block }.merge()

@joost-de-vries
Copy link

Are there plans to support partition keys? Aka consistent hashing.
As in: events with the same hashkey will be processed by the same worker and thus are guaranteed to be processed sequentially. While events with different hashkeys may be processed in parallel.
It's an important pattern in server side processing: let's say we are processing a stream of windmill events. Events about a specific windmill will all have the same hashkey; the windmill id. And since they're processed by the same worker they can't overtake eachother.

@cgruber
Copy link

cgruber commented May 8, 2020

So, is this issue a thing, or are we more or less looking at flatMapMerge/flatMapConcat in the forseeable future? I like the idea of chaining parallel flows. I actually DO think we should have if not the full glory of parallel streams, at least some of it. I hate having to bail to streams to get the functionality, and it's awkward when trying to work within coroutine context.

@cgruber
Copy link

cgruber commented May 8, 2020

I'd put in a vote for the separate-type flow despite the downsides of extra API volume. It's I think a lot safer and ends up being cleaner to reason about, despite some DRY. We're really not talking about massive volumes of code, if I'm estimating this right - sure more than reusing fully, but it's not that big a delta, all things considered.

@pacher
Copy link

pacher commented May 29, 2020

specific operators like concurrentMap, which is hard to scale to other operators (concurrentFilter, concurrentFold, etc)

@elizarov In the spirit of #2065, how about concurrentTransform?

fun <T, R> Flow<T>.concurrentTransform(
    transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

It is lower - level but generic enough to give us full flexibility and be usable until composable set of concurrency operators arrives. We could trivially do our own concurrentMap and concurrentFilter while waiting for proper solutions.

@Dominaezzz
Copy link
Contributor

@pacher I really like that idea (So much, that I'm going to try and implement it). The only issue I see is how to handle back pressure.
Do you pause upstream collection when n concurrent transforms are running, or do you pause upstream collection when the transforms have emitted n number of items (each?)?
The former works for map/filter, where the number of emissions are tiny and consistent and the latter works for a more flatMap-y operator, where the number of emissions can vary.

@pacher
Copy link

pacher commented Jun 5, 2020

@Dominaezzz I don't know. I did not think it through that much, it was just an idea, maybe even not a good one.

As per the topic of the issue, here is my take on order-preserving "concurrentMap" :

    fun <T, R> Flow<T>.concurrentMap(scope: CoroutineScope, concurrencyLevel: Int, transform: suspend (T) -> R): Flow<R> = this
            .map { scope.async { transform(it) } }
            .buffer(concurrencyLevel)
            .map { it.await() }

Can anybody see problems with this approach?

It looks good to me because it is just a concise combination of a few basic flow operators. But it is preserving order compared to @qwwdfsad 's flatMapMerge snippet.

@elizarov
Copy link
Contributor

elizarov commented Jun 5, 2020

As per the topic of the issue, here is my take on order-preserving "concurrentMap" :

@pacher That's a good take on order-preserving concurrent-map. There are two things that I see.

For one, the way buffer(n) works gets you n + 2 concurrently executed transformations in this code, so the corresponding correction needs to be applied.

It is easy to remember. Without buffer there is no concurrency - at most 1 operation is executed at any time, with buffer(0) at most 2 can run concurrently, so with buffer(n) at most n + 2.

The other one is that concurrentMap should not be, conceptually, taking a scope parameter but shall operate in the scope of the caller flow. That's harder to express given the current set of flow primitives. The easiest approach I can come with is to use channelFlow. However, if you use channelFlow you can just as well control concurrency with a Semaphore instead of a buffer.

@elizarov
Copy link
Contributor

elizarov commented Jun 5, 2020

So, is this issue a thing, or are we more or less looking at flatMapMerge/flatMapConcat in the forseeable future?

@cgruber We plan it to be the next "big thing" after flow sharing.

@salamanders
Copy link

@patcher once I guessed at .concurrentMap(CoroutineScope(coroutineContext), 6) {... I was able to drop in your function and it worked like a charm, thank you!

From this end-user who doesn't know the depths of performance/contexts/coroutines/dispatchers, a +1 vote for something like this, it was very easy to make use of.

@rkklai
Copy link

rkklai commented Mar 31, 2021

Is there any update on this design item? This feature would make ETL applications so much easier and very performant. Thank you!

@gscatto
Copy link

gscatto commented May 22, 2021

@pacher @elizarov Here's my attempt on order-preserving concurrent-map operator after seeing your posts.

inline fun <T, R> Flow<T>.mapInOrder(concurrencyLevel: Int, crossinline map: suspend (T) -> R): Flow<R> =
    Semaphore(permits = concurrencyLevel).let { semaphore ->
        channelFlow {
            collect {
                semaphore.acquire()
                send(async { map(it) })
            }
        }
            .map { it.await() }
            .onEach { semaphore.release() }
    }

Is this approach better? Can it be further improved?

@lowasser
Copy link
Contributor

We've been thinking about this at Google recently, as we definitely have use cases that require preservation of order and concurrent evaluation of RPCs. (Consider search results: order is meaningful, and returning results as fast as possible is a high priority.) Flow seems like the appropriate abstraction for us.

There's also some subtlety in exception handling: in e.g. @giulioscattolin's example above, in flowOf(a,b,c).mapInOrder(3, f), if f(b) throws before f(a) or f(c) complete, then f(a) will be cancelled, a behavior difference from map.

master...lowasser:map-concurrent is a prototype implementation that tries to address all those issues, as well as both the concurrencyLevel and buffer knobs.

@jnorthrup
Copy link

as this is not the first hit on google, but the first relevant hit on flows map reduce, can this be fast-forwarded and hopefully closed to the state of 1.5.x please?

@bugaevc
Copy link

bugaevc commented Sep 22, 2021

I found myself needing concurrent mapping, and seeing how concurrent flow design has not been decided on yet, I thought I'd try to write up a sketch implementation of it myself. Here's the gist.

  • It's more complex than a regular Flow
  • The implementation is somewhat naïve and unoptimized
  • But it works!

Usage example:

flow {
    for (i in 0 until 100) {
        emit(i)
        delay(10)
    }
}.concurrent().map { value ->
    delay((0 until 100).random().toLong())
    value + 1
}.filter { value ->
    delay(1)
    value % 2 == 0
}.flatMap { value ->
    flow {
        delay(5)
        emit(value)
        delay(200)
        emit(value + 1)
    }
}.merge(preserveOrder = true).collect { value ->
    println(value)
}

Here, .concurrent() has the following signature:

/**
 * Turn this flow into a [ConcurrentFlow].
 *
 * The following operations like [map], [filter], [flatMap], [reduce], [collect],
 * will be run concurrently rather than sequentially.
 */
fun <T> Flow<T>.concurrent(concurrencyLevel: Int = 64): ConcurrentFlow<T>

And .merge() does the reverse transformation, switching between two different implementations depending on whether the original order has to be preserved or not:

/**
 * Merge a concurrent flow back into a regular, sequential flow.
 *
 * If [preserveOrder] is true, items are emitted in the same order they were
 * present in the initial flow; otherwise, in an arbitrary order. Note that
 * preserving order requires additional buffering and means the collector has
 * to wait for prior items to become available instead of processing new items
 * as they appear, which slows things down.
 */
fun <T> ConcurrentFlow<T>.merge(preserveOrder: Boolean): Flow<T>

It's also possible to directly concurrently collect a ConcurrentFlow with .collect { }, and in fact for the preserveOrder = false case the merging is implemented as just

channelFlow {
    collect { value ->
        send(value)
    }
}

@paulferaud
Copy link

paulferaud commented Feb 17, 2024

There is actually a trivial solution to this, which our app uses (and we wrote a small library for).

The "philosophy" is to be able to provide transforms "that behave like the normal transform, but faster". Basically, it should never change the order of output.

Basically, you can use a ChannelFlow to emit async events, and then await in a follow up map. Basically, this:

fun <T, R> Flow<T>.mapAsync(transform: suspend (value: T) -> R): Flow<R> =
  channelFlow {
    collect { e -> send(async { transform(e) }) }
  }
  .map { it.await() }

And... that's it.

This actually gives you (roughly) Channel.BUFFERED concurrency. but can easilly be customized with something like:

fun <T, R> Flow<T>.mapAsync(concurrency: Int, transform: suspend (value: T) -> R): Flow<R> {
  require(concurrency >= 1)
  if (concurrency == 1) return map(transform)
  return channelFlow {
    collect { e -> send(async { transform(e) }) }
  }
  .buffer(concurrency - 2)
  .map { it.await() }
}

The -2 is there because you actually have 1 element being processed in the channelFlow (and is not in the buffer), and one being awaited in map.

You can also use Channel.UNLIMITED for unbounded concurrency.

What's nice is that (almost) every other flow transform can be expressed in terms of mapConcurrently. EG:

fun <T> Flow<T>.filterAsync(predicate: suspend (value: T) -> Boolean): Flow<T> =
  mapAsync { it to pred(it) }.transform { if (it.second) emit(it.first) }

Same as mapIndexed, mapNotNull, filterNotNull etc...

Things get trickier if you want to implement things like transform or flatMapConcat. Or generally speaking, anything that produces more flows. Assuming you want an in-order behavior (as otherwise, what you want is just flatMapMerge). You instead have to create a channel that eager-collects the concurrent elements. You can't just concurrently emit flows, as these would not run until they are actually collected. You would want:

fun <T, R> Flow<T>.transformAsync(transform: suspend FlowCollector<R>.(T) -> Unit): Flow<R> =
  channelFlow<Channel<R>> {
    collect { e ->
      val channel = Channel<R>(Channel.BUFFERED) // This can be customized
      launch {
        flow { transform(e) }.collect { channel.send(it) }
        channel.close()
      }
      send(channel)
    }
  }
  .transform { emitAll(it) }

This works quite well for our app, and makes it super easy to process things concurrently in a "real-world server app".

However, there are 3 important caveats to take into account.

  1. If anything throws an exception at any time, the flow will immediately fail with that exception, even if it's not that exception's turn to be sent. If this is not to your liking, you can always mapAsync { runCatching { f(it) } }.map { it.getOrThrow() }. However, this time, keep in mind that a failure at iteration n would not cause n+1 to cancel until n is actually collected.

  2. The above argument concurrency is not really concurrency. It's just a buffer, and its elements filled concurrently. For example, assuming you have elements 0..n, which you try to mapAsync(concurrency = 3), then element k+3 will never be computed until k has been evaluated, regardless of k+1 or k+2. If you have a transform whose duration are wildly inconsistent, then you would not leverage the "full" concurrency. To address this, you'd need a separate buffer and semaphore. Something like:

fun <T, R> Flow<T>.mapAsync(concurrency: Int, buffer: Int = concurrency, transform: suspend (value: T) -> R): Flow<R> {
  require(concurrency >= 1)
  require(buffer >= 0)
  channelFlow {
    val semaphore = Semaphore(concurrency)
    collect { e ->
        semaphore.acquire()
        send(async {transform(e).also { semaphore.release() }})
    }
  }
  .buffer(buffer)
  .map { it.await() }
}

This is NOT something we actually use in our application. So caveat emptor. There might be edge cases I did not think about, and it's not production tested.

  1. This one is important. This does NOT implement an async flow "chain" operation. Each operation is independent, and elements need to be emitted from the previous operation before being processed by the next. For example, given:
myFlow
  .mapAsync { operation1(it) }
  .mapAsync { operation2(it) }

Given this example, assume that operation1(k+1) has already finished, but operation1(k) has not, then operation2(k+1) cannot start until operation1(k) emits.

Generally speaking, this has not caused issues for us, but we are "aware" to try to merge as much as possible all our operations into a single async operation.

If you do want such behavior, then bugaevec's answer seems to be what you are looking for.


So, I hope this helps. I do not believe this would be a good fit for the standard library. But it's something that's pretty easy to implement and maintain yourself. It works 100% of the time. The caveats are mostly about things not being quite as concurrent as you might they might be.

But for "everyday operations", we use this everywhere. In particular, any time we have a Collection.map { somethingThatSuspends }, we whip out the above.

@paulferaud
Copy link

Oh, I forgot. Similarly, you can implement short-circuiting async first/all operations, as so:

  suspend fun <T> Flow<T>.allAsync(predicate: suspend (T) -> Boolean): Boolean =
   channelFlow { collect { e -> launch { send(predicate(e)) } } }.firstOrNull { !it } ?: true
  }

  suspend fun <T> Flow<T>.anyAsync(predicate: suspend (T) -> Boolean): Boolean =
    channelFlow { collect { e -> launch { send(predicate(e)) } } }.firstOrNull { it } ?: false
  }

Which can be quite useful for "Only proceed if all/any of these (suspend) conditions are true".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests