Flow Replicate Operator #3833

Open · yoxjames opened this issue Aug 1, 2023 · 0 comments

Introduction

Shared flows allow us to create a flow that can have multiple subscribers, or to convert a cold flow into a hot one. Shared flows are "hot" in that their active instances exist independently of the presence of collectors. An important consequence of this design is that a shared flow never completes. This is great when the underlying concept the flow represents is truly hot.

What if you wanted to share or split a flow without making it hot, while still allowing it to complete? Currently, there is no clean way to do this with shared flows. This concept is referred to as "replication" throughout this proposal to keep it distinct from sharing.

This concept is not an original idea of mine; it has been brought up in several places. However, I did not see an existing issue for it, and I consider it an important feature for my use cases.

This idea has come up in the following discussions:
#2034
#1086
#2092

How are replica flows different from shared flows?

At first glance, this proposal may seem very similar to flow sharing; however, it has several key differences:

  1. Replicated flows are cold. They do not run anywhere independently of collection. They allow you to "share" a flow, avoiding the need to recompute the entire upstream; however, they do nothing unless collected.
  2. Converting a normal flow to a replicated flow should not require a coroutine scope or a sharing policy, and there should be no possibility of missed emissions.
  3. The number of "consumers" of a replicated flow is known at creation time. In contrast, shared flows are designed to be shared widely with an unknown number of consumers. In theory, this difference is what enables points 1 and 2.
  4. Replica flows can complete.

Here's a code example of the difference:

Shared Flow

coroutineScope {
    val flow = flowOf(1, 2, 3)
        .onEach { println("onEach: $it") }
        .shareIn(this, SharingStarted.Lazily)
    
    launch { flow.collect { println("collect1: $it") } }
    launch { flow.collect { println("collect2: $it") } }
}
println("done") // This won't execute
Output
onEach: 1
onEach: 2
onEach: 3
collect1: 1
collect1: 2
collect1: 3
collect2: 1
collect2: 2
collect2: 3

Note that in this example the source flow flowOf(1, 2, 3) is simple enough to avoid nondeterministic outcomes. However, if we add a delay to the source flow, it becomes possible for the second collector to miss values. This can be addressed with the right SharingStarted policy, but that still would not address flow completion.
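Here is a minimal, self-contained sketch of that missed-emission scenario, assuming a 10 ms delay in the source and a deliberately late second subscriber; the exact output is timing-dependent:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val flow = flowOf(1, 2, 3)
        .onEach { delay(10); println("onEach: $it") } // slow source
        .shareIn(this, SharingStarted.Lazily)

    val first = launch { flow.collect { println("collect1: $it") } }
    delay(25) // the second collector subscribes late on purpose
    val second = launch { flow.collect { println("collect2: $it") } }

    // With replay = 0, collect2 typically only sees 3. The shared flow never
    // completes, so the collectors must be cancelled for runBlocking to return.
    delay(100)
    first.cancel()
    second.cancel()
}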

Replicated Flow

val flow = flowOf(1, 2, 3)
    .onEach { println("onEach: $it") }
    .replicate {
        replica { collect { println("collect1: $it") } }
        replica { collect { println("collect2: $it") } }
        println("done") // This will execute
    }
Hypothetical Output
onEach: 1
onEach: 2
onEach: 3
collect1: 1
collect1: 2
collect1: 3
collect2: 1
collect2: 2
collect2: 3
done

Another important distinction not highlighted here is that replica flows will always receive every emission from the source flow; we do not have to worry about sharing policies the way we do with a shared flow.

Use Cases

Preventing reevaluation of a costly operation

#2092 (comment)
In many cases we simply want to avoid reevaluating a cold flow multiple times because doing so is expensive (for example, reading a file). We could share the flow, but then we have a flow that never completes, which is not ideal. Reading a file or running a process on the host machine usually does complete, and a shared flow does not adequately model that. We are forced to either live without completion or run the work twice, which may be a non-starter in certain applications.
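To make the trade-off concrete, here is a rough sketch of the two options available today. readLinesAsFlow() is a hypothetical, illustrative source, and the println calls stand in for real consumers:

import java.io.File
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical source: reads a file lazily and completes at end of file.
fun readLinesAsFlow(path: String): Flow<String> = flow {
    File(path).useLines { lines -> lines.forEach { emit(it) } }
}

suspend fun consumeTwiceToday(path: String) = coroutineScope {
    val lines = readLinesAsFlow(path)

    // Option 1: collect twice. Completion works, but the file is read twice.
    lines.collect { println("index: $it") }
    lines.collect { println("audit: $it") }

    // Option 2: shareIn. The file is read only once, but the shared flow is hot
    // and never completes, so these collectors run until cancelled, and with
    // replay = 0 the second collector can also miss early lines.
    val shared = lines.shareIn(this, SharingStarted.Lazily)
    launch { shared.collect { println("index: $it") } }
    launch { shared.collect { println("audit: $it") } }
    // coroutineScope will not return on its own here, which is exactly the problem.
}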

Split and Merge processing

#1086 (comment)
Here's an example I found while searching for discussion of this issue; I also have my own example below.

It is often conceptually easier to break a flow into multiple flows, process them separately, and then merge them back together. This can allow for much cleaner implementations of certain concepts.

In my case I often find myself wanting to do this with sealed type hierarchies:

sealed interface ResultOrError<out E, out T> {
    data class Result<out T>(val value: T) : ResultOrError<Nothing, T>
    data class Error<out E>(val value: E) : ResultOrError<E, Nothing>
}

This "union" type is similar to something like Arrow's Either<A,B> where values can either represent errors or success. Though my primary use case is Arrow I wanted to keep this example standard Kotlin . Let's say I have a Flow that is a ResultOrError<Throwable, String>.Generally speaking, I would expect, and hope, most values from this flow would be a Result<String> and not Error<Throwable>. I might not be able to do much with Error<Throwable> so it would be a lot nicer if I could write flow logic on String using a split and merge strategy to continue propagating the errors. Then my business logic isn't having to constantly unbox things and error propagation can be delegated to a higher order function.

val flow: Flow<ResultOrError<Throwable, String>> = expensiveFlow()
    .onEach { println("expensive operation") }

flow.replicate {
    // Unwrap successful values, process them, and wrap the results back up.
    val success = replica { filterIsInstance<Result<String>>().map { it.value }.doSomething().map { Result(it) } }
    // Pass errors through untouched.
    val failure = replica { filterIsInstance<Error<Throwable>>() }

    merge(success, failure)
}.collect { println(it) } // Flow<ResultOrError<Throwable, SomethingElse>>

// This function can be written to process a Flow of the underlying success type only.
// This decouples it from error handling, which is simpler to conceptualize.
fun Flow<String>.doSomething(): Flow<SomethingElse> = TODO()

While the replicate { } call adds some ceremony, that too could be abstracted away, especially for commonly used classes like Either<A, B>. You could approximate this today with shareIn(...), but that is not really its intended use case, and you would be left with a flow that is hot and cannot complete, which simply won't work everywhere. While this example is simple, the cost of error wrapping compounds quickly when multiple levels of flow processing each return error types (or other sealed types); a split-and-merge strategy could make such pipelines much simpler to reason about.
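For comparison, here is roughly what the shareIn-based workaround looks like today. It reuses the ResultOrError, doSomething, and SomethingElse definitions above (flow operator imports omitted, as in the other snippets); splitWithShareIn is just an illustrative name:

// Approximate workaround with today's API: the shared flow is hot, requires an
// external scope, and the merged flow never completes.
fun splitWithShareIn(
    scope: CoroutineScope,
    source: Flow<ResultOrError<Throwable, String>>,
): Flow<ResultOrError<Throwable, SomethingElse>> {
    val shared = source.shareIn(scope, SharingStarted.Lazily)
    val success: Flow<ResultOrError<Throwable, SomethingElse>> = shared
        .filterIsInstance<ResultOrError.Result<String>>()
        .map { it.value }
        .doSomething()
        .map { ResultOrError.Result(it) }
    val failure: Flow<ResultOrError<Throwable, SomethingElse>> = shared
        .filterIsInstance<ResultOrError.Error<Throwable>>()
    return merge(success, failure)
}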

RxJava and Project Reactor

#1086 (comment)

In this comment @pacher highlights that Reactor and RxJava already support this. I understand that Kotlin Coroutines is not trying to match those libraries operator for operator, but I believe this feature has important use cases, and shareIn is probably already being misused for this purpose, especially by people coming from those libraries.
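As an illustration (my own example, not necessarily the operator @pacher had in mind), RxJava's publish overload that takes a selector does roughly what is proposed here: the source is multicast only within the selector, it is subscribed to exactly once, and the result still completes. In Kotlin with RxJava 3:

import io.reactivex.rxjava3.core.Flowable

fun main() {
    Flowable.just(1, 2, 3)
        .doOnNext { println("upstream: $it") } // runs only once per element
        .publish { shared ->
            // Inside the selector the source is multicast: both branches see every
            // element, and the resulting Flowable completes when the source does.
            Flowable.merge(
                shared.filter { it % 2 == 1 }.map { "odd: $it" },
                shared.filter { it % 2 == 0 }.map { "even: $it" }
            )
        }
        .blockingSubscribe { println(it) }
    println("done") // executes, unlike with a never-completing shared flow
}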

Mock Design

Most of the design is borrowed from this comment by Roman:
#1086 (comment)

I'll try to expand on it a bit; the details can be hammered out if there is interest in this feature.

// Split and Merge

// replicate would be an extension function on Flow. It would take a function as a parameter
// and use the "type-safe builder" pattern.
flow.replicate {
    // Inside the builder, replica would be brought into context. It would take the underlying
    // flow and produce a ReplicatedFlow<T>, which would be a Flow<T>. Inside replica { ... } the
    // `this` receiver would be a ReplicatedFlow<T> of the original flow, allowing transformation
    // or collection. replica would also have a generic return type; in this case it returns a flow.
    val success = replica { filterIsInstance<Result<String>>().map { it.value }.doSomething().map { Result(it) } }
    // We know exactly how many "subscribers" exist from the number of replica calls.
    val failure = replica { filterIsInstance<Error<Throwable>>() }
    // replicate would also return a generic, so you could use it to terminate a flow
    // or return a new flow to continue fluent (chained) processing.
    merge(success, failure)
}.collect { println(it) } // Flow<ResultOrError<Throwable, SomethingElse>>
// Preventing costly execution

val flow = somethingExpensive()
flow.replicate {
    // We collect the flow twice, but it is only evaluated once.
    replica { collect { doSomething(it) } }
    // I am not sure whether we would be able to allow `launch` in these cases. I think that
    // would be very cool though.
    replica { collect { doSomethingElse(it) } }
    // We don't explicitly return anything, so we return the result of the second
    // `replica { ... }` call; in this case that would be `Unit`.
}

The goal of this design is to make replicate { ... } flexible enough to handle the use cases described and possibly more.
