
Introduce SharedFlow #2034

Closed
elizarov opened this issue May 18, 2020 · 79 comments

Comments

@elizarov
Contributor

elizarov commented May 18, 2020

Introduction

There is a need for a Flow implementation that is hot (always active, independently of collectors) and shares emitted values among all collectors that subscribe to it. Its design generalizes StateFlow beyond the narrow set of use-cases it supports (see #1973). It was previously discussed in various issues under the tentative name of EventFlow, but having analyzed the use-cases and worked out a draft design, we believe that SharedFlow better describes what it is. It will replace all sorts of BroadcastChannel implementations and serve as a basis for the design of the shareIn operator. The simplest version of a sharing operator builds a combination of a SharedFlow that downstream collects from and an upstream flow that emits into this SharedFlow. See #2047 for details on sharing operators.

Use-cases

Design overview

The design has two interfaces, SharedFlow/MutableSharedFlow, just like StateFlow/MutableStateFlow:

interface SharedFlow<out T> : Flow<T> {
    val replayCache: List<T>
}

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    fun tryEmit(value: T): Boolean
    val subscriptionCount: StateFlow<Int>
    fun resetReplayCache()
}
  • SharedFlow is a regular Flow plus:
    • replayCache - a snapshot of the current replay cache for non-reactive use (showing a dialog, etc).
  • MutableSharedFlow is a FlowCollector (an upstream flow can directly emit to it) plus:
    • tryEmit - a non-suspending variant of emit (for regular non-suspending event listeners, named consistently with the upcoming trySend, see SendChannel.offer should never throw #974);
    • subscriptionCount - exposes the current number of subscriptions (active collectors) as a StateFlow, which makes it easy to implement any kind of advanced upstream data connection policy if needed;
    • resetReplayCache - resets the replay cache. It is useful for very transient data that should not be replayed to collectors when its upstream is not active.

Instances of MutableSharedFlow are created with the constructor function:

fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

It has the following parameters:

  • replay - how many items are replayed to new subscribers (optional, defaults to zero);
  • extraBufferCapacity - how many items are buffered in addition to replay, so that emit does not suspend while there is buffer space remaining (optional, defaults to zero);
  • onBufferOverflow - configures the action on buffer overflow (optional, defaults to suspending the emit call; non-default strategies are supported only when replay > 0 or extraBufferCapacity > 0).
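As a quick illustration of how these parameters interact, here is a sketch against the proposed API (the exact values shown assume the replay behavior described in this issue):

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // Replays the two most recent events to every new subscriber and
    // drops the oldest buffered value instead of suspending the emitter.
    val events = MutableSharedFlow<Int>(
        replay = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    events.tryEmit(1)
    events.tryEmit(2)
    events.tryEmit(3) // overflow: 1 is dropped
    println(events.replayCache) // [2, 3]
}
```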

Buffer overflow

The case of a shared flow without a buffer (created with MutableSharedFlow()) is special. It is a rendezvous shared flow where each emit call suspends until all collectors process the emitted value. A tryEmit call on such a flow never succeeds and always returns false.

In the buffered case, a call to emit may either put the value into the buffer and return immediately (tryEmit does the same and returns true in this case) or, if there are slow collectors that have not processed previously buffered values yet, the emit call suspends (tryEmit does nothing and returns false in this case). The action on buffer overflow can be configured with the onBufferOverflow parameter, which essentially configures what is sacrificed on overflow:

enum class BufferOverflow {
    SUSPEND,     // default behavior
    DROP_OLDEST, // ~ conflate() operator
    DROP_LATEST  // ~ dropWhenBusy() operator
}

If anything other than the default SUSPEND is configured, then emit never suspends and tryEmit always returns true, handling buffer overflow with either the DROP_OLDEST strategy (drops the oldest value in the buffer, so slow collectors will not see it, but ensures the latest value is kept) or the DROP_LATEST strategy (drops the newly emitted value and keeps the old buffered values).
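For example, with a non-SUSPEND strategy an emitter can never get stuck on a slow collector. A sketch against the proposed API:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    val dropping = MutableSharedFlow<Int>(
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    // With DROP_LATEST configured, tryEmit always reports success;
    // overflow is handled by discarding values, not by suspending.
    check(dropping.tryEmit(1))
    check(dropping.tryEmit(2))
    println("both tryEmit calls returned true")
}
```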

Example usage in code

class EventBus {
    private val _events = MutableSharedFlow<Event>() // no buffer, rendezvous with subscribers
    public val events: Flow<Event> get() = _events // expose as a plain flow
    
    suspend fun produceEvent(event: Event) {
        _events.emit(event) // suspends until all subscribers receive it
    }
}

Error handling

The big elephant in the room of the SharedFlow design is error handling. The BroadcastChannel design and similar primitives in Rx propagate errors from upstream to downstream. We could do the same, but propagating errors immensely complicates the design and creates the following problems:

  • The caching and replay design has to take the “upstream error” state into consideration. Does the error state count against the buffer size? Is the error cleared by resetReplayCache, or is there a separate way to reset the error state? Etc.
  • The upstream might encounter an error before a single downstream collector appears or after all downstream collectors are canceled. Then a potentially fatal error report just sits there, cached in the SharedFlow for an unbounded time, without being reported to any kind of error-handling code. This is not good for reliable applications.

What are the actual use-cases for error handling? What needs to happen when there is an error in the upstream? The following error-handling strategies observed in the wild appear to account for the majority of use-cases:

  • Retry on errors: This is a popular strategy when the upstream flow is retrieving data via an unreliable network.
  • Materialize errors: This is a popular strategy in UI applications to make it easier to propagate errors via API layers so that they can be presented in the UI.

Both of the strategies already have existing Flow APIs to retry and to catch errors, and they can be used on the upstream flow before emitting events to the SharedFlow. In essence, all kinds of completion and error states can always be materialized as emitted values to share them. This way, the sharing design becomes completely decoupled from and orthogonal to error handling.

Note that sharing operators will always have a CoroutineScope they work in and will benefit from structured concurrency (see discussion #1261). If an error is not caught and reaches a sharing operator, this scope gives a natural and consistent point to promptly report all unhandled errors so that application-wide mechanisms can properly log or otherwise report them.

SharedFlow never completes

As a side-effect of this error-handling design decision, a SharedFlow never completes. A call to collect { ... } on a SharedFlow must be cancelled to terminate it. However, if completion is needed, it can always be materialized as a special emitted value. A collector can apply the takeWhile operator to complete the resulting flow when this special value is encountered.
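A minimal sketch of materialized completion (the AppEvent/Done types and the untilDone name are hypothetical illustrations, not part of the proposal):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// A special emitted value stands in for completion.
sealed class AppEvent
data class Msg(val text: String) : AppEvent()
object Done : AppEvent()

// Each collector turns the marker back into completion with takeWhile.
fun Flow<AppEvent>.untilDone(): Flow<AppEvent> = takeWhile { it !is Done }

fun main() = runBlocking {
    val events = flowOf(Msg("hello"), Done, Msg("never collected"))
    println(events.untilDone().toList()) // [Msg(text=hello)]
}
```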

Deprecation of BroadcastChannel

SharedFlow is designed to completely replace all kinds of BroadcastChannel implementations. They will remain supported but will be deprecated as soon as SharedFlow becomes stable:

  • ConflatedBroadcastChannel() → MutableStateFlow()
  • BroadcastChannel(capacity) → MutableSharedFlow(replay = 0, extraBufferCapacity = capacity)

MutableSharedFlow is not a drop-in replacement for BroadcastChannel. The following major functional differences have to be addressed during replacement:

  • BroadcastChannel is capable of directly representing an error state (which causes the error-handling problem described above, this being one of the reasons to deprecate it). So code that relied on this error-handling capability of BroadcastChannel will have to be changed to materialize those errors using the catch operator.
  • BroadcastChannel supports various advanced Channel APIs, like the select expression. If these are needed on the downstream side, the resulting SharedFlow can be converted to a channel via the produceIn operator; if needed on the upstream side, the channelFlow builder can be used.
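A sketch of the downstream conversion, assuming the produceIn operator works as described (since the flow never completes, the resulting channel must be cancelled explicitly):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val shared = MutableSharedFlow<Int>(replay = 1)
    shared.tryEmit(42)
    // produceIn opens a new subscription and exposes it as a ReceiveChannel,
    // making select {} and other Channel APIs available downstream.
    val channel = shared.produceIn(this)
    println(channel.receive()) // 42, served from the replay cache
    channel.cancel() // a SharedFlow never completes, so close the channel ourselves
}
```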

StateFlow is SharedFlow

Last but not least, StateFlow (see #1973) is trivially retrofitted to implement SharedFlow, with MutableStateFlow implementing MutableSharedFlow:

  • replayCache is a singleton list of the current value;
  • trivial tryEmit and emit implementations could be useful for composition (another flow can be emitted into a StateFlow);
  • subscriptionCount could be useful for state flows, too;
  • resetReplayCache is not supported.

With this change, SharedFlow consistently becomes the only hot flow in the library, easy to see and learn by its type. When something is a SharedFlow, it means that it is always live regardless of the presence of collectors and that all emissions are shared among collectors.

StateFlow becomes a special-purpose, high-performance, and efficient implementation of SharedFlow for the narrow but widely used case of sharing a state. In fact, a shared flow behaves identically to a state flow when it is created with the following parameters and the distinctUntilChanged operator is applied to it:

// MutableStateFlow(initialValue) is a shared flow with the following parameters:
val shared = MutableSharedFlow(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST,
)
shared.tryEmit(initialValue) // emit the initial value
val state = shared.distinctUntilChanged() // get StateFlow-like behavior

The above information will be added to the StateFlow documentation, giving easily actionable advice on what to do when you have a use-case that is almost like a StateFlow, but with some tweaks (more buffer space, support for duplicate emissions, no initial value, etc).

onSubscription operator

The following operator that is specific to shared flows is added:

fun <T> SharedFlow<T>.onSubscription(action: suspend FlowCollector<T>.() -> Unit): SharedFlow<T>

It is similar to onStart with one big difference: onStart calls the action before a subscription to the shared flow is established. This means that if the onStart action initiates an operation that emits values into the shared flow, there is no guarantee that those emissions will be received by this downstream collector. However, onSubscription calls the action after a subscription to the shared flow is established, guaranteeing the reception of all values emitted after this moment (assuming they are not dropped on buffer overflow).

sharedFlow.onStart { sharedFlow.emit("x") }.onEach { /* may miss "x" */ }
sharedFlow.onSubscription { sharedFlow.emit("y") }.onEach { /* guaranteed to collect "y" */ }

Implementation

Implementation is in PR #2069.

@Virgiel

Virgiel commented May 18, 2020

I really like the personalization possibilities provided by this proposal!

Would it be possible to have a custom conflation function?
Something like this:

typealias Conflation<T> = (old: T, new: T) -> T

I currently use a ConflatedBroadcastChannel to synchronize a list state.
To avoid unnecessary recalculation by the UI, I have different change events, such as SelectionChange(ids), OrderChange, etc.
If I buffer these changes, the UI could compute selections twice when two SelectionChange events are buffered. I wish I could merge them to avoid this, but I would have to do it atomically.

@pacher

pacher commented May 18, 2020

First impression: Loved it, up until the point of error handling.
In my opinion, error propagation from upstream and handling by (each) collector/subscriber is one of the most important advantages of reactive design as a whole, if not the most important.
I do agree that it is a hard and complicated thing to do, but this is exactly where we need help from smart library designers to get ideas and implement best solutions.

As I understand it, your proposed solution is to introduce another container type holding value|error|completed, wrap incoming values from the upstream (an additional allocation), and somehow unwrap them after the SharedFlow, before the downstream, to "manually" fail/complete every collector. Each of us will have to do this exercise independently, but this pattern looks general enough to be abstracted away. Can we ask for a well-designed example of how to do this? Some best-practice guidance?

Simple example: data coming in from the network and shared via SharedFlow. One of collectors is writing it to the file on disk. So it needs to know when the flow completes to flush/close the file and in case of server error to move incomplete file to some other location and write an email to admin about the incident. Other collectors might need to perform other actions in case of errors and completion so one central handler on the upstream is not helpful. We are too much used to onError/onComplete combo from Rx.

@elizarov
Contributor Author

elizarov commented May 18, 2020

As I understood, your proposed solution is to introduce another container type holding value|error|completed, wrap incoming values from upstream (additional allocation) and somehow unwrap them after SharedFlow before downstream to "manually" fail/complete every collector. Each of us will have to do this exercise independently, but this pattern looks general enough to be abstracted away. Can we ask for a well-designed example how to do this? Some best practice guidance?

@pacher We are considering the option of providing a ready-to-use wrapper and operators to materialize/dematerialize completion and errors for sharing so that you don't have to write them yourself. The core design presented here is mostly aimed at sharing never-ending event flows. However, in order to design those we need to look closely at use-cases of sharing limited-size flows:

Simple example: data coming in from the network and shared via SharedFlow. One of collectors is writing it to the file on disk ...

This is the most quoted use-case we've seen so far, and it does not seem to fold nicely into a sharing design at all. This use-case is all about replicating an incoming flow into a number of downstream flows. The key difference from sharing is that you know the number of those flows in advance and you know that all of them must receive every emitted value without loss. None of the error-handling problems we see with sharing apply to this use-case. It seems that a dedicated replication operation would be a better fit, and this replication operation would naturally support completion and error propagation out-of-the-box. See also my reply on #1261 (comment)

@lukas1

lukas1 commented May 18, 2020

Both of the strategies already have existing Flow APIs to retry and to catch errors and they can be used on the upstream flow before emitting events to the SharedFlow.

This may not be ideal for the retrying use case; I may want to share the retry strategy for upstream and downstream errors, but have a different retry strategy between different collectors of the same SharedFlow.

The proposed solution, if I understand correctly, would then be to catch the error, materialize it into some wrapper type, then dematerialize it downstream into the actual error and apply the retry logic in the downstream.

That certainly works, but it's a bit of a convoluted solution. I'd prefer just writing a retry block downstream, without having to modify anything upstream.

If there's no plan to address error handling in any other way, then it would at least be useful to have a SharedFlow that already uses materialization by default, because for many use cases this will probably be the version that most people want to use [citation needed].
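The materialize/dematerialize round-trip under discussion can be sketched with kotlin.Result as the wrapper type (the operator names here are hypothetical, not part of the proposal):

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Wrap upstream failures into values so they survive sharing...
fun <T> Flow<T>.materializeErrors(): Flow<Result<T>> =
    map { Result.success(it) }.catch { emit(Result.failure(it)) }

// ...and rethrow them per collector, where a downstream retry/catch can react.
fun <T> Flow<Result<T>>.dematerializeErrors(): Flow<T> =
    map { it.getOrThrow() }

fun main() = runBlocking {
    val upstream = flow { emit(1); throw IllegalStateException("boom") }
    // The failure arrives as a value; the materialized flow completes normally.
    println(upstream.materializeErrors().toList())
}
```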

@elizarov
Contributor Author

@lukas1

This may not be ideal for retrying use case; I may want to share the retry strategy of upstream and downstream errors, but have different retry strategy between different collectors of the same SharedFlow.

Could you elaborate on this a bit, please? A gist with a code example would be helpful, too. I don't understand how different collectors for a shared (single) upstream can have different retry strategies in any useful sense.

@pacher

pacher commented May 18, 2020

@elizarov Alright, in this case thumbs up! Maybe we should mention it in the description somehow, and maybe you can wait with deprecating channels until all those goodies are here.

As for the second part, I still believe that sharing needs a mechanism of error propagation and I don't see how the number and timing of collectors (difference to replicating) affects that.

My use case: I want to open a websocket connection and expose a never-ending stream of incoming events/messages as a Flow. I just want to provide an API method returning a Flow so that any number of clients at any time can grab it and do whatever they want with it. This seems to check all the boxes for SharedFlow. I have an expensive and error-prone function transforming a network text message into a useful value, with JSON parsing etc. Since it's expensive, I want to map the upstream flow once, before sharing. If something happens to the connection, then I can use retry logic on the upstream as suggested. But if my transformation throws a parsing error, there is nothing to retry; this type of error I need to propagate to the clients of my API.

@lukas1

lukas1 commented May 18, 2020

Coming from Android, one simple example. Let's say my app has a shopping cart. I can get the number of items in the shopping cart and its contents in one call... Let's say I have two separate screens in the app: one that displays the whole shopping cart content and one that just displays the number.

On the screen where just the number is displayed, perhaps I'd want the retry strategy to be something like: try 3 times and then fail (and don't show the number, it's not as critical).

On the screen with contents of the shopping cart, I'd like the user to be able to use a retry button.

The error that creates the necessity to retry could come from the shared upstream flow that made request to server, or it could be caused by some further processing downstream (for example combining it with other data from another REST API relevant only for that one single downstream -> such as some additional information about products in the shopping cart, that are not included in the original response to shopping cart contents)

@elizarov
Contributor Author

elizarov commented May 18, 2020

My use case: I want to open websocket connection and expose never-ending stream of incoming events/messages as Flow. I just want to provide API method returning Flow so that any number of clients at any time can grab it and do whatever they want with it. This seems to check all the boxes for SharedFlow. I have an expensive and error-prone function transforming network text message into useful value, with parsing json etc. Since it's expensive, I want to map upstream flow once, before sharing. If something happens to the connection, then I can use retry logic on the upstream as suggested. But if my transformation throws a parsing error, there is nothing to retry, but this type of error I need to propagate to the clients of my API.

@pacher With the shareIn operator (to be fully introduced later), the implementation I can suggest is:

websocketFlow() // your upstream
    .retry( ... ) // configure retry for the websocket the way you want it
    .map {
        try { ParseSuccess(parse(it)) } // wrap fail-prone parsing into try/catch
        catch (e: ParseException) { ParseFailure(e) } // ...and materialize the parsing exception
    }
    .shareIn(scope) // now we have a SharedFlow<ParsedResult>

This solution ensures that a failure to parse each individual message can be processed by clients independently (they get either ParseSuccess or ParseFailure for each upstream message), so that a failure to process a single message does not abort the whole flow. Also, note that we catch only a specially designated ParseException that is supposed to represent invalid input to the parse function. Any other exception (like NullPointerException) means there is a bug somewhere in the parse code. That exception would get caught by the scope and immediately reported to developers as a proper bug, even when there are no collectors to observe it.
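For completeness, the ParsedResult hierarchy that the snippet above assumes could look like this (the Message and ParseException types are illustrative placeholders):

```kotlin
// Illustrative payload and error types.
data class Message(val text: String)
class ParseException(message: String) : Exception(message)

// The materialized outcome of the fail-prone parse step.
sealed class ParsedResult
data class ParseSuccess(val message: Message) : ParsedResult()
data class ParseFailure(val cause: ParseException) : ParsedResult()
```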

@elizarov
Contributor Author

Coming from Android, one simple example. Let's say my app has a shopping cart. I can get number of items in the shopping cart and its contents in one call... Let's say I have two seperate screens in the app, one that displays the whole shopping cart content and one, that just displays the number.

@lukas1 It does not seem that you need a SharedFlow for this use-case at all. You can use a regular cold Flow to represent a shopping cart request or, even better, since you are just working with a simple REST call returning a single answer, use a suspending function (no need to use flows at all). Either way, with a plain flow or a suspending function, each screen can use its own retry logic.

@pacher

pacher commented May 18, 2020

@elizarov Thanks, I start to see a bigger picture now. SharedFlow is more of a fundamental building block. I shall wait for shareIn, materializing and other great stuff you mentioned here. Excuse my impatience.

Another question: will there be an option for an unlimited buffer backed by a linked list, like in channels?
I would also really like to have an Error strategy for BufferOverflow. I see that we cannot have it on SharedFlow because of the error-handling design, but maybe we can have it in shareIn and friends?

@lukas1

lukas1 commented May 18, 2020

@elizarov If I use a regular cold Flow I'd do two requests. I don't want to do two requests. I just want to make one request and share the loaded data.

In principle I agree that in this case I could use regular suspending function, but having the data represented as flow gives me opportunity to use other operators on it.

Another possibility is, that the data about shopping cart could come from source other than REST, it could be a websocket that could send more messages than one. (Shopping cart example is not useful for this one, but in the past I have worked with a websocket that actually returned two messages over the socket for a single request)

@ZakTaccardi

It seems weird to me that DROP_OLDEST is analogous to CONFLATE but is named differently. I'm not saying that DROP_OLDEST is the wrong name though as it is more descriptive by itself than CONFLATE. In fact, CONFLATE is a very confusing name because it's just not very descriptive

@ZakTaccardi

fun tryEmit(value: T): Boolean

This behaves exactly like channel.offer(..) but is named differently. It might be good to be consistent here. I will say, though, that tryEmit(..) is a more descriptive name than offer(..)

@ZakTaccardi

ZakTaccardi commented May 18, 2020

Will SharedFlow<T> support the following use case:

Be "active" (hot) while subscriber count >= 1, and shut down when subscriber count = 0?

And the SharedFlow<T> instance could switch between being active/inactive multiple times as the subscriber count alternates between 0 and >=1?

@zach-klippenstein
Contributor

zach-klippenstein commented May 18, 2020

I will say though that tryEmit(..) is a more descriptive name than offer(..)

#974 discusses renaming offer to trySend.

Will SharedFlow<T> support the following use case:

I believe so, although that particular implementation would probably live in shareIn or a similar operator. E.g.

fun <T> Flow<T>.refCountIn(scope: CoroutineScope): SharedFlow<T> {
  val upstream = this
  val sharedFlow = MutableSharedFlow<T>()
  scope.launch {
    sharedFlow.collectorsCount
      .filter { it <= 1 }
      .collectLatest { count ->
        if (count == 1) {
          // This collection will be cancelled if collectorsCount drops back to 0.
          upstream.collect(sharedFlow)
        }
      }
  }
  return sharedFlow
}

You could also add parameters to forward to MutableSharedFlow for cache size, buffering, overflow behavior, etc.

@elizarov
Contributor Author

Another question: will there be an option for unlimited buffer backed by linkedlist like in channels?

@pacher You can specify extraBufferCapacity = Int.MAX_VALUE. It will grow as needed, up to 2^30 items (that's the limit in the current prototype). If you have use-cases for larger sizes, let's discuss them separately. They can be fit into the same basic implementation if needed.
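A sketch of such an effectively unbounded configuration, assuming the constructor described above:

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // The buffer grows on demand, so tryEmit never fails and emit never
    // suspends (up to the internal limit mentioned above).
    val unbounded = MutableSharedFlow<Int>(extraBufferCapacity = Int.MAX_VALUE)
    repeat(1_000) { check(unbounded.tryEmit(it)) }
    println("1000 emissions accepted without suspension")
}
```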

I would also really like to have an Error strategy for BufferOverflow. I see that we can not have it on SharedFlow because of error handling design, but maybe we can have it in shareIn and friends?

What is the Error strategy supposed to do? And what is the use-case? Why is it needed?

@elizarov
Contributor Author

elizarov commented May 18, 2020

It seems weird to me that DROP_OLDEST is analogous to CONFLATE but is named differently. I'm not saying that DROP_OLDEST is the wrong name though as it is more descriptive by itself than CONFLATE. In fact, CONFLATE is a very confusing name because it's just not very descriptive

@ZakTaccardi Indeed. We are actually looking at a separate (lower-priority) design on how to retrofit this BufferOverflow strategy into channels. Something along the lines of adding an optional bufferOverflow parameter to the Channel(...) construction function. If this design works out (not sure yet), it will also add a bufferOverflow parameter to the Flow.buffer(...) operator, and we could then go on to deprecate both the Channel.CONFLATED constant and the Flow.conflate() operator.

The conceptual difference is that it only makes sense to call it conflation with a buffer size of 1; for larger sizes, neither the name nor the concept works correctly. To generalize it to a larger buffer size we need a more explicit name like DROP_OLDEST. This is an imperative name (it says how it works). We'd prefer a more declarative name, but we have not found one yet.

@elizarov
Contributor Author

Will SharedFlow<T> support the following use case: Be "active" (hot) while subscriber count >= 1, and shut down when subscriber count = 0? And the SharedFlow<T> instance could switch between being active/inactive multiple times as the subscriber count alternates between 0 and >=1

@ZakTaccardi SharedFlow itself does not do anything. It is a container. But you can implement this behavior with it. That's why it provides collectorsCount. You can watch the number of active collectors of the shared flow and implement the logic to start/stop the upstream flow going into the shared flow appropriately. The upcoming shareIn operator is planned to provide both of those behaviors out-of-the-box, without you having to write this code yourself.

@Dominaezzz
Contributor

A bit of a nitpick: I think collectorsCount should be collectorCount.

@elizarov
Contributor Author

Would it be possible to have a custom conflation function? (old: T, new: T) -> T

@Virgiel We could, but it is harder to fit into the general framework of the shared flow. Right now a slow collector can only "miss" an event that was dropped out of the buffer with DROP_OLDEST. Otherwise every collector gets the same sequence of values. It makes it easy to reason about sharing.

With a custom conflation there is a possibility that some collector was fast enough to receive an old value before it was conflated, while others were slow and were affected by this custom conflation, receiving a merged combination of the old and new events instead. It is both harder to explain and harder to implement properly.

What we can do for this use-case is provide this kind of custom-conflation operator out-of-the-box (Flow.conflate with a lambda?), but separately from sharing. Then you'll be able to use this conflation operation before or after sharing to get the following behaviors:

  • Conflate before sharing: if all collectors are slow, then upstream events get conflated.
  • Conflate after sharing: fast collectors get unconflated events, slow collectors get conflated events, but they don't share the CPU effort of conflation.

Now, this solution cannot ensure that only slow collectors conflate events while all slow collectors share the CPU effort of conflation at the same time. I'm not sure that getting this last bit of improvement in CPU consumption is really worth the added complexity in the implementation.

As a side-note, the rationale for picking this set of configuration parameters for the MutableSharedFlow constructor was based on use-case analysis and on the need to provide a seamless migration path from StateFlow to SharedFlow for those who seek more flexibility than a state flow provides. If not for the latter, based on the use-cases alone, it does not seem worth even adding the distinctUntilChanged parameter, as it can always be bolted on with a separate operator at little overhead; but distinctUntilChanged was not hard to add and provides the state-flow transition path we need.

elizarov added a commit that referenced this issue Sep 10, 2020
Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes #2034
Fixes #2047

Co-authored-by: Ibraheem Zaman <1zaman@users.noreply.github.com>
Co-authored-by: Thomas Vos <thomasjsvos@gmail.com>
Co-authored-by: Travis Wyatt <travis.i.wyatt@gmail.com>
@pacher

pacher commented Sep 15, 2020

If it is not too late and not too difficult, please consider returning the current replayCache from resetReplayCache instead of Unit, so that it would be possible to "atomically" read and clear the replay cache in one call, without the possibility of losing an item in between.
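For context, the two-step pattern that motivates this request looks roughly like the sketch below (`drainReplayCache` and the replay size are made up for illustration; `resetReplayCache` is an experimental API):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical replay size, chosen just for the example.
val flow = MutableSharedFlow<String>(replay = 16)

// Racy two-step drain: an item emitted between the read and the
// reset is never observed and then cleared, i.e. silently lost.
fun drainReplayCache(): List<String> {
    val items = flow.replayCache
    // <-- a concurrent tryEmit() landing here would be discarded
    flow.resetReplayCache()
    return items
}

fun main() {
    flow.tryEmit("a")
    flow.tryEmit("b")
    println(drainReplayCache()) // [a, b]
    println(flow.replayCache)   // []
}
```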

@elizarov
Contributor Author

@pacher Can you please elaborate on why you would need it? What's the use-case?

@elizarov
Contributor Author

elizarov commented Oct 9, 2020

UPDATE: Last-minute design change - resetReplayCache is going to stay an experimental API.

@elizarov
Contributor Author

UPDATE: Very last-minute design change - the replay parameter in the MutableSharedFlow() constructor will be optional, since "replay" is conceptually an additional feature of a shared flow that you opt into by specifying a value for the replay parameter. Having to always specify a magic value of zero just to say "I don't need replay" is not good.
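With this change, typical constructor usage looks like the following sketch (assuming kotlinx-coroutines-core on the classpath):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // No replay (the default): late collectors only see values
    // emitted after they subscribe.
    val events = MutableSharedFlow<String>()

    // Opting into replay: the latest value is re-delivered to
    // every new collector.
    val lastEvent = MutableSharedFlow<String>(replay = 1)

    lastEvent.tryEmit("hello")
    println(events.replayCache)    // []
    println(lastEvent.replayCache) // [hello]
}
```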

@gmk57

gmk57 commented Oct 28, 2020

A couple of questions/observations regarding "rendezvous shared flow" (with zero replay and zero buffer):

  1. It's described as a "special case" here, but with current constructor function (replay becoming optional) it looks like a default case. Maybe it's worth explaining its behavior in the docs just a little?
  2. It seems "each emit call suspends until all collectors process the emitted value" really means "until all collectors start processing" (it doesn't wait for them to complete).
  3. "tryEmit call on such a flow never succeeds" is not strictly correct: with zero subscribers it happily returns true (which probably makes sense).
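Point 3 is easy to check in isolation (a sketch assuming kotlinx-coroutines-core):

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // Rendezvous shared flow: replay = 0, extraBufferCapacity = 0.
    val flow = MutableSharedFlow<Int>()
    // With zero subscribers there is nobody to rendezvous with:
    // the value is simply dropped and tryEmit reports success.
    println(flow.tryEmit(1))  // true
    println(flow.replayCache) // []
}
```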

@elizarov
Contributor Author

@gmk57 Thanks for noticing it. I'm adding some more docs: #2346

@twyatt
Contributor

twyatt commented Oct 28, 2020

For item 2, is there a way to get the previously documented behavior?

each emit call suspends until all collectors process the emitted value

It would make transitioning to SharedFlow from a listener-style implementation easier (if waiting for consumers to process events was relied on).

@elizarov
Contributor Author

elizarov commented Oct 29, 2020

@twyatt For item 2, is there a way to get the previously documented behavior?

Not really. Even if it were implemented this way, it would be an extremely fragile guarantee, and all Flow operators would have to contain a separate section in their documentation on whether they preserve or break this guarantee. It would be a very big footgun.

Fundamentally, Flow is an asynchronous reactive framework. You could adapt it for use in the case where you need some kind of synchronous processing, but that would go against all the design principles on which Flow is based. Flow is fundamentally centered around the idea that your application is structured as one-way asynchronous pipelines with a single source of truth. Data producers should be in no way affected by how and when the consumers receive and process the respective values.
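The distinction gmk57 raised in item 2 can be observed directly: with a rendezvous shared flow, emit resumes once the collector has received the value, not once it has finished processing it. A small sketch (assuming kotlinx-coroutines-core; the log ordering shown is the typical single-threaded interleaving):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

fun emissionLog(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val flow = MutableSharedFlow<Int>() // rendezvous: no replay, no buffer
    val job = launch {
        flow.collect {
            log += "received $it"
            delay(100) // simulate slow processing
            log += "processed $it"
        }
    }
    yield() // let the collector subscribe
    flow.emit(1)
    log += "emit resumed"
    delay(200) // give the collector time to finish processing
    job.cancelAndJoin()
    log
}

fun main() {
    // emit() resumes after the collector *receives* the value,
    // not after it finishes processing it.
    println(emissionLog()) // [received 1, emit resumed, processed 1]
}
```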

@petarmarijanovicfive

Hey @elizarov, do you maybe have an ETA when will the 1.4.0-native-mt version of the coroutine library be available on jcenter? Thanks!
https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines-core/

@lajevski

Hey @elizarov, do you maybe have an ETA when will the 1.4.0-native-mt version of the coroutine library be available on jcenter? Thanks!
https://jcenter.bintray.com/org/jetbrains/kotlinx/kotlinx-coroutines-core/

https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.4.1-native-mt
You're welcome!

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
@timrijckaert

The above information will be added to StateFlow documentation, thus giving easily actionable advice on what to do when you have a use-case that is almost like a StateFlow, but with some tweaks (more buffer space, support duplicate emissions, does not have an initial value, etc).

So how does one create a StateFlow/MutableStateFlow/SharedFlow that supports duplicate emissions?

@wkornewald

So how does one create a StateFlow/MutableStateFlow/SharedFlow that supports duplicate emissions?

My library ReactiveState-Kotlin contains a (Mutable)ValueFlow class that does what you want:

https://github.com/ensody/ReactiveState-Kotlin/blob/master/core/src/main/java/com/ensody/reactivestate/ValueFlow.kt

@eygraber

eygraber commented Jan 10, 2021

This is what I've been using when I need to mimic StateFlow behavior (not API) but allow duplicate emissions:

MutableSharedFlow<Boolean>(
    replay = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
).apply {
    tryEmit(true)
}

Not sure if it's 100% the same behavior, but it's close enough for my use case.
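One concretely observable difference: a real MutableStateFlow conflates assignments of an equal value, while this shared-flow-based replacement delivers duplicates. A self-contained sketch (assuming kotlinx-coroutines-core on the classpath):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun receivedValues(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>(
        replay = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val received = mutableListOf<Int>()
    val job = launch { shared.collect { received += it } }
    yield() // let the collector subscribe

    shared.tryEmit(1)
    yield() // let the collector receive the first value
    shared.tryEmit(1) // a duplicate: delivered again
    yield()

    job.cancelAndJoin()
    received
}

fun main() {
    // A MutableStateFlow would conflate the second, equal value;
    // the shared flow delivers both.
    println(receivedValues()) // [1, 1]
}
```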
