Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ConflatedBroadcastChannel should deliver last value #1816

Closed
pacher opened this issue Feb 19, 2020 · 9 comments
Closed

ConflatedBroadcastChannel should deliver last value #1816

pacher opened this issue Feb 19, 2020 · 9 comments
Labels

Comments

@pacher
Copy link

pacher commented Feb 19, 2020

For ConflatedChannel the problem was discussed in #332, addressed in #1235 and fixed by #1239,
but not for ConflatedBroadcastChannel

Slightly modified example from #1235:

fun main() = runBlocking {
    val channel = Channel<Int>(capacity = Channel.CONFLATED)
    with(channel) {
        send(1); send(2); send(3); send(4)
        close()
    }
    println(channel.receive())
}

Correctly prints 4, but

fun main() = runBlocking {
    val channel = ConflatedBroadcastChannel<Int>()
    with(channel) {
        send(1); send(2); send(3); send(4)
        close()
    }
    println(channel.openSubscription().receive())
}

throws ClosedReceiveChannelException

Is it by design?

@elizarov
Copy link
Contributor

It is the design now, but we can change it. What is your use-case for ConflatedBroadcastChannel where this difference in behavior becomes important?

@fluidsonic
Copy link

fluidsonic commented Apr 2, 2020

I just ran into the same issue.

I have a ConflatedBroadcastChannel for emitting DownloadStatus (initial, running…, completed).

class DownloadCoordinator {
   private val stateChannel = ConflatedBroadcastChannel<DownloadState>(DownloadState.Initial)
   val states = stateChannel.asFlow()
   // etc.
}

Emitting looks like this:

  • initial
  • running (10%)
  • running (90%)
  • complete
  • close

Now a consumer that's interested in the latest state of the download simply does this:

downloadCoordinator.states.first()

It was totally unexpected to me that .first() doesn't work on a ConflatedBroadcastChannel-backed Flow once the channel is closed:

NoSuchElementException: Expected at least one element

I've expected the flow to re-emit the last value.

I also have other consumers that use states.collect { … } to follow the progression of the download. That should complete automatically once the terminal state (completed) was reached.

From the docs:

ConflatedBroadcastChannel
Broadcasts the most recently sent element (aka value) to all openSubscription subscribers.

Back-to-send sent elements are conflated – only the the most recently sent value is received, while previously sent elements are lost. Every subscriber immediately receives the most recently sent element. Sender to this broadcast channel never suspends and offer always returns true.
It mentions three times that the "most recently sent element" is received by subscribers.

"most recently sent element/value" is mentioned three times.
The only mention of different behavior after closing is in the description of valueOrNull:

The most recently sent element to this channel or null when this class is constructed without initial value and no value was sent yet or if it was closed.

@qwwdfsad qwwdfsad added the design label Apr 3, 2020
@qwwdfsad
Copy link
Member

qwwdfsad commented Apr 3, 2020

Thanks for the detailed explanation!
It seems like this particular problem will be resolved with stateful flow

elizarov added a commit that referenced this issue May 7, 2020
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1973
Fixes #395
Fixes #1816
@elizarov
Copy link
Contributor

elizarov commented May 7, 2020

It is being provided by implementation in #1974

@pacher
Copy link
Author

pacher commented May 18, 2020

@elizarov Sorry for late reply in these difficult times
My use case is like in #332, that is why I did not specify it. More specifically second use case in your comment.
I have a computation producing intermediate results, which I want to expose as Flow from API. So I have private ConflatedBroadcastChannel and expose it via channel.asFlow().
My original intention was to close channel normally when computation is complete to indicate that the last result is a final result of computation. And close with error if computation fails.
But if the client collects the flow after channel is closed, it gets no results at all.
Currently my solution is to never close channels, which feels wrong. I also had to introduce additional wrapper class with isFinal flag, so that I can use takeUntil { it.isFinal } operator on the flow before returning it. Btw, currently there is no such operator in the library.

Anyway, my point was that ConflatedChannel and ConflatedBroadcastChannel should behave similarly. I don't see why the number of clients should change the semantics wrt last value. Deliver last value or not, but shouldn't it be consistent? What is the fundamental difference in semantics between the two which I am missing? I though that it was just overlooked and forgotten, so I opened this issue to remind.

Now, you can close the issue if you will, but in my opinion it is not resolved. Neither StateFlow nor SharedFlow could be used in case of computation with intermediate values because they are both designed for never-ending streams. They can not be closed and there are no mechanisms for error propagation.

@elizarov
Copy link
Contributor

@pacher As discussed in SharedFlow design #2034 you can materialize completion. There is no takeUntil operator, but there is a takeWhile operator, so you can write takeWhile { !it.isFinal } on the collector's side of the SharedFlow. Moreover, shared flow lets you replay any number of items to the collectors, so you can configure shared flow with replayCapacity = 2 to get both the final result and the completion status of the computation. Thus, it seems to me that the shared flow design in #2034 fully covers your use-case.

@pacher
Copy link
Author

pacher commented May 20, 2020

so you can write takeWhile { !it.isFinal }

@elizarov No I can't, because then final result will not be included, which is a bummer.
The difference to takeWhile is that predicate is checked after emitting the item so that the item which flipped predicate is included as well. RxJava and Reactor both have takeWhile/takeUntil combo. Weirdly I did not find it in kotlin collections, I was sure it is there too.

For implementation I copy-pasted takeWhile and changed

            if (predicate(value)) emit(value)
            else throw AbortFlowException(this)

to

            emit(value)
            if (predicate(value)) throw AbortFlowException(this)

But it would be really nice to have it in the library (even simple takeWhile uses internal classes which I had to copy as well). Maybe somebody can make a PR, maybe even me.

As discussed ... shared flow design in #2034 fully covers your use-case

Yes, I got your point in #2034 that materializing is the way to go, which I still think is a workaround (and not only me). I would rather say: "SharedFlow could be used in building a solution which covers your use-case".

While we are waiting for proper operators in the library, can you please have a look at this naive implementation of dematerialize?

sealed class Event<T> {
    class Emission<T>(val item: T) : Event<T>()
    class Error(val error: Throwable) : Event<Nothing>()
    object Complete : Event<Nothing>()
}

fun <T> Flow<Event<T>>.dematerialize(): Flow<T> = 
    takeWhile { it !is Complete }
        .map { 
            when(it) {
                is Emission -> it.item
                is Error -> throw it.error
                Complete -> error("Can not happen")
            }
        }

Is it usable? Is it idiomatic just to throw from map? Will it play nicely with cancelling subscription upstream, cleaning up and whatnot (my source is currently still ConflatedBroadcastChannel.asFlow() which uses openSubscription)? I guess the answers are yes, but I am still too new to coroutines to feel confident
Thanks

P.S. Again, the issue was not about my use case, but about consistency between ConflatedChannel and ConflatedBroadcastChannel. One delivers last value and the other does not. That's just weird and potentially confusing, that's all. It's up to you to leave it like it is.

@elizarov
Copy link
Contributor

so you can write takeWhile { !it.isFinal }
No I can't, because then final result will not be included, which is a bummer.

@pacher Good point. Thanks. See #2042

While we are waiting for proper operators in the library, can you please have a look at this naive implementation of dematerialize?
Is it usable? Is it idiomatic just to throw from map? Will it play nicely with cancelling subscription upstream, cleaning up and whatnot (my source is currently still ConflatedBroadcastChannel.asFlow() which uses openSubscription)? I guess the answers are yes, but I am still too new to coroutines to feel confident

This implementation looks good to me and it is quite idiomatic. It is similar to what we'll write if we make a decision to include that kind of operator into the library.

@elizarov
Copy link
Contributor

We've decided to add Flow.transformWhile operator for now to cover the use-case of delivering last value and then completing. See #2065 for details.

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes Kotlin#1973
Fixes Kotlin#395
Fixes Kotlin#1816

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants