Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider making BroadcastChannels to implement Flow #1082

Closed
elizarov opened this issue Apr 10, 2019 · 51 comments
Closed

Consider making BroadcastChannels to implement Flow #1082

elizarov opened this issue Apr 10, 2019 · 51 comments

Comments

@elizarov
Copy link
Contributor

Unlike point-to-point channels that are somewhat tricky to use, various kinds of BroadcastChannel implementations seem well suited to directly implement Flow interface. That would make easier and slightly more efficient to use them as "data model" classes in MVVM architectures, being able to directly use full set of flow operators to transform them and wire to UI.

Also, we might consider a shorter name for ConflatedBroadcastChannel that would focus on its "dataflow variable" nature. DataFlow could be a good starting point for this name bike-shedding.

@ZakTaccardi
Copy link

What about StateFlow? I like the idea that "state" is repayable

@elizarov
Copy link
Contributor Author

Having thought about it a bit more, it looks the whole BroadcastChannel is a misnomer. They are not really channels! They are more like some kind of "hot flows".

@gildor
Copy link
Contributor

gildor commented Apr 11, 2019

+1 to DataFlow

Will this also mean that internal implementation of DataFlow will be changed to be better for MVVM/reactive use case (yes, I'm talking about #395)

@bohsen
Copy link

bohsen commented Apr 11, 2019

I've said it before and will say it again. ConflatedBroadcastChannel IMHO is a really powerful tool for us developers. This could make it even better.

The nice thing about ConflatedBroadcastChannel is that it handles the subscription to the channel and the subscribers for you in a simple and safe manner - it's like messaging. It's very easy to use.

The only thing I'm missing is an easy way to control the storing of the state. I have a usecase where I want to push "an Event" to all subscribers but I don't want this Event stored.

@adibfara
Copy link

I do think that a broadcast channel with the capacity of zero is missing too (like publish subject in RX) .

@elizarov
Copy link
Contributor Author

@adibfara What is your use case for such a zero-capacity broadcast channel?

@adibfara
Copy link

adibfara commented Apr 18, 2019

@adibfara What is your use case for such a zero-capacity broadcast channel?

@elizarov What I meant was, currently I found no way to have a broadcast channel (a channel that does not suspend for writing), that does not emit the value that is holding to the anyone that opens a subscription.
The use case could be the need to listen for events (as in change) of something, and we only need the updates.
The pipe that is holding the value inside the ConflatedBroadCastChannel, might have a very old data, and we open a subscription to it, we get the data, even though we might already have a newer data that has not been pushed to the channel (because we don't want to broadcast it), and we only want to be updated if the broadcast channel emits it to us.

If this seems interesting, I can move it to another thread/issue since it might be unrelated to the title of this issue.

@elizarov
Copy link
Contributor Author

elizarov commented Apr 18, 2019

@adibfara Let's keep this discussion here for now because it is important to take into account for this issue of aligning broadcast channels with flows. It is important to learn more about your use-case, as it related to, at least, proper naming of these concepts. Let me try to rephase how I'm getting this particular use case for zero-capacity (not conflated):

Say we have some classical callback-based even-subscription API:

obj.addEventListener(listener)
...
obj.removeEventListener(listener)

This API is quite error-prone as it is easy to forget removeEventListener, so instead we can represent it is a flow, which give us the advantage of structured concurrency, so we can just write:

launch { // in proper scope
    eventFlow.collect { event -> doSomething() }
}

Moreover we already have an API for that (current called flowViaChannel, see its docs) that lets you adapt existing callback-based API to the flow. But what if we don't have existing callback-based API. We have only a piece of code that emits those events and we don't want to spend time implementing and managing all those "event listeners list". What we want is something that is a Flow, but at the time where we can simply reports events to it and make them delivered to all active "listeners":

val eventFlow = EventFlow()

fun onEvent(event: Event) {
    eventFlow.emit(event) // all collectors receive it
}

The problem that we have is back-pressure and context management. The collectors (listeners) maybe slow and busy we something else at this moment. So what should eventFlow.emit do?

Here is how ArrayBroadcastChannel works now. You can create val bc = BroadcastChannel(1) and then the first call to bc.send(event) completes immediately and starts sending it to all the listeners, but the subsequent call to bc.send(event) would suspend if the previous event was not processed yet.

Is this one-element-channel enough? Do you really have a use-case where eventFlow.emit/bc.send must suspend until all the collectors have processed the event? Why you might need this behavior? Under what circumstances capacity=1 may not suit you?

@LouisCAD
Copy link
Contributor

The way I currently deal with "hot flows" exposed as ReceiveChannels (usually from a BroadcastChannel that may be conflated is by using a variant of consumeEach that cancels the coroutine consuming a value when a new value is received.

You can see how I implemented the consumeEachAndCancelPrevious extension here:

https://github.com/LouisCAD/Splitties/blob/92e54bad60f53a557c0570d9b5dd16323e863eab/sample/src/androidMain/kotlin/com/louiscad/splittiessample/extensions/coroutines/Channels.kt#L19-L36

I use it a lot in my projects, that is my alternative to repeating callbacks that dispatch a state or events, and it allows me to ensure proper cancellation when the state has changed.

I did not use flow in its current form yet because it doesn't seem to support this kind of usages, and that's what I'm need a lot in the project I'm working on.

@adibfara
Copy link

adibfara commented Apr 18, 2019

@elizarov Thanks for the info. The reason I suggested moving it out of this conversation was that with my understanding, what I'm suggesting is somewhat of a hot and maybe endless stream of values, not a cold one.

The problem that we have is back-pressure and context management. The collectors (listeners) maybe slow and busy we something else at this moment.

How this is handled with the ConflatedBroadCastChannel? I was under the assumption that the sender, is not concerned about back-pressure and the context of the channel's listeners.

Here's what I think the current APIs's, from a cold/hot standpoint, so correct me If I'm wrong:

Channels:
Hot, send and consumers both suspend, so senders are back-pressered because of slow receivers. Each item is consumed only once.

Flow:
Cold, like channels, receivers suspend, waiting for the flow to finish. Stuff cannot be send to a flow from outside of it, because they should be built, like channel's produce. Unlike produce, gathering the data does not start immediately, and starts upon subscription. Each item is consumed once.

BroadCastChannel:
Hot, each item gets consumed by every consumer and each sender suspends until everyone has consumed the item.

ConflatedBroadCastChannel:
Hot, only consumers suspend, waiting for the next value, and immediately get the current value upon subscription. Senders can only access the channel from outside of it and send stuff into it and do not suspend. Each item might get consumed by all consumers (if they are not busy), and sending items to the consumers does not need back-pressure handling.

Broadcast (the thing that I was talking about):
Hot, consumers (0 to n) suspend for the next value, and do not get the current value of the stream upon subscription. Everything else is exactly like ConflatedBroadCastChannel. Senders do not care about this item's consumption, so If a new item arrives before previous item is consumed by all consumers, It would get lost unless it is stored in an array or something (with capacity). So the only difference with conflated broadcast channel, is the initial submission upon subscription. Currently, I see this broad cast channel as a general conflated broadcast channel, where it would store the last n items, and emits them to anyone that opens a subscription, and the old conflated broad cast channel is a special case with the capacity of 1. So the capacity of 0 would mean it will not store anything, and a capacity of 5 would mean it would store the latest 5 values. This is like how Rx's subject behave, in which, if you subscribe to a PublishSubject you will only receive items that get emitted after your subscription, and if you subscribe to a BehaviorSubject, upon subscription, you will get the stored value, and the next items after wards. This goes the same for ReplaySubject which takes a capacity parameter, so BehaviorSubject is actually a ReplaySubject(1).

Do you really have a use-case where eventFlow.emit/bc.send must suspend until all the collectors have processed the event?

I do not need the bc.send to wait for every collector to consume it.

@rharter
Copy link

rharter commented Jun 19, 2019

To add to the BroadcastChannel(capacity = 0) discussion, in the Guide to Reactive Streams with Coroutines document you state that BroadcastChannel(capacity) corresponds to Rx's PublishSubject, but that's not true.

ArrayBroadcastChannel contains a buffer and emits all items in the buffer to any new subscriber. That sounds more like a ReplaySubject.

Furthermore, the ArrayBroadcastChannel (and all other channels, as far as I can see) block the sender if the buffer is full instead of dropping the oldest items.

In my use case I'm looking to send events to my UI that are only important if there is a subscriber when the event is fired. For instance, if I need to display a toast notification relating to a screen, but the screen is paused or obstructed, there's not reason to show that notification, and it wouldn't make sense to show it later when the screen resumes (and resubscribes) since the context is lost.

Effectively I would expect the BroadcastChannel(capacity = 0) to create a pipe that I can throw data down, without the sender caring who's on the other end. If no one is there, discard the data. If someone is there, give them the data, but don't give them historical data.

@elizarov
Copy link
Contributor Author

A correction here, despite the fact that ArrayBroadcastChannel contains a buffer it does not emit to new subscribers, so it does not do replay.

Would not ConflatedBroadcastChannel work for your use-case?

@LouisCAD
Copy link
Contributor

@Skaiver You should avoid GlobalScope and use viewModelScope as shown in Android doc.

Also, experimental coroutines are obsolete since Kotlin 1.3 because now, we have stable coroutines.

About your question it'd be better suited for Kotlin's Slack or StackOverflow as I think it goes beyond the scope of this issue.

@LouisCAD
Copy link
Contributor

LouisCAD commented Jun 20, 2019 via email

elizarov added a commit that referenced this issue Jul 18, 2019
DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of DataFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
elizarov added a commit that referenced this issue Jul 18, 2019
DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of DataFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
@elizarov
Copy link
Contributor Author

Take a look at #1354 please. It is a proposed flow-based analogue to ConflatedBroadcastChannel that makes it unnecessary to implement Flow interface in it. The tentative name is DataFlow, because it is a Flow that is designed to be used as a data model.

elizarov added a commit that referenced this issue Jul 18, 2019
DataFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of DataFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
@elizarov
Copy link
Contributor Author

Let's bikeshed a name:

  • 👍 DataFlow. The name of the current prototype ([DRAFT] DataFlow implementation #1354). It is a play on the LiveData class from Android and the fact that it is going to be used to represent a "data model" in UI and forms a basis of "data flow" programming model.
  • 🚀 ValueFlow. Plays on the fact that this flow has value property that can be read to check what is "the current value" that is kept by this flow and can be updated.
  • 🎉 StateFlow. Represents the fact that this flow represents "an application state" that can be updated. Being a state it is natural to be interested only in the most recent, conflated, value of it.

Note, that the other flow class like this will be most likely called EventFlow and will be designed for "event streams" (like mouse or button clicks) which should not be conflated and where the most recent value does not make any sense. There is not much bikeshedding about its name, since it is quite a good fit. So DataFlow/ValueFlow/StateFlow name should mesh well and be clearly distinct from EventFlow name.

@chris-hatton
Copy link

chris-hatton commented Aug 28, 2019

StateFlow is most incisive:

  • Data Flow implies data in an untyped, raw form, which this is (usually) not.
  • Value Flow somewhat clashes by name with Kotlins val, implying something unchanging, or constant, which this is not.
  • State Flow - as you said yourself, succinctly points to the fact there is an ever-present state of this thing, which can be accessed... but which flows, changes.

@gildor
Copy link
Contributor

gildor commented Aug 28, 2019

I don't like StateFlow name, "an application state" looks too domain-specific for universal primitive that may be used for many use cases not only for keeping some state
Data/Value looks fine, don't see a big difference between them.

Flow implies data in an untyped

A bit strange for, why Data is untyped? why is State typed in this case?

@LouisCAD
Copy link
Contributor

@chris-hatton "Value" doesn't imply the value is unchanging.
You can perfectly have a val that always return a different value on each get. It's read-only, which is valid in this case.
Also, it's "ValueFlow", and still makes sense so long you don't drop "Flow" from it.

ValueFlow is great because it can imply there's always a value, and that only the last value makes sense.

The word "Data" in DataFlow doesn't imply singularity so much to me as we could reason about it for all the data coming in the flow, not the latest.

I also voted StateFlow, but I agree with @gildor, apps that will be using shared Flows (#1261) too for state will have a hard time getting distinct names when there's also StateFlows in the file, or codebase.

@ivanbartsov
Copy link

ivanbartsov commented Aug 28, 2019

My two cents: from the current choices, I cast my vote for StateFlow:
IMO other names don't hint about the flow being conflated and the possibility of skipped values. As I see it, we're looking for something that communicates the "only-the-last-value-matters-flow" idea nicely: commonly true for State, somewhat true for Value, not really as much for abstract Data -- we usually want all of.

I'd like to contribute some more options for the sake of brainstorming, not necessarily good but maybe they'll bounce off someone and bring out something else (feel free to boo or delete of course, and sorry for the length):

  • CurrentFlow -- as in, you're only getting the current something, whatever you missed is not relevant. The obvious disadvantage is it will probably cause a mental collision with "the flow that we're using right now"
  • Just Current -- even more concise, but probably not having the Flow at the end is a bad idea, plus way too much potential to get mixed in with lots of other stuff from different namespaces. The cool thing is, "flow" and "current" are strongly related words in the context of fluids, but in our case it also has the additional "right now" connotation
  • ConflatedFlow -- I don't see it in the comments, I suppose it's not to the point for some reason?
  • SquashFlow -- somewhat to the point IMO, but has a little bit of "flatten" connotation to it (from git squash)
  • ShallowFlow -- as in, "no depth", does hint the "current/last value only" a bit
  • OmniFlow -- as in, "for all", carries the "broadcasted" part of the idea across a little bit, although also suggests "all-encompassing"
  • UniFlow -- deriving from latin "one", commonly perceived as "something single for many somethings" (e.g. universal ~ one fit for many purposes), or may hint "unified" as in "encompassing everything that matters from a multitude of something"

@zach-klippenstein
Copy link
Contributor

Neither DataFlow nor ValueFlow imply that the value is replayed and always available - events are "data" and "values" too. I voted for StateFlow because it is the only one that implies this property for me, even if it's not used to represent "application state" in all cases.

@fvasco
Copy link
Contributor

fvasco commented Sep 6, 2019

Hi @zach-klippenstein,
I wish to reconsider our recent consideration about Flow and DataFlow usage.

For some use case that solution can be impractical, I think to use DataFlow to store a mutable configuration.
The main Flow's issue is the high cost of first() operation, really much expensive that BroadcastChannel.value.

I currently use a custom implementation of observable resource to get the current value and the further updates.
In some our use case a part of configuration is required to execute a function, so we have to await the first configuration value, in other our use case an update of the same configuration dispatches an event.

I cited issue #274 because we need a builder like receiveBroadcastConflatedChannel { ... }, unfortunately I suspect that this issue does not help our use case.

@zach-klippenstein
Copy link
Contributor

In some our use case a part of configuration is required to execute a function

Why is it required? This means that code isn't going to be reactive.

@fvasco
Copy link
Contributor

fvasco commented Sep 6, 2019

@zach-klippenstein you are right.

In this really simple example the code is not reactive

var roles = setOf("admin")

fun checkAccess(role:String){
    check(role in roles)
}

However we want to update the roles variable and, for example, fire an event to recheck all sessions.

@zach-klippenstein
Copy link
Contributor

Still not sure why this needs non-reactive access to the value. If you're firing an event to trigger the checks, you must already be subscribing to the roles flow anyway. So you could do something like:

suspend fun checkAccess(role: String) {
  rolesFlow.collect { roles -> check(role in roles) }
}

// and/or

fun checkAccessIn(role: String, sessionScope: CoroutineScope) {
  sessionScope.launch { checkAccess(role) }
}

where sessionScope is some scope that is tied to your session lifetime, and so the session will be failed if the check fails.

@fvasco
Copy link
Contributor

fvasco commented Sep 6, 2019

Hi @zach-klippenstein,
I my hypothetical use case a configuration (role) can be updated in any time in the future, instead checkAccess must check a role using only the current configuration.
checkAccess can check role using rolesFlow.first() only, further calls using the same role can fail or not, depending by further configurations.

rolesFlow is an infinite flow, a collect can loop forever and sessionScope hangs.

@zach-klippenstein
Copy link
Contributor

I still don't understand what part of your design prevents doing everything reactively, but even if that is a legitimate need, providing non-reactive access to data that can change is often a source of bugs later, because new code isn't forced to deal with the fact that the value can change.

rolesFlow is an infinite flow, a collect can loop forever and sessionScope hangs.

Yes, you'd need to cancel the continuously-running access checking job when the session finishes.

@fvasco
Copy link
Contributor

fvasco commented Sep 7, 2019

Hi @zach-klippenstein,
I try to write a better example.

I use a stock exchange price for my factory: factoryPriceFlow

val factoryPriceFlow: Flow<Double> = flow { TODO() }

I want to use this flow to update the current price on user interface:

factoryPriceFlow
        .onEach { currentPrice -> updatePrice(currentPrice) }
        .launchIn(myScope)

Moreove I want to implement an action to buy a stock using the current price

suspend fun buyStocks(quantity: Int) {
    val currentPrice = factoryPriceFlow.first() // get current value
    newCreditCardTransaction(currentPrice * quantity)
    fetchStock(quantity)
}

buyStocks is not reactive, do you have any suggestion regarding it?

I had considered a new operator to get the latest value, but I don't have any valid proposal to build an unscoped version of it.

val latestFactoryPrice by factoryPriceFlow.latest()

@zach-klippenstein
Copy link
Contributor

I think we're getting a bit into the weeds here about general architecture so maybe we should move this convo to Slack? But that makes sense.

If buyStocks is not a static/top-level/object function, then it already has an implicit scope (in the abstract sense, not CoroutineScope), even if that scope is "the lifetime of your app". If you make that scope explicit, either tied directly to a Job or with simple start/stop methods tied to a private Job/CoroutineScope, then you can implement your latest delegate (taking the scope as a parameter) or just manually cache the latest value in a private property.

Another approach would be to make your buyStocks function just another flow, and let whoever is invoking it worry about the correct scoping:

typealias BuyStocksAction = (quantity: Int) -> Unit

val buyStockActions: Flow<BuyStocksAction> =
  factoryPriceFlow.map { currentPrice ->
    { quantity -> buyStocksAtPrice(quantity, currentPrice) }
  }

private fun buyStocksAtPrice(quantity: Int, currentPrice: Double) {
  …
}

// Then in your UI layer, if you're using something like RxBindings, and #1498 existed:

val buyActions = quantityView.values.withLatestFrom(stockManager.buyStockActions) { quantity, action ->
  { action(quantity) }
}

purchaseButton.clicks.withLatestFrom(buyActions) { _, buyAction -> buyAction() }
  .launchIn(viewScope)

But both of these approaches might seem like a lot of extra boilerplate just to avoid exposing the current price as a simple property. Are they actually better? I would argue yes, because the concept of "factory price" is inherently reactive, so the API should reflect that. The need to sample the reactive stream at a specific instant in time is really only a consequence of the view framework you're working in not being inherently reactive (e.g. Jetpack Compose will likely make this a lot nicer). And because that is a requirement of the view layer, that's the layer that should be responsible for bridging reactive and non-reactive code.

@zach-klippenstein
Copy link
Contributor

Zooming back out from the specifics of this example, another reason that exposing "current value" as part of your API isn't ideal is composability. Smaller API surfaces are easier to compose together than larger ones. If your consumers start depending on your current value directly, then it is harder to add intermediate components between them later. Those components would need to continue to expose both a reactive stream and their current value, so there are now two parallel paths through which data is flowing (through the stream and through the chain of current values). This is just more code to write, but also means you need more tests to cover both code paths.

Once you've started designing your code to be reactive first, it's much simpler to keep everything reactive until you absolutely have to – which is typically at the view layer.

@fvasco
Copy link
Contributor

fvasco commented Sep 7, 2019

Hi @zach-klippenstein,
my issue with the scope occurs when latestFactoryPrice is an instance property, so the scope is the instance lifetime.

If you make that scope explicit

...then I am forced to implement a dispose method in my class and in any classes where it is used, I consider this solution inconvenient in a garbage collected language.
If a developer forgets to dispose an instance then his creates a memory leak, moreover that variable will update forever.

just manually cache the latest value in a private property

this is a workaround on library miss, we are discussing on it, moreover it is not possible to collect a flow without a scope.
Implementing a @Volatile, private property is not a simple boilerplate for developers (really like a lazy variable) and it is an extra CPU work anywhere is defined (even it is not used).

Another approach would be to make your buyStocks function just another flow

So I should get the first() value of buyStockActions to get an instance of buyStocks with currying, I really don't understand how this suggestion solve/simplify my issue.

val buyStocks = buyStockActions.first()
buyStocks(10)

I propose you a simple exercise, please help me to understand.

The current flow send price's updates:

val factoryPriceFlow: Flow<Double> = flow {
    while (true) {
        emit(1.0)
        delay(100)
        emit(2.0)
        delay(200)
    }
}

I have to implement a simple program to print the current price, like a REST service or a trivial main function, please complete the code

fun main() {
    val currentPrice: Double = TODO()
    println("The current price is $currentPrice")
}

It should print "The current price is 1.0".
Can you fix/rewrite my code?

Thank you in advance.

@fvasco
Copy link
Contributor

fvasco commented Sep 8, 2019

I rethink about above checkAccess problem (required by some legacy framework, for example).

first() is a suspending method, I didn't find no way to respond quickly if configuration is not already present.

Using the current API the only proposal is:

val rolesFlow: Flow<Set<String>> = TODO()

override fun checkAccess(role: String) {
    val roles =
            runBlocking {
                withTimeoutOrNull(100) {
                    rolesFlow.first()
                }
            } ?: emptySet()
    check(role in roles)
}

I have to consider that a Flow is inadequate in this use case, instead ConflatedBroadcastChannel works well (value, valueOrNull) but it misses of read-only interface (#274).

@zach-klippenstein your previous post is reasonable, however if we have to expose the same value using WebSocket (reactive) and HTTP long polling (not reactive) then there is not much choices.
I hope that my comments help us to prepare a reasonable response for these use cases.

elizarov added a commit that referenced this issue Sep 9, 2019
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
elizarov added a commit that referenced this issue Sep 9, 2019
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
elizarov added a commit that referenced this issue Sep 11, 2019
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
elizarov added a commit that referenced this issue Sep 11, 2019
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1082
@eygraber
Copy link

I have a use case for BroadcastChannel with capacity UNLIMITED.

The use case is related to RecyclerView and handling View events on it's children, and I'll be using the example of a View.OnClickListener.

RecyclerView uses the ViewHolder pattern for rendering it's children; it creates a View on demand, and then "caches" it, and reuses it when that View is scrolled off the screen. When it is reused, a method is invoked that gives you a chance to bind your data to that View. One of the benefits of this is increased performance since we don't need to allocate a new View for each child every time the RecyclerView is scrolled, and we don't need to have a 1:1 of Views to children (because that could get memory intensive very quickly).

A best practice while doing this is to limit the amount of allocation that occurs during a bind, and prefer to do it when the View is getting created (or not even allocate then if possible). This often comes into play with setting OnClickListener on a View.

Another issue is that it is difficult to easily pass these events up to a Presenter (or other architecture component) without a lot of manual wiring because of how RecyclerView works.

My solution uses a global OnClickListener. The back end of it looks like this:

/*
Note: specifically not a data class; we want an identity hashCode()
 */
class ListEvent<T>(
    private val capacity: Int = 2
) {
  internal data class Payload(
      val listenerId: Int,
      val payload: Any
  )

  internal val hashCode by lazy { hashCode() }

  fun CoroutineScope.clicks() = produce<T>(capacity = capacity) {
    forwardEvents(GlobalClickChannel, channel)
  }
}

internal val GlobalClickListener = View.OnClickListener { view ->
  GlobalScope.launch(Dispatchers.Main) {
    GlobalClickChannel.send(
        ListEvent.Payload(
            listenerId = view.getTag(R.id.list_model_id) as Int,
            payload = view.getTag(R.id.list_model_on_click)
        )
    )
  }
}

internal val GlobalClickChannel = BroadcastChannel<ListEvent.Payload>(BUFFERED)

private suspend fun <T> TwentyListEvent<T>.forwardEvents(
    globalChannel: BroadcastChannel<ListEvent.Payload>,
    sendChannel: SendChannel<T>
) {
  val receiveChannel = globalChannel.openSubscription()
  for((listenerId, payload) in receiveChannel) {
    if(listenerId == hashCode) {
      @Suppress("UNCHECKED_CAST")
      (payload as? T)?.let { typedPayload ->
        sendChannel.send(typedPayload)
      }
    }
  }
}

When I want to handle a click from a RecyclerView child, in my Presenter I maintain a reference to a ListEvent that was passed to the RecyclerView and is accessible in the child binding function. I can then set the payload, and the global OnClickListener for the view without having to make an allocation:

  protected fun <T> View.setListClickListener(listener: ListEvent<T>, payload: T) {
    setTag(R.id.list_model_on_click, payload)
    setTag(R.id.list_model_id, listener.hashCode)
    setOnClickListener(GlobalClickListener)
  }

  protected fun View.clearListClickListener() {
    setTag(R.id.list_model_on_click, null)
    setTag(R.id.list_model_id, null)
    setOnClickListener(null)
  }

I can consume the clicks from my Presenter by simply:

private fun CoroutineScope.observeVenueClicks() {
    launch {
      val clickEvents = with(clickListEvent) { clicks() }
      for(event in clicks) {
        ...
      }
    }
  }

The problem is, that if there's a misbehaving consumer that doesn't properly consume the events from ListEvent.clicks() (a bug, or it gets blocked, etc...), then the whole system will deadlock, when GlobalClickChannel's buffer is filled. Even if a new consumer comes later and opens a new subscription in forwardEvents it will not be able to drain GlobalClickChannel since it's not conflated. I could keep raising the buffer size, but that only delays the problem.

If I could use UNLIMITED then I would never get to a point where the whole system is deadlocked. I tried using PublishSubject but that has a similar problem, since it will wait for all the collectors, and will also be affected by a misbehaving consumer.

Will DataFlow be able to handle this scenario?

@eygraber
Copy link

After much debugging, it turns out that my issue was that I was receiving from the BroadcastChannel's ReceiveChannel using a for loop. Using consumeEach seems to have fixed the problem.

I think that my above points still stand, even though it wasn't my current issue.

@elizarov
Copy link
Contributor Author

elizarov commented Jan 16, 2020

@eygraber Unfortunately, I don't know a good solution to the problem you've described. That is, if you have potentially misbehaving listeners then you have to choose between:

  • Limited size buffer and blocking other listeners
  • Unlimited size buffer and eventually getting OutOfMemoryError.

Arguably, the latter option might be preferred for mobile apps that would not live long enough to get their memory exhausted, so it does make sense to support unlimited-size buffers, indeed.

@eygraber
Copy link

Is not allowing UNLIMITED already set in stone, or could it still be implemented? Similarly, is BroadcastChannel for sure on track to get deprecated?

@elizarov
Copy link
Contributor Author

We do plan to provide some sort of unlimited-buffered primitive of this kind.

@JavierSegoviaCordoba
Copy link

JavierSegoviaCordoba commented Feb 7, 2020

I have seen the DataFlow API is marked as draft, is it going to be available soon or I should use ConflatedBroadcastChannels?

@carterhudson
Copy link

A correction here, despite the fact that ArrayBroadcastChannel contains a buffer it does not emit to new subscribers, so it does not do replay.

Would not ConflatedBroadcastChannel work for your use-case?

ConflatedBroadcastChannel re-emits the last sent value when I use it. BroadcastChannel(1) does not.

@elizarov
Copy link
Contributor Author

elizarov commented Apr 29, 2020

Thanks a lot for a fruitful discussion in this issue and tons of useful feedback. This issue was originally created to "Consider making BroadcastChannels to implement Flow" and as a result of this feedback and discussions the decision is not to implement Flow in BroadcastChannel interface, but instead provide better-designed Flow-based replacement for the use-case that are currently covered by broadcast channels. The replacements will be tracked by separate issues:

I'm closing this one.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests