Convert ReceiveChannel<T> to Flow<T> without consuming #1490

Closed
ZakTaccardi opened this issue Aug 29, 2019 · 9 comments

@ZakTaccardi

My ViewModel exposes a stream of events as a ReceiveChannel<T>

// in `ViewModel`
private val events = Channel<T>(capacity = Channel.UNLIMITED) // these are "side effects"

fun events(): ReceiveChannel<T> = events

My activity/fragment observes this stream of events, without consuming on cancellation.

// in an Activity's onCreate

launch {
  for (event in viewModel.events()) {
    ui.handleEvent(event)
  }
}

By not consuming on cancellation (unsubscription), this allows my UI to re-use the same ReceiveChannel<T> instance across configuration changes.

// this exists already
fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T>
// this does not
fun <T> ReceiveChannel<T>.asFlowWithoutConsuming(): Flow<T>

I think the latter function would be useful. Can it be added?

@elizarov
Contributor

Why would you need to have it exposed as Flow type? What is wrong with the code that you currently have that explicitly declares your events with ReceiveChannel<T> type? It looks easier to understand for me this way (the way you have it now).

@ZakTaccardi
Author

ZakTaccardi commented Aug 30, 2019

@elizarov because a ReceiveChannel<T> is mutable - it can be closed by the caller.

// this is fine
launch {
  for (event in viewModel.events()) {
    ui.handleEvent(event)
  }
}
// this is a bug, because on rotation when you subscribe again to the same channel instance
// we get a crash because the channel is already closed
launch {
  viewModel.events()
    .consumeEach { event -> ui.handleEvent(event) }
}
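
A minimal sketch of the difference described above, assuming kotlinx.coroutines 1.5 or later (for `trySend`). The helper name `closedAfterCancel` is made up for illustration. It cancels a `for`-loop subscriber and then a `consumeEach` subscriber, and checks after each whether the shared channel still accepts elements.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: reports whether the channel was closed after each
// style of subscriber was cancelled (for loop first, consumeEach second).
fun closedAfterCancel(): Pair<Boolean, Boolean> = runBlocking {
    val events = Channel<Int>(Channel.UNLIMITED)

    // Subscriber 1: plain iteration, cancelled the way a UI scope would be.
    events.send(1)
    val looping = launch { for (event in events) println("for loop got $event") }
    yield() // let the subscriber start and suspend waiting for more events
    looping.cancelAndJoin()
    val closedAfterForLoop = events.trySend(2).isClosed

    // Subscriber 2: consumeEach cancels the channel when its coroutine ends.
    val consuming = launch { events.consumeEach { println("consumeEach got $it") } }
    yield()
    consuming.cancelAndJoin()
    val closedAfterConsumeEach = events.trySend(3).isClosed

    Pair(closedAfterForLoop, closedAfterConsumeEach)
}

fun main() {
    println(closedAfterCancel())
}
```

The first flag should stay `false` because the `for` loop leaves the channel alone on cancellation. The second should be `true`, which is why re-subscribing after `consumeEach` fails.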

@elizarov
Copy link
Contributor

Thanks. That's clear. How about receiveAsFlow name for this conversion?

@ZakTaccardi
Author

perfect name!

My original thinking is that receive indicates a single value, but then I realized there is consume for a single value and for multiples there is consumeEach. Now I am just realizing I originally felt this way because there is no ReceiveChannel.receiveEach. Should there be a .receiveEach { } too? (maybe could make this a separate issue)

@archibishop

archibishop commented Sep 23, 2019

Hey @ZakTaccardi, @elizarov. I have had a look at this issue. No one has implemented it yet, and I would love to take it up and raise a PR for it. To implement this, I was thinking of the following approach.

public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ReceiveAsFlow(this)
Then a class similar to the consumeAsFlow that doesn't use the markConsumed().

private class ReceiveAsFlow<T>(
        private val channel: ReceiveChannel<T>,
        context: CoroutineContext = EmptyCoroutineContext,
        capacity: Int = Channel.OPTIONAL_CHANNEL
) : ChannelFlow<T>(context, capacity) {

    override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
            ReceiveAsFlow(channel, context, capacity)

    override suspend fun collectTo(scope: ProducerScope<T>) =
            SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll

    override fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> {
        return if (capacity == Channel.OPTIONAL_CHANNEL) {
            channel // direct
        } else
            super.produceImpl(scope) // extra buffering channel
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        if (capacity == Channel.OPTIONAL_CHANNEL) {
            collector.emitAll(channel) // direct
        } else {
            super.collect(collector) // extra buffering channel
        }
    }

    override fun additionalToStringProps(): String = "channel=$channel, "
}

Any thoughts or feedback on whether this approach is feasible would be appreciated. Thanks.

@elizarov
Contributor

elizarov commented Sep 23, 2019

There should not be any cut-and-paste in this method implementation, as we are concerned about the overall size of the library. I've sketched the right-looking implementation in this commit: b16beb0. It needs tests, though. There are also open design issues with respect to the consistency of operator naming and its interaction with emitAll. We'll get back to it when time permits.

@archibishop

Thanks for the feedback. I have seen the implementation and definitely learned from it.

@ZakTaccardi
Author

Here's a super basic implementation, though it may have a race condition around unsubscription, where the ReceiveChannel<T> emits an item but the Flow<T> doesn't?

/**
 * Converts `this` [ReceiveChannel] into a [Flow] without closing the [ReceiveChannel] when the [Flow] is cancelled.
 */
fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = flow {
    val flowCollector = this

    for (item in this@receiveAsFlow) {
        flowCollector.emit(item)
    }
}

elizarov added a commit that referenced this issue Dec 24, 2019
* Experimental ReceiveChannel.receiveAsFlow extension convert channel to flow in fan-out fashion allowing for multi-use.
* Also, ReceiveChannel.consumeAsFlow is promoted to experimental from preview

Fixes #1490
@elizarov elizarov self-assigned this Dec 24, 2019
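
For illustration, a minimal sketch of how the `receiveAsFlow` extension from this commit might be used by two successive collectors, as in the configuration-change scenario from the original post. The helper name `collectTwice` and the event values are made up. It assumes a kotlinx.coroutines version that ships `receiveAsFlow`.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: collects the same flow twice, cancelling the first
// collector before the second starts, and records what each one received.
fun collectTwice(): List<String> = runBlocking {
    val events = Channel<String>(Channel.UNLIMITED)
    val flow = events.receiveAsFlow() // does not cancel the channel on cancellation
    val received = mutableListOf<String>()

    events.send("a")
    val first = launch { flow.collect { received += "first:$it" } }
    yield() // let the first collector receive "a" and suspend
    first.cancelAndJoin() // e.g. the old Activity goes away

    events.send("b") // the channel is still usable
    val second = launch { flow.collect { received += "second:$it" } }
    yield()
    second.cancelAndJoin()

    events.close()
    received
}

fun main() {
    println(collectTwice())
}
```

With `consumeAsFlow` in place of `receiveAsFlow`, the second collection would be expected to fail, since that flow can be collected only once and cancels the channel when collection ends.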
@stevefan1999-personal

stevefan1999-personal commented Sep 21, 2020

@ZakTaccardi I ran into a weird problem, and your implementation worked for me. I was using kotlinx.coroutines with the Vert.x Kotlin coroutine extension, which exposes the pub/sub as a channel, and I tried to use it as a receive flow via the channel extensions.

Unfortunately it didn't work, and so I came up with the equivalent of what you did:

@ExperimentalCoroutinesApi
fun <T> ReceiveChannel<T>.receiveAsFlowByIterationUnsafe(): Flow<T> = flow {
    this@receiveAsFlowByIterationUnsafe.consumeEach { emit(it) }
}

The original form is like this:

val eventFlow = flow {
    vertx.eventBus()
        .localConsumer<Event<String, NetSocket>>("fire")
        .toChannel(vertx)
        .consumeEach { emit(it) }
}
// do any thing...

After I checked out the consumeEach function, I saw that it is shockingly the same as what you did. Also, because receiveAsFlow and Flow.emitAll(chan: ReceiveChannel) are equivalent, we cannot use emitAll as a substitute. So I'm also left hanging with the problem you faced.

It seems like we need a redesign of how channel and flow interact. I don't know why, but for some reason there is no receiveOrClosed on RendezvousChannel?

Anyway, this might be the final form of my extension that preserved the relations receiveAsFlow had while sidestepping the receiveOrClosed abstract method problem:

@ExperimentalCoroutinesApi
fun <T> ReceiveChannel<T>.receiveAsFlowByIteration(): Flow<T> = flow {
    cancel(CancellationException("receive stream terminated",
        this@receiveAsFlowByIteration.runCatching { while (!isClosedForReceive) emit(receive()) }
            .exceptionOrNull()
    ))
}

4 participants