Flow.shareIn and stateIn operators #2047

Closed
elizarov opened this issue May 22, 2020 · 35 comments

@elizarov
Contributor

elizarov commented May 22, 2020

This issue supersedes #1261 and is based on the SharedFlow #2034 and StateFlow #1973 framework. See #2034 for most of the conceptual details on shared flows.

Introduction

A MutableSharedFlow provides convenient means to own a shared flow of values that other parts of code can subscribe to. However, it is often convenient to take an existing cold Flow that is defined in some other piece of code and start sharing it by launching a coroutine that collects this upstream flow and emits into the shared flow.

shareIn operator

The shareIn operator is introduced:

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0,
): SharedFlow<T>

It has the following parameters:

  • scope - a coroutine scope in which the sharing coroutine is launched.
  • started - a strategy that controls when sharing is started and stopped.
  • replay - a number specifying how many values are replayed for each new subscriber (defaults to zero — no replay).
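A minimal usage sketch, assuming the shareIn signature above as released in kotlinx.coroutines 1.4 and later; the demo function and its counter are illustrative names, not library API. It shows that the cold upstream is collected only once and that late subscribers receive just the replayed last value:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical name): one upstream collection, many subscribers.
fun shareInDemo(): Pair<Int, List<Int>> = runBlocking {
    var upstreamCollections = 0
    // A cold flow: every collection re-runs this block and increments the counter.
    val upstream = flow {
        upstreamCollections++
        emit(1)
        emit(2)
        emit(3)
    }
    // A separate Job so that the demo can cancel the sharing coroutine explicitly.
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    // Eagerly: the upstream is collected right away; replay = 1 keeps the last value.
    val shared = upstream.shareIn(sharingScope, SharingStarted.Eagerly, replay = 1)
    delay(50) // let the sharing coroutine run the upstream to completion
    // Two late subscribers: each sees only the replayed value, the upstream is not re-run.
    val lateA = shared.first()
    val lateB = shared.first()
    sharingJob.cancel()
    upstreamCollections to listOf(lateA, lateB)
}
```

In an application the sharing scope would usually be tied to the lifecycle of some component rather than to a hand-made Job.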

Starting sharing

There are the following out-of-the-box implementations of SharingStarted:

  • SharingStarted.Eagerly - start the sharing coroutine immediately.
  • SharingStarted.Lazily - start sharing after the first subscriber appears and keep on going forever.
  • SharingStarted.WhileSubscribed() - maintain the sharing coroutine only while there is at least one subscriber (start when the first one appears, stop when the last one disappears). By default, it keeps the replay cache forever after subscribers disappear.

The WhileSubscribed strategy is a function with optional parameters:

fun SharingStarted.Companion.WhileSubscribed(
    stopTimeoutMillis: Long = 0,
    replayExpirationMillis: Long = Long.MAX_VALUE
): SharingStarted
  • stopTimeoutMillis - how long to wait (in ms) before stopping sharing after the number of subscribers becomes zero.
  • replayExpirationMillis - how long to wait (in ms) after stopping sharing before resetting the replay cache.

All of the above values and the WhileSubscribed function are defined in the SharingStarted companion object, so additional values and functions can be defined as extensions. Variants of the WhileSubscribed function that work with other representations of duration (java.time.Duration, kotlin.time.Duration, Long, TimeUnit) will be defined by the library in the appropriate modules.
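The sketch below, assuming the released kotlinx.coroutines API and using hypothetical demo names, records when the upstream starts and stops under WhileSubscribed(stopTimeoutMillis = 300): nothing runs before the first subscriber appears, and the upstream is cancelled only once the stop timeout has elapsed after the last subscriber left. The timings are wall-clock based and chosen with generous margins.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical name): records when the upstream starts and stops.
fun whileSubscribedDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val upstream = flow {
        events += "upstream started"
        try {
            var i = 0
            while (true) {
                emit(i++)
                delay(10)
            }
        } finally {
            events += "upstream stopped" // runs when the sharing coroutine cancels collection
        }
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared = upstream.shareIn(
        sharingScope,
        SharingStarted.WhileSubscribed(stopTimeoutMillis = 300),
        replay = 1,
    )
    delay(100)
    events += "no subscribers yet"  // nothing has been started so far
    shared.take(2).toList()         // the first subscriber starts the upstream, then leaves
    delay(100)
    events += "inside stop timeout" // the upstream is still running
    delay(600)
    events += "after stop timeout"  // the upstream has been cancelled by now
    sharingJob.cancel()
    events
}
```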

stateIn operator

As StateFlow is a specialized version of SharedFlow, the stateIn operator is a specialized version of the shareIn operator. It does not have a replay parameter (replay is always equal to 1 for a state flow) and has a required initialValue:

fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

When execution happens in a suspending context and you want to compute and wait for the initial value of the state to arrive from the upstream flow, there is a suspending variant of stateIn without an initial value and with a hard-coded started = SharingStarted.Eagerly:

suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlow<T>
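A sketch contrasting the two stateIn variants, assuming the operators as released in kotlinx.coroutines 1.4+ (the demo name is illustrative): the non-suspending variant exposes initialValue until the upstream emits, while the suspending variant returns only after the first upstream value has arrived.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical name) of both stateIn variants.
fun stateInDemo(): List<Int> = runBlocking {
    val upstream = flow {
        delay(50)
        emit(10)
        delay(50)
        emit(20)
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)

    // Non-suspending variant: the state holds initialValue until the upstream emits.
    val state = upstream.stateIn(sharingScope, SharingStarted.Eagerly, initialValue = 0)
    val beforeUpstream = state.value

    // Suspending variant: returns only after the first upstream value has arrived.
    val awaited = upstream.stateIn(sharingScope)
    val firstAwaited = awaited.value

    delay(150) // let both sharing coroutines receive the second value
    val afterUpstream = state.value
    sharingJob.cancel()
    listOf(beforeUpstream, firstAwaited, afterUpstream)
}
```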

Custom starting strategies

SharingStarted is an interface that supports 3rd-party implementations, allowing any starting strategy to be plugged into the sharing operators:

interface SharingStarted {
    fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand>
}

An implementation of SharingStarted provides a single function, command, that transforms the subscriptionCount of the shared flow into a flow of commands that control the sharing coroutine. The commands are represented by the SharingCommand enum:

enum class SharingCommand { START, STOP, STOP_AND_RESET_REPLAY_CACHE }
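As an illustration of a third-party strategy, here is a hypothetical WhileAtLeast(n) (not part of the library) that keeps sharing only while at least n subscribers are present and never resets the replay cache. It is a sketch that assumes the released interface matches the one above:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical custom strategy (not in the library): share only while at least
// `minSubscribers` subscribers are present; the replay cache is kept on stop.
class WhileAtLeast(private val minSubscribers: Int) : SharingStarted {
    override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> =
        subscriptionCount
            .map { count ->
                if (count >= minSubscribers) SharingCommand.START else SharingCommand.STOP
            }
            .distinctUntilChanged()
}

// Illustrative demo: the upstream starts only once the second subscriber arrives.
fun customStrategyDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val upstream = flow {
        events += "upstream started"
        emit(1)
        awaitCancellation() // keep the upstream alive until sharing stops
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared = upstream.shareIn(sharingScope, WhileAtLeast(2), replay = 1)

    val a = launch { shared.collect { events += "a got $it" } }
    delay(50)
    events += "one subscriber" // a single subscriber is not enough to start sharing
    val b = launch { shared.collect { events += "b got $it" } }
    delay(50)
    a.cancel()
    b.cancel()
    sharingJob.cancel()
    events
}
```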

Error handling

Any error in the upstream flow cancels the sharing coroutine and resets the buffer of the shared flow. The error is delivered to the scope. If this behavior is not desirable, then error-handling operators (such as retry and catch) should be applied to the upstream flow before the shareIn operator. If the upstream completes normally, then nothing happens.
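A minimal sketch of handling errors on the upstream before sharing, assuming a hypothetical flaky source that throws java.io.IOException twice before succeeding: retry re-collects it on I/O failures, and catch turns any remaining failure into a fallback value, so the sharing coroutine itself never fails.

```kotlin
import java.io.IOException
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical names): error handling placed on the upstream,
// before shareIn, so that failures never reach the sharing coroutine.
fun errorHandlingDemo(): Pair<String, Int> = runBlocking {
    var attempts = 0
    // A flaky upstream that fails twice with an IOException before succeeding.
    val flaky = flow {
        attempts++
        if (attempts < 3) throw IOException("attempt $attempts failed")
        emit("data from attempt $attempts")
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared = flaky
        .retry(retries = 5) { cause -> cause is IOException } // re-collect on I/O failures
        .catch { emit("fallback") } // any other failure is replaced by a fallback value
        .shareIn(sharingScope, SharingStarted.Eagerly, replay = 1)
    val value = shared.first()
    sharingJob.cancel()
    value to attempts
}
```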

Conceptual implementation

The conceptual implementation of shareIn operator is simple:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Minimal default buffer capacity used for padding (64).
private const val DEFAULT_BUFFER_SIZE = 64

fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T> {
    val upstream = this
    val shared = MutableSharedFlow<T>(
        replay = replay,
        // pad the total buffer (replay + extra) to at least DEFAULT_BUFFER_SIZE
        extraBufferCapacity = maxOf(replay, DEFAULT_BUFFER_SIZE) - replay,
    )
    scope.launch { // the single coroutine to rule the sharing
        started.command(shared.subscriptionCount)
            .distinctUntilChanged()
            .collectLatest { // cancels the running block when a new command arrives
                when (it) {
                    SharingCommand.START -> upstream.collect(shared) // can be cancelled
                    SharingCommand.STOP -> { /* just cancel and do nothing else */ }
                    SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> shared.resetReplayCache()
                }
            }
    }
    return shared
}

Note that the buffer is padded to a minimal default capacity (64) for performance reasons.

Operator fusion

The actual implementation of the shareIn operator is more complex. It fuses with an immediately preceding flowOn operator, launching the sharing coroutine directly in the corresponding context (without an additional coroutine and channel to change the context).

It also fuses with an immediately preceding Flow.buffer operator. This allows explicit configuration of the buffer size, creating a shared flow that takes the configured buffer size into account:

  • buffer(0).shareIn(scope, started, 0) - creates a shared flow with extraBufferCapacity = 0. It overrides the default buffer size and thus configures a full rendezvous between the upstream emitter and subscribers (the emitter is suspended until all subscribers process the value).
  • buffer(b).shareIn(scope, started, r) - creates shared flow with replay = r and extraBufferCapacity = b.
  • conflate().shareIn(scope, started, r) - creates shared flow with replay = r, extraBufferCapacity = 1 when replay == 0, and onBufferOverflow = DROP_OLDEST.

Note that the last fusion, conflate().shareIn(scope, started, r), departs from the true spirit of the fusion concept: it produces execution results that differ from a separate, sequential application of the conflate and shareIn operators without fusion. For example, take an upstream flow that emits consecutive numbers starting from 1 with a delay of 100ms between numbers, and a slow downstream subscriber that takes 530ms for each value. A conflate() followed by a shareIn with a buffer of size 1, without fusion, collects the values 1, 2, 3, 6, 11, 16, etc. (three initial values are collected due to double buffering by conflation and sharing), and it would buffer even more initial values if the default buffer size were used. A fused implementation, however, has a single small buffer and collects 1, 6, 11, 16, etc., just as expected from a separate conflate() without sharing. The fused buffer decreases the overall latency, processing the number 6 as soon as possible, by the 1060ms mark after start, as opposed to the non-fused version, where the number 6 would be collected at the 2120ms mark. We consider this to be a good thing.

Implementation

Implementation is in PR #2069.

@zach-klippenstein
Contributor

zach-klippenstein commented May 22, 2020

I love the SharingStarted design, seems really powerful.

SharingStarted.WhileSubscribed - … It resets the cache replay buffer to the initial value when subscribers disappear.

Just to confirm, opting in to preserve the cache between upstream collections (ala RxReplayingShare) is as simple as filtering RESET_BUFFER commands from the Flow produced by WhileSubscribed, right?

@elizarov
Contributor Author

elizarov commented May 22, 2020

Just to confirm, opting in to preserve the cache between upstream collections (ala RxReplayingShare) is as simple as filtering RESET_BUFFER commands from the Flow produced by WhileSubscribed, right?

@zach-klippenstein Filtering does not work, since WhileSubscribed only sends RESET_BUFFER (which also stops). You'd better use WhileSubscribed(cacheExpirationMillis = Long.MAX_VALUE). Now, if that becomes a popular choice, then we can also provide a shortcut option for it. The only challenge is to pick a name for it.

@pacher

pacher commented May 22, 2020

Can you please elaborate a little bit more on error handling?

The error is delivered to the scope.

But what happens to collectors (launched in different scopes)? As far as I can understand they will hang there forever.
A materialized error will become a regular emission and will not be delivered to the scope. Also, the upstream will not be unsubscribed unless #2042 is implemented and applied to the materialized flow.

The same goes for completion. SharedFlow explicitly never completes, but shareIn could be applied to a finite cold flow, and there is no word in the design about this case. So, as far as I can understand, all the collectors will hang there forever after the upstream completes.

P.S. SharingStarted design is really powerful and flexible, kudos for that!

@elizarov
Contributor Author

Can you please elaborate a little bit more on error handling?
But what happens to collectors (launched in different scopes)? As far as I can understand they will hang there forever.

@pacher Yes. The basic rule of a shared flow is that a subscriber never completes. We'll have it written very near the top of the shared flow docs and repeat it in the stateIn docs, too.

So what does it mean in practice? It depends on the error-handling policy of the app:

  • Crash on error (default): It is customary to terminate (crash) the app on error and send a note with a stacktrace to developers so that they know they have a bug in their code. That is what is going to happen (by default) when an error in the upstream is delivered to a typical scope.
  • Retry on error: If your upstream is some unreliable network flow, a typical approach is to retry on error. You can do it using the existing retry operators on your upstream.
  • Materialize error: If you need to deliver the error downstream, then you can materialize it using catch on the upstream and dematerialize it, if needed, using map on the downstream. Be careful in this case not to lose the error, as a critical bug in your code like a NullPointerException may accidentally sit materialized forever, waiting for subscribers to see it, or even get lost on buffer reset.
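A sketch of the materialize/dematerialize option, assuming a hypothetical Materialized wrapper type (not a library class): catch wraps the upstream failure into a regular value before sharing, and each subscriber rethrows it in map.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper type (not a library class) for materialized values and errors.
sealed class Materialized<out T> {
    data class Value<T>(val value: T) : Materialized<T>()
    data class Failure(val error: Throwable) : Materialized<Nothing>()
}

// Illustrative demo (hypothetical name): the error is delivered to subscribers as data.
fun materializeDemo(): List<Any> = runBlocking {
    val upstream = flow {
        emit(1)
        emit(2)
        throw IllegalStateException("boom")
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared: SharedFlow<Materialized<Int>> = upstream
        .map<Int, Materialized<Int>> { Materialized.Value(it) } // materialize values
        .catch { emit(Materialized.Failure(it)) }               // materialize the error
        .shareIn(sharingScope, SharingStarted.Lazily, replay = 3)
    val received = mutableListOf<Any>()
    try {
        // Dematerialize on the subscriber side: values pass through, the failure is rethrown.
        shared
            .map { m ->
                when (m) {
                    is Materialized.Value -> m.value
                    is Materialized.Failure -> throw m.error
                }
            }
            .collect { received += it }
    } catch (e: IllegalStateException) {
        received += "error: ${e.message}"
    }
    sharingJob.cancel()
    received
}
```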

@elizarov
Contributor Author

shareIn could be applied to finite cold flow and there is no word in the design about this case

@pacher During design and community review we have not found use-cases for sharing finite-size cold flows with subscribers whose number is not known in advance. We've found that there are some use-cases for replicating a finite-size cold flow into a fixed number of flows known in advance. This case is robust with respect to error propagation since it is syntactically scoped in nature, and we plan to address it separately with a dedicated operator for flow replication that will propagate both completion and errors.

@pacher

pacher commented May 22, 2020

@elizarov Thanks for clarifications, very helpful
I never did Android and "crashing the app" is not an option for me. Maybe I am in the wrong place and not your target audience, which is a pity because I like what you do here.

I do believe that materialization should be baked-in, maybe as another operator which uses shareIn under the hood. I was trying to show that materialization could be trickier than it seems. For example another riddle: how to make materialized shareIn deliver only the error to new collectors if it "failed" and not to replay the whole buffer up until the error. For me it's just a riddle, I don't have a use-case for replay > 1 yet.

It's nice to see that you are still thinking about replicate. For a moment it felt like you were starting to get rid of error propagation in general, which is one of the greatest aspects of reactive design in my opinion.

P.S. I remember how proud Andrey looked in his keynote when he talked about Kotlin being used in banks. I can't imagine them being happy with "crashing the app" either ;-)

@pacher

pacher commented May 23, 2020

critical bug in your code like a `NullPointerException` may accidentally sit forever materialized

Materializing error could be done along the lines of

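upstream
    .catch {
        emit(materializedError) // deliver the error downstream as a regular value
        throw it                // rethrow so that the error also reaches the scope
    }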

This way it would be delivered both downstream and to the scope.

@elizarov
Contributor Author

elizarov commented May 27, 2020

📣 There is an important question on the design of sharing operators we need community help with. We need to figure out what should be the default behavior of SharingStarted.WhileSubscribed with respect to cache reset. We feel that the default should be based on the most common usage.

The question only matters for upstreamFlow.shareIn(scope, started, replay) where replay >= 1, or for the stateIn operator (where replay is always implicitly equal to one). The intended use for started = WhileSubscribed is the case when an upstream flow is expensive to maintain (uses network resources, a device connection, or something like that) and you want to keep the upstream flow running only while at least one downstream subscriber is present. Now, when all subscribers disappear, we have two variants of behavior to pick as our default:

👍 Immediately reset the cache to the initial value (if it was specified; clear if it was not) so that the next time subscribers appear they will not get stale value(s) from the previous upstream flow collection but will receive some kind of initial value (or an empty flow) to explicitly tell them to wait while upstream emits anything (establishes network connection and gets data, etc)

🚀 Keep the last value(s) emitted by the upstream so that the next time subscribers appear they will immediately get previously emitted last value(s) without having to wait until upstream flow emits anything.

Let's do quick poll: What do you use in your code most often and want to see as a default?

@ZakTaccardi

ZakTaccardi commented May 27, 2020

I haven't really used Rx in over 2 years since I've been using coroutines, and I can't remember what scenarios I preferred the 👍 scenario vs 🚀. I would love to hear more use cases about where everyone would prefer one over the other, and vice versa.

👍 Immediately reset the cache to the initial value (if it was specified; clear if it was not)

Downside: less performant than quickly emitting a cached value

🚀 Keep the last value(s) emitted by the upstream

Downside: unexpected weird buggy behavior. For example, with the network connection example, if you had a State.Loaded(..) that was cached, and an .onStart { State.Loading }, you would receive stale Loaded(..) with fully loaded data immediately, followed by an immediate State.Loading. This could result in a fully loaded UI of stale data being shown with a quick swap to a loading screen.

My personal opinion is that the 👍 makes more sense as a default because it is safer, and for places where 🚀 makes more sense, devs can opt-in to that less safe but potentially more performant behavior

@JakeWharton
Contributor

But move .onStart before the cache and you don't have that problem. The whole point of the cache is so you get the most recent value. If you're deliberately subverting the cache, the problem isn't the cache behavior it's your order of operations. The state machine which starts with the loading state is an upstream source of states from the cache.

@LouisCAD
Contributor

@elizarov If you use replay with a value of 1 or greater, why would you want not to replay it but to reset to the initial value instead?

To me, for shareIn, the 👍 behavior makes sense with no replay (value of 0), and the 🚀 behavior makes sense with replay (>= 1).

For stateIn, if there's a way to have it replace the last value with the initial value, I think the behavior should not have any default, to force being explicit about the behavior.
That said, for stateIn, in regards to StateFlow API, I'm not sure replacing the last value with the initial value is right, so I'd say it should always give the last value, and other use cases would use shareIn.

@ZakTaccardi

If you're deliberately subverting the cache, the problem isn't the cache behavior it's your order of operations. The state machine which starts with the loading state is an upstream source of states from the cache.

I'd have to use .shareIn(..) more to understand it, but if you are building some type of state machine where you do some style of scan operation where your Flow<T> needs an initial state, then I'd expect it to be quite useful to have that initial state be the cached value

@ZakTaccardi

@elizarov for the poll, it might be useful to show an example of the .shareIn(..) parameters of what the non-default behavior would look like to specify whether you want the cache to replay or reset in the two scenarios

@RobertKeazor

RobertKeazor commented May 28, 2020

This is a bit of a tricky question. 🚀 seems to represent the concept that this flow always maintains the most recent value of the state machine, but emitting a state that could be stale sounds pretty dangerous lol. Then 👍 seems safer but breaks that concept... I agree @elizarov is always better, as long as devs have a way to opt in to 🚀

@matejdro

matejdro commented May 28, 2020

A good solution for this could be to let the developer receive a close callback and transform the data before it goes into the "long-term cache"?

Going from Zak's example, one could have State.Loaded(data) and State.Loading(partialData). When stream is in use, it would emit State.Loaded events. But after stream closes, it could take the last data and put it into partialData of State.Loading. That way when new subscriber arrives, it knows that last data is actually stale since received state is Loading, not Loaded.

@pacher

pacher commented May 28, 2020

I would vote to remove WhileSubscribed and introduce two distinct strategies instead.
As a side benefit, we would have good examples of how to deal with replay buffer for our custom strategies and could use the source code of either as a starting point.

@elizarov
Contributor Author

UPDATE: Based on your feedback and having discussed the issue in the team we've decided to make the following changes to this proposed design:

  • A default behavior for WhileSubscribed is changed to keep the replay cache.
  • The name of the corresponding configuration parameter for WhileSubscribed(...) function is changed to replayExpirationMillis (and its default is changed to Long.MAX_VALUE) to make it more consistent with the rest of replay-related parameters.
  • SharingCommand.RESET_BUFFER is renamed to SharingCommand.STOP_AND_RESET_BUFFER to make it more explicit.

I'll update this issue shortly. We're also changing the buffer terminology related to operator fusion. The shared flow design will be tweaked a bit, too.
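A sketch of the two behaviors under the final API, assuming kotlinx.coroutines 1.4+ where the parameter is named replayExpirationMillis (the demo name is illustrative). With the default, a stateIn flow keeps its last value after the last subscriber leaves; with replayExpirationMillis = 0, it is reset to its initialValue.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical name): default WhileSubscribed keeps the last value,
// while replayExpirationMillis = 0 resets a state flow to its initialValue on stop.
fun replayExpirationDemo(): Pair<Int, Int> = runBlocking {
    // An upstream that emits one value and then stays active until cancelled.
    val upstream = flow {
        emit(42)
        awaitCancellation()
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val kept = upstream.stateIn(sharingScope, SharingStarted.WhileSubscribed(), initialValue = 0)
    val reset = upstream.stateIn(
        sharingScope,
        SharingStarted.WhileSubscribed(replayExpirationMillis = 0),
        initialValue = 0,
    )
    // Subscribe until the upstream value arrives; first() then cancels the collection.
    kept.first { it == 42 }
    reset.first { it == 42 }
    delay(100) // give both sharing coroutines time to process the stop commands
    sharingJob.cancel()
    kept.value to reset.value
}
```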

elizarov added a commit that referenced this issue May 29, 2020
Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.

This is a DRAFT. Parts that are not done yet:
* BufferOverflow strategy support in channels.
* Sharing operators conflation with preceding buffer(...) operator and the corresponding tests.
* Better functional tests for SharingStarted strategies.
* Test cancellability of shared flows.
* Add reactive operator migration hints.

Fixes #2034
Fixes #2047
@elizarov
Contributor Author

Draft PR with implementation 👉 #2069

@gajicm93

gajicm93 commented Jun 4, 2020

I have a question about CoroutineScope passed into shareIn operator. Since SharedFlow can be collected from many different Scopes with variable lifecycle durations, does the Scope passed into shareIn operator must be longer lasting than all the collector scopes?

For example, if I "shareIn" with Fragment lifecycle scope, but then collect with GlobalScope, that collector will be canceled as soon as my Fragment destroys, even though I'm collecting inside GlobalScope?

Thanks.

@elizarov
Contributor Author

elizarov commented Jun 4, 2020

I have a question about CoroutineScope passed into shareIn operator. Since SharedFlow can be collected from many different Scopes with variable lifecycle durations, does the Scope passed into shareIn operator must be longer lasting than all the collector scopes?

@gajicm93 It does not have to be. When the sharing scope is cancelled, the sharing coroutine stops. It means it no longer collects from the upstream, but it does not affect downstream subscribers. They can still be active, although they will not receive any further updates. Moreover, the shared flow's replay cache is still preserved, so new subscribers can appear and will get a snapshot of the replay cache, too.
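This behavior can be checked with a small sketch (hypothetical demo name, released kotlinx.coroutines API assumed): after the sharing scope is cancelled, the upstream is stopped and the replay cache still holds the last value. A new subscriber receives that value but nothing more, so withTimeoutOrNull is used to stop waiting.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical name): subscribers outlive a cancelled sharing scope.
fun cancelledSharingScopeDemo(): Triple<List<Int>, List<Int>?, Boolean> = runBlocking {
    var upstreamStopped = false
    val upstream = flow {
        try {
            emit(1)
            emit(2)
            awaitCancellation() // never completes on its own
        } finally {
            upstreamStopped = true // set once the sharing coroutine is cancelled
        }
    }
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared = upstream.shareIn(sharingScope, SharingStarted.Eagerly, replay = 1)
    delay(50)
    sharingJob.cancel() // stop sharing; the shared flow itself stays usable
    // A late subscriber gets the replayed value, then waits forever for a second one.
    val late = withTimeoutOrNull(100) { shared.take(2).toList() }
    Triple(shared.replayCache, late, upstreamStopped)
}
```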

@gajicm93

gajicm93 commented Jun 4, 2020

although they will not receive any further updates.

Thank you, this is the key takeaway for me. So essentially I will want the shareIn scope to be "wider" than any of the collectors.

@1zaman
Contributor

1zaman commented Jun 5, 2020

What about providing an operator to collect to an existing MutableSharedFlow with a SharingStarted strategy? This could be defined as a (suspending) terminal operator, and then we don't need to provide a scope as a parameter. Something like this (naming TBD of course):

suspend fun <T> Flow<T>.collectWhen(shared: MutableSharedFlow<T>, started: SharingStarted)

This would be useful for cases where there are multiple upstreams, or the upstream can change, but we still want to have the benefit of using a SharingStarted strategy.

Of course, there can be no buffer fusion here, since buffer configuration is immutable in MutableSharedFlow.

@elizarov
Contributor Author

elizarov commented Jun 5, 2020

@1zaman It looks to be too narrow and quite confusing to be provided out-of-the-box and you can always write this kind of collectWhen yourself if needed:

started.command(shared.subscriptionCount).distinctUntilChanged().collectLatest {
    when (it) {
        SharingCommand.START -> upstream.collect(shared)
        SharingCommand.STOP -> {}
        SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> shared.resetReplayCache()
    }
}

@1zaman
Contributor

1zaman commented Jun 6, 2020

Agreed that it's probably a narrow use-case, and might not be warranted in the standard library.

Thinking a bit more about the method definition, it might also be better to define it as an extension method on MutableSharedFlow instead, which takes the upstream Flow as a parameter. Also, I realize now that it's probably not useful to make it a suspending function, since the command flow of the starting strategy might never finish; instead, it might be better to take the scope as a parameter, launch a coroutine in that scope, like the stateIn() operator does, and return a Job to allow cancelling the collection.
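A sketch of this refined idea, using the hypothetical name launchSharingFrom (this helper is not part of kotlinx.coroutines): an extension on MutableSharedFlow that takes the upstream and a scope, launches the same command-driven loop as the conceptual shareIn implementation, and returns a Job for cancellation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper (not part of kotlinx.coroutines): shares `upstream` into this
// existing MutableSharedFlow according to `started`, in a coroutine launched in `scope`.
// Cancelling the returned Job stops the sharing.
fun <T> MutableSharedFlow<T>.launchSharingFrom(
    upstream: Flow<T>,
    scope: CoroutineScope,
    started: SharingStarted,
): Job = scope.launch {
    started.command(subscriptionCount)
        .distinctUntilChanged()
        .collectLatest { command ->
            when (command) {
                SharingCommand.START -> upstream.collect(this@launchSharingFrom)
                SharingCommand.STOP -> Unit // collectLatest already cancelled the upstream
                SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> resetReplayCache()
            }
        }
}

// Illustrative usage: two different upstreams feeding the same shared flow.
fun multiUpstreamDemo(): List<Int> = runBlocking {
    val target = MutableSharedFlow<Int>(replay = 10)
    val jobA = target.launchSharingFrom(flowOf(1, 2), this, SharingStarted.Eagerly)
    delay(50)
    val jobB = target.launchSharingFrom(flowOf(3), this, SharingStarted.Eagerly)
    delay(50)
    jobA.cancel()
    jobB.cancel()
    target.replayCache
}
```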

@elizarov
Contributor Author

📣 UPDATE: Some final tweaks in the design and worked out PR #2069 with full implementation:

  • The SharingStarted.commandFlow function is renamed to command (the Flow suffix is dropped from its name).
  • SharingStarted.WhileSubscribed() is now provided only as a function (with default parameters), the constant is dropped.
  • SharingCommand with reset is renamed to STOP_AND_RESET_REPLAY_CACHE consistently with updates to the naming in SharedFlow design (Introduce SharedFlow #2034 (comment)).

@elizarov
Contributor Author

📣 UPDATE: PR is now done. Additional changes in the design:

  • initialValue support is dropped from shareIn operator. You can always use onStart { emit(initialValue) } on the upstream flow to mimic it if needed, this snippet is also added to the docs.
  • Replay cache is kept intact on completion or failure of the sharing coroutine, which simplifies the conceptual implementation of the shareIn operator. If a special action is needed on completion, it can be configured with the onCompletion operator. An example is given in the shareIn docs.
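A sketch of both workarounds mentioned above, with a hypothetical demo name: onStart { emit(...) } stands in for the dropped initialValue parameter, and onCompletion emits a marker only when the upstream completes normally (cause == null).

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative demo (hypothetical name): onStart replaces the dropped initialValue
// parameter, and onCompletion marks a normal completion of the upstream.
fun initialAndCompletionDemo(): List<String> = runBlocking {
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared = flowOf("a", "b")
        .onStart { emit("initial") }                               // mimics initialValue
        .onCompletion { cause -> if (cause == null) emit("done") } // normal completion only
        .shareIn(sharingScope, SharingStarted.Lazily, replay = 10)
    val received = shared.take(4).toList()
    sharingJob.cancel()
    received
}
```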

elizarov added a commit that referenced this issue Jun 15, 2020
Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes #2034
Fixes #2047
@Volatile-Memory

I can't wait to use this, I hope it comes out soon, like how StateFlow came out so quickly following its GitHub issue.

@Volatile-Memory

Is this in the new coroutines 1.3.8 release that is part of the Kotlin 1.4RC?

@Volatile-Memory

Any updates on when this might come out?

twyatt pushed a commit to twyatt/kotlinx.coroutines that referenced this issue Aug 28, 2020
elizarov added a commit that referenced this issue Sep 10, 2020
Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com>
Co-authored-by: Thomas Vos <thomasjsvos@gmail.com>
elizarov added a commit that referenced this issue Sep 10, 2020
Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com>
Co-authored-by: Thomas Vos <thomasjsvos@gmail.com>
Co-authored-by: Travis Wyatt <travis.i.wyatt@gmail.com>
@elizarov
Contributor Author

elizarov commented Oct 9, 2020

UPDATE: Last-minute design change. There will be no default value for the started parameter; it must be specified explicitly.
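Because there is no default, every call site has to choose one of the SharingStarted strategies from the issue description. The minimal sketch below checks whether the upstream gets collected before anyone subscribes. It assumes kotlinx.coroutines 1.4.0 or later. startedWithoutSubscribers is a hypothetical helper, and the 100 ms delay only gives an eagerly launched sharing coroutine time to run.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Shares a cold flow with the given strategy and reports whether the upstream
// was collected even though nobody subscribed to the shared flow.
fun startedWithoutSubscribers(started: SharingStarted): Boolean = runBlocking {
    var upstreamCollected = false
    val upstream = flow {
        upstreamCollected = true
        emit(1)
    }
    // Child job, so the sharing coroutine can be cancelled without cancelling runBlocking.
    val sharingJob = Job(coroutineContext[Job])
    upstream.shareIn(CoroutineScope(coroutineContext + sharingJob), started) // replay = 0
    delay(100)
    sharingJob.cancel()
    upstreamCollected
}

fun main() {
    println(startedWithoutSubscribers(SharingStarted.Eagerly))           // true
    println(startedWithoutSubscribers(SharingStarted.Lazily))            // false
    println(startedWithoutSubscribers(SharingStarted.WhileSubscribed())) // false
}
```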

@fluidsonic

fluidsonic commented Oct 11, 2020

I still find this confusing. More explanation and examples (in this issue) would be great on:

  • the difference between shareIn and stateIn.
    I guess the only difference is that the last emitted value becomes accessible, but otherwise the behavior stays the same.
  • when the upstream is hot and when it is cold. And how does that relate to the replay cache usage?
  • typical use cases.

The network request example was brought up, which is exactly what I'm struggling with at the moment, but in a slightly different manner.

Instead of just fetching data from a server once and then pushing it downstream, I want to include refreshes.

  • Once data gets stale, it needs to be refetched automatically.
  • Refreshes should happen continuously only while the Flow is hot.
  • State should be preserved while the Flow is cold.
  • The resulting Flow should be shared; otherwise I'll load the data from the same source multiple times, independently of each other.

Here's a quick example of how it could look in a fully hot scenario.
The question now is how I can use StateFlow and SharedFlow to:

  • start/stop the refresh logic depending on whether there are collectors or not.
  • maintain the last state while there are no collectors.
  • ensure that the refresh logic does not run multiple times in parallel if multiple collectors collect the same Flow.

import java.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Data(val num: Int, val expires: Instant)

fun CoroutineScope.fetchData(initial: Data): Flow<Data> =
    flow {
        var data = initial

        log("I: $initial")
        emit(initial)

        while (true) {
            delay(Duration.between(Instant.now(), data.expires).toMillis())
            data = updateData(data).also { log("U: $it") }
            emit(data)
        }
    }
        .broadcastIn(this)
        .asFlow()

fun log(message: String) = println("${Instant.now()} $message")
fun updateData(data: Data) = Data(num = data.num + 1, expires = Instant.now() + Duration.ofSeconds(3))

suspend fun main(): Unit = coroutineScope {
    val dataFlow = fetchData(initial = Data(num = 1, expires = Instant.now() + Duration.ofSeconds(7)))
    log("-- Flow created. No collectors. Starting in 5s…")
    delay(5_000)
    log("-- Adding 2 collectors")
    coroutineScope {
        launch { dataFlow.take(3).onCompletion { log("-- Removed collector 1") }.collect { log("1: $it") } }
        launch { dataFlow.take(5).onCompletion { log("-- Removed collector 2") }.collect { log("2: $it") } }
    }
    delay(5_000)
    log("-- Adding 1 collector")
    dataFlow.take(2).collect { log("3: $it") }
    log("-- Done")

    coroutineContext.cancelChildren()
}

2020-10-11T15:03:52.628690Z -- Flow created. No collectors. Starting in 5s…
2020-10-11T15:03:57.647744Z -- Adding 2 collectors
2020-10-11T15:03:57.675430Z I: Data(num=1, expires=2020-10-11T15:03:59.593134Z)
2020-10-11T15:03:57.681988Z 2: Data(num=1, expires=2020-10-11T15:03:59.593134Z)
2020-10-11T15:03:57.682021Z 1: Data(num=1, expires=2020-10-11T15:03:59.593134Z)
2020-10-11T15:03:59.597450Z U: Data(num=2, expires=2020-10-11T15:04:02.597321Z)
2020-10-11T15:03:59.598067Z 1: Data(num=2, expires=2020-10-11T15:04:02.597321Z)
2020-10-11T15:03:59.598123Z 2: Data(num=2, expires=2020-10-11T15:04:02.597321Z)
2020-10-11T15:04:02.598584Z U: Data(num=3, expires=2020-10-11T15:04:05.598486Z)
2020-10-11T15:04:02.599157Z 2: Data(num=3, expires=2020-10-11T15:04:05.598486Z)
2020-10-11T15:04:02.600014Z 1: Data(num=3, expires=2020-10-11T15:04:05.598486Z)
2020-10-11T15:04:02.610756Z -- Removed collector 1
2020-10-11T15:04:05.602926Z U: Data(num=4, expires=2020-10-11T15:04:08.602840Z)
2020-10-11T15:04:05.603526Z 2: Data(num=4, expires=2020-10-11T15:04:08.602840Z)
2020-10-11T15:04:08.606803Z U: Data(num=5, expires=2020-10-11T15:04:11.606700Z)
2020-10-11T15:04:08.607280Z 2: Data(num=5, expires=2020-10-11T15:04:11.606700Z)
2020-10-11T15:04:08.607662Z -- Removed collector 2
2020-10-11T15:04:11.609861Z U: Data(num=6, expires=2020-10-11T15:04:14.609795Z) // upstream still hot
2020-10-11T15:04:13.610290Z -- Adding 1 collector
2020-10-11T15:04:14.609555Z U: Data(num=7, expires=2020-10-11T15:04:17.609471Z)
2020-10-11T15:04:14.609942Z 3: Data(num=7, expires=2020-10-11T15:04:17.609471Z)
2020-10-11T15:04:17.609524Z U: Data(num=8, expires=2020-10-11T15:04:20.609459Z)
2020-10-11T15:04:17.609707Z 3: Data(num=8, expires=2020-10-11T15:04:20.609459Z)
2020-10-11T15:04:17.609858Z -- Done

Here's an improved example with a rudimentary shareIn that uses BroadcastChannel. I think it does the right thing, but I'm not sure.

import java.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.sync.*

data class Data(val num: Int, val expires: Instant)

fun fetchData(initial: Data): Flow<Data> {
    var data = initial

    return flow {
        log("I: $data")
        emit(data)

        while (true) {
            delay(Duration.between(Instant.now(), data.expires).toMillis())
            data = updateData(data).also { log("U: $it") }
            emit(data)
        }
    }
}

fun <T> Flow<T>.shareIn(scope: CoroutineScope): Flow<T> {
    val upstream = this
    var broadcastChannel: BroadcastChannel<T>? = null
    val mutex = Mutex()
    var subscriberCount = 0

    return flow {
        val activeChannel = mutex.withLock {
            subscriberCount += 1
            broadcastChannel ?: upstream.broadcastIn(scope).also {
                broadcastChannel = it
                log("-- now hot")
            }
        }

        try {
            activeChannel.consumeEach { emit(it) }
        } finally {
            mutex.withLock {
                subscriberCount -= 1

                if (subscriberCount == 0) {
                    broadcastChannel?.cancel()
                    broadcastChannel = null
                    log("-- now cold")
                }
            }
        }
    }
}

fun log(message: String) = println("${Instant.now()} $message")
fun updateData(data: Data) = Data(num = data.num + 1, expires = Instant.now() + Duration.ofSeconds(3))

suspend fun main(): Unit {
    coroutineScope {
        val dataFlow = fetchData(initial = Data(num = 1, expires = Instant.now() + Duration.ofSeconds(7)))
            .shareIn(this)

        log("-- Flow created. No collectors. Starting in 5s…")
        delay(5_000)
        log("-- Adding 2 collectors")
        coroutineScope {
            launch { dataFlow.take(3).onCompletion { log("-- Removed collector 1") }.collect { log("1: $it") } }
            launch { dataFlow.take(5).onCompletion { log("-- Removed collector 2") }.collect { log("2: $it") } }
        }
        delay(5_000)
        log("-- Adding 1 collector")
        dataFlow.take(2).collect { log("3: $it") }
    }

    log("-- Done")
}

2020-10-11T15:33:28.882805Z -- Flow created. No collectors. Starting in 5s…
2020-10-11T15:33:33.907860Z -- Adding 2 collectors
2020-10-11T15:33:33.949222Z -- now hot
2020-10-11T15:33:33.951699Z I: Data(num=1, expires=2020-10-11T15:33:35.871034Z)
2020-10-11T15:33:33.955681Z 1: Data(num=1, expires=2020-10-11T15:33:35.871034Z)
2020-10-11T15:33:33.955679Z 2: Data(num=1, expires=2020-10-11T15:33:35.871034Z)
2020-10-11T15:33:35.871934Z U: Data(num=2, expires=2020-10-11T15:33:38.871850Z)
2020-10-11T15:33:35.872275Z 1: Data(num=2, expires=2020-10-11T15:33:38.871850Z)
2020-10-11T15:33:35.872317Z 2: Data(num=2, expires=2020-10-11T15:33:38.871850Z)
2020-10-11T15:33:38.873083Z U: Data(num=3, expires=2020-10-11T15:33:41.872996Z)
2020-10-11T15:33:38.873712Z 2: Data(num=3, expires=2020-10-11T15:33:41.872996Z)
2020-10-11T15:33:38.874460Z 1: Data(num=3, expires=2020-10-11T15:33:41.872996Z)
2020-10-11T15:33:38.877840Z -- Removed collector 1
2020-10-11T15:33:41.874234Z U: Data(num=4, expires=2020-10-11T15:33:44.874107Z)
2020-10-11T15:33:41.874722Z 2: Data(num=4, expires=2020-10-11T15:33:44.874107Z)
2020-10-11T15:33:44.878920Z U: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:44.879452Z 2: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:44.881969Z -- now cold
2020-10-11T15:33:44.882125Z -- Removed collector 2
2020-10-11T15:33:49.884995Z -- Adding 1 collector
2020-10-11T15:33:49.885976Z -- now hot
2020-10-11T15:33:49.886278Z I: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:49.886484Z 3: Data(num=5, expires=2020-10-11T15:33:47.878837Z)
2020-10-11T15:33:49.886677Z U: Data(num=6, expires=2020-10-11T15:33:52.886645Z)
2020-10-11T15:33:49.886817Z 3: Data(num=6, expires=2020-10-11T15:33:52.886645Z)
2020-10-11T15:33:49.887226Z -- now cold
2020-10-11T15:33:49.887507Z -- Done
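For comparison, the minimal sketch below expresses the same requirements with the released shareIn operator, assuming kotlinx.coroutines 1.4.0 or later. It does not come from the thread. Data, refreshingData and demo are hypothetical stand-ins, the java.time expiry is replaced by a fixed refresh period, and the timings are arbitrary. SharingStarted.WhileSubscribed() runs a single upstream only while there is at least one subscriber. replay = 1 keeps the last value for subscribers that arrive while the upstream is stopped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Data(val num: Int)

// Cold upstream: emits the current data, then refreshes it periodically.
// `data` lives outside the flow block, so a restarted upstream resumes from the last value.
fun refreshingData(initial: Data, periodMillis: Long, onRefresh: () -> Unit): Flow<Data> {
    var data = initial
    return flow {
        emit(data)
        while (true) {
            delay(periodMillis)
            data = Data(data.num + 1)
            onRefresh()
            emit(data)
        }
    }
}

// Returns the nums seen by one subscriber, the value kept while nobody is
// subscribed, and the number of refreshes that happened while cold.
fun demo(): Triple<List<Int>, Int, Int> = runBlocking {
    var refreshes = 0
    val sharingJob = Job(coroutineContext[Job])
    val shared = refreshingData(Data(1), periodMillis = 200) { refreshes++ }
        .shareIn(CoroutineScope(coroutineContext + sharingJob), SharingStarted.WhileSubscribed(), replay = 1)

    // The first subscriber starts the single shared upstream; take(3) unsubscribes afterwards.
    val seen = shared.take(3).map { it.num }.toList()
    val refreshesBeforeStop = refreshes

    // No subscribers now: WhileSubscribed() cancels the upstream, so no refreshes run,
    // but by default the replay cache still holds the last value.
    delay(1_000)
    val kept = shared.replayCache.single().num
    val refreshesWhileCold = refreshes - refreshesBeforeStop

    sharingJob.cancel()
    Triple(seen, kept, refreshesWhileCold)
}

fun main() {
    println(demo()) // ([1, 2, 3], 3, 0)
}
```

As in the BroadcastChannel version, a subscriber arriving after a stop first receives the cached value. The restarted upstream then emits the current `data` again before refreshing.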

@fluidsonic

The shareIn and stateIn documentation in the pull request is very helpful for understanding the concept better 👍

It answers most of my questions above.

  • There's still no explanation of why there are both shareIn and stateIn. I assume it's really just to have the latest value accessible. In that case, the examples should make that clear. Both operators have similar example code, and it's difficult to decide which operator to choose.
  • The shareIn example code should use .shareIn(scope, replay = 0) instead of .shareIn(scope, 0). The magic number is confusing.

@elizarov
Contributor Author

@fluidsonic It all boils down to whether you work with the state of something (and only need the most recent value of this state) or with a shared stream of events.
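To make the distinction concrete, the minimal sketch below shares the same three-value upstream both ways. stateIn keeps only the latest value and exposes it as value. shareIn with replay = 3 keeps the last three values for new subscribers. It assumes kotlinx.coroutines 1.4.0 or later, and stateVsEvents is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun stateVsEvents(): Pair<Int, List<Int>> = runBlocking {
    val sharingJob = Job(coroutineContext[Job])
    val scope = CoroutineScope(coroutineContext + sharingJob)
    val upstream = flowOf(1, 2, 3)

    // State: conflated, always has a value (initialValue until the upstream emits).
    val state: StateFlow<Int> = upstream.stateIn(scope, SharingStarted.Eagerly, initialValue = 0)
    // Events: every value is delivered to subscribers; the last 3 are replayed to late ones.
    val events: SharedFlow<Int> = upstream.shareIn(scope, SharingStarted.Eagerly, replay = 3)

    delay(100) // let both eager sharing coroutines consume the upstream
    sharingJob.cancel()
    state.value to events.replayCache
}

fun main() {
    println(stateVsEvents()) // (3, [1, 2, 3])
}
```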

@elizarov
Contributor Author

UPDATE: Very last-minute design change: the order of parameters in shareIn is changed to be consistent with stateIn. They will both accept CoroutineScope and SharingStarted as the first two parameters, followed by the operator-specific parameters: the initial value for stateIn and the replay size for shareIn. Moreover, since "replay" is an optional feature of a shared flow, we'll make it an optional parameter, so you don't have to specify a magic value of zero to opt out of replay when you don't need it.
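With this order, the signatures as released in kotlinx.coroutines 1.4.0 are shareIn(scope, started, replay = 0) and stateIn(scope, started, initialValue). The minimal sketch below shows the optional replay parameter. replayCaches is a hypothetical helper. SharingStarted.Eagerly is used only so that the upstream is consumed without a subscriber.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns the replay caches of two shared flows built from the same upstream.
fun replayCaches(): Pair<List<Int>, List<Int>> = runBlocking {
    val sharingJob = Job(coroutineContext[Job])
    val scope = CoroutineScope(coroutineContext + sharingJob)
    val upstream = flowOf(1, 2, 3)

    val noReplay = upstream.shareIn(scope, SharingStarted.Eagerly)           // replay defaults to 0
    val lastOne = upstream.shareIn(scope, SharingStarted.Eagerly, replay = 1)

    delay(100) // let both eager sharing coroutines consume the upstream
    sharingJob.cancel()
    noReplay.replayCache to lastOne.replayCache
}

fun main() {
    println(replayCaches()) // ([], [3])
}
```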

@fluidsonic

@fluidsonic It all boils down to whether you work with the state of something (and only need the most recent value of this state) or with a shared stream of events.

@elizarov Ah, I completely missed that a StateFlow is conflating. It takes a while to commit all these details to memory.
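The conflation is easy to observe with a MutableStateFlow. In the minimal sketch below, three updates made without suspending in between reach an already-subscribed collector as a single value, the latest one. It assumes kotlinx.coroutines 1.4.0 or later, and observedValues is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun observedValues(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val collector = launch { state.collect { seen += it } }

    yield() // let the collector subscribe; it receives the current value 0
    state.value = 1
    state.value = 2
    state.value = 3 // no suspension between the updates
    yield() // the collector wakes up once and sees only the latest value
    collector.cancel()
    seen
}

fun main() {
    println(observedValues()) // [0, 3]
}
```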

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
* Introduce SharedFlow and sharing operators

Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes Kotlin#2034
Fixes Kotlin#2047

Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com>
Co-authored-by: Thomas Vos <thomasjsvos@gmail.com>
Co-authored-by: Travis Wyatt <travis.i.wyatt@gmail.com>