Flow multiplexing #4103

Open
odedniv opened this issue Apr 16, 2024 · 1 comment

Comments

Contributor

odedniv commented Apr 16, 2024

Use case

  1. I have a data source that can only be subscribed to once per process (specifically, this one - note that it is set...Callback rather than add...Callback).
  2. On subscription, I provide the set of things I want from it (specifically this - note that it's a Set).
  3. Different usages in my program conditionally require a specific set of things from the data source (specifically, one piece of code that wants this when executed, and another that wants that).
  4. I need a single manager that tracks the requirements of all usages, subscribes to the data source with the union of those requirements, and feeds each usage only the data it requested.
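
To make step 4 concrete, here is a minimal sketch of the bookkeeping such a manager needs: counting how many active usages want each key, so it knows when the union of requirements changes and the data source has to be re-subscribed. `KeyRegistry` is a hypothetical name used only for illustration; it is not part of kotlinx.coroutines or of the proposed implementation, and it is not thread-safe as written.

```kotlin
/**
 * Hypothetical helper (an assumption, not part of kotlinx.coroutines):
 * counts how many active usages want each key.
 */
class KeyRegistry<K> {
    private val counts = mutableMapOf<K, Int>()

    /** The union of keys wanted by all active usages. */
    val union: Set<K> get() = counts.keys.toSet()

    /** Registers one usage; returns true if the union grew. */
    fun add(keys: Set<K>): Boolean {
        var changed = false
        for (key in keys) {
            val previous = counts[key] ?: 0
            counts[key] = previous + 1
            if (previous == 0) changed = true
        }
        return changed
    }

    /** Unregisters one usage; returns true if the union shrank. */
    fun remove(keys: Set<K>): Boolean {
        var changed = false
        for (key in keys) {
            val previous = counts[key] ?: continue
            if (previous <= 1) {
                counts.remove(key)
                changed = true
            } else {
                counts[key] = previous - 1
            }
        }
        return changed
    }
}

fun main() {
    val registry = KeyRegistry<Int>()
    println(registry.add(setOf(1, 2)))    // true: union is now [1, 2]
    println(registry.add(setOf(2, 3)))    // true: key 3 is new
    println(registry.add(setOf(2)))       // false: key 2 is already wanted
    println(registry.remove(setOf(1, 2))) // true: key 1 is no longer wanted
    println(registry.union)               // [2, 3]
}
```

A `true` result is the signal to re-subscribe to the single data source with the new union; a `false` result means the existing subscription already covers the request.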

The Shape of the API

An example implementation and API can be seen in master...odedniv:kotlinx.coroutines:multiplex. In general:

/**
 * Constructs a [MultiplexFlow].
 *
 * Behavior:
 * * [getAll] is called every time the set of keys collected through flows returned by [MultiplexFlow.get] changes (when a collection starts or stops).
 * * [getAll] is called with the union of the keys of all currently collected [MultiplexFlow.get] flows.
 * * [MultiplexFlow.get] callers share data, so [getAll] is not invoked when all the keys passed to [MultiplexFlow.get] are already being collected by another [MultiplexFlow.get] caller.
 *   If [replay] is 0, this rule does not apply and [getAll] is re-invoked on every change in collections.
 * * An error in a call to [getAll] triggers a rollback to the previous keys, and every [MultiplexFlow.get] collection that includes one of the new keys throws that error.
 * * A follow-up [getAll] error, or an error after the [getAll] collection has already succeeded, clears all subscriptions and causes all [MultiplexFlow.get] collections to throw that error.
 * * If the flow returned by [getAll] finishes, all current collections of [MultiplexFlow.get] finish as well, and follow-up collections re-invoke [getAll].
 */
public fun <K, V> MultiplexFlow(
    scope: CoroutineScope,
    replay: Int = 1,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    getAll: suspend (keys: Set<K>) -> Flow<Map<K, V>>,
): MultiplexFlow<K, V>

/**
 * Allows multiplexing multiple subscriptions to a single [Flow].
 *
 * This is useful when the source allows only a single subscription, but the data is needed by multiple users.
 */
public class MultiplexFlow<K, V> internal constructor(...) {
    /** Returns a [Flow] that emits [V] for the requested [K]s, based on the map provided by `getAll`. */
    public operator fun get(vararg keys: K): Flow<V>
}

//
// Sample usage:
//

val multiplexFlow = MultiplexFlow<Int, DataValue>(scope) { keys: Set<Int> -> // keys of all requests, eventually {1, 2, 3}
  // Collection of this flow will be cancelled when the set of total keys is replaced.
  dataSourceFor(keys).map { values: List<DataValue> -> // values for all requests
    values.associateBy { it.key } // mapping to allow each user to get only the data they requested
  }
}
launch {
  multiplexFlow[1, 2].collect { value -> /* values with keys 1 or 2 */ }
}
launch {
  multiplexFlow[2, 3].collect { value -> /* values with keys 2 or 3 */ }
}

data class DataValue(val key: Int, ...)
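
The rollback rule described in the KDoc above can be sketched roughly as follows. This is a simplified, non-suspending illustration under stated assumptions, not the proposed implementation: `RollbackSubscriber` and its `subscribe` callback are hypothetical stand-ins for the manager and the single-subscription data source, and a real coroutine-based version would also need to handle cancellation and concurrency.

```kotlin
/**
 * Hypothetical sketch of the rollback rule: try the new key set and, if
 * subscribing fails, return to the previous one. `subscribe` stands in for
 * the single-subscription data source (an assumption for illustration).
 */
class RollbackSubscriber<K>(private val subscribe: (Set<K>) -> Unit) {
    var activeKeys: Set<K> = emptySet()
        private set

    /** Returns the error that affected callers should see, or null on success. */
    fun switchTo(newKeys: Set<K>): Throwable? {
        val previous = activeKeys
        return try {
            subscribe(newKeys)
            activeKeys = newKeys
            null
        } catch (e: Exception) {
            // Roll back to the previous keys. If that also fails,
            // clear everything and report the follow-up error instead.
            try {
                subscribe(previous)
            } catch (followUp: Exception) {
                activeKeys = emptySet()
                return followUp
            }
            e
        }
    }
}

fun main() {
    // Pretend data source that cannot provide key 4.
    val subscriber = RollbackSubscriber<Int> { keys ->
        require(4 !in keys) { "key 4 is unavailable" }
    }
    println(subscriber.switchTo(setOf(1, 2)))             // null
    println(subscriber.switchTo(setOf(1, 2, 4))?.message) // key 4 is unavailable
    println(subscriber.activeKeys)                        // [1, 2]
}
```

In this sketch only the caller that introduced the failing key sees the first error, while existing users of keys 1 and 2 keep their subscription; a failure during the rollback itself clears all keys.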

Prior Art

This is similar to a SharedFlow, except for these distinctions:

  1. It is aware of which specific data each subscription requests, rather than having a single global requirement.
  2. Each collector gets only the specific data it wants, rather than everything.

Because of these distinctions, the same behavior would require multiple SharedFlows, which is hard to get right when a single data source feeds all of them. Note that the suggested implementation in master...odedniv:kotlinx.coroutines:multiplex is actually based on maintaining multiple SharedFlows, each feeding its specific users.
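
For comparison, here is a minimal sketch of what a single plain SharedFlow gives you today. Collectors can filter down to their own keys, which covers distinction 2, but the producer still has to emit everything because it has no per-key knowledge of what collectors want (distinction 1). The `only` extension is a hypothetical helper written for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

/** Hypothetical helper: keeps only the entries a collector asked for. */
fun <K, V> Flow<Map<K, V>>.only(keys: Set<K>): Flow<Map<K, V>> =
    map { snapshot -> snapshot.filterKeys { it in keys } }

fun main() = runBlocking {
    // replay = 1 so that a late collector still sees the latest snapshot.
    val shared = MutableSharedFlow<Map<Int, String>>(replay = 1)

    // The producer must emit every key, including 3, which nobody collects
    // below: a SharedFlow only exposes a total subscription count.
    shared.emit(mapOf(1 to "a", 2 to "b", 3 to "c"))

    val received = shared.only(setOf(1, 2)).take(1).toList()
    println(received) // [{1=a, 2=b}]
}
```

The filtering is done after the data has already been fetched, so the single upstream subscription cannot be narrowed or widened as collectors come and go.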

Implementing this is very error-prone, both in thread safety and in lifecycle handling (e.g. rolling back when a new user requests data that fails to be fetched). It took many iterations in my project to get this right, so I thought it would be a good idea to have an implementation in kotlinx.coroutines.

odedniv added a commit to odedniv/kotlinx.coroutines that referenced this issue Apr 17, 2024
This is useful when the source allows only a single subscription, but
the data is needed by multiple users.

Fixing Kotlin#4103.
Contributor Author

odedniv commented Apr 17, 2024

Ignore the specific commit referenced above; see master...odedniv:kotlinx.coroutines:multiplex for the most up-to-date proposal.
