Make ConflatedBroadcastChannel allocation-free #395

Closed
elizarov opened this issue Jun 11, 2018 · 7 comments

@elizarov
Contributor

elizarov commented Jun 11, 2018

ConflatedBroadcastChannel is used in UI applications as an "observable value source", and it would be useful to make its send-receive path fully allocation-free, even at the cost of losing some lock-freedom (which matters little there).

@fvasco
Contributor

fvasco commented Jun 11, 2018

"Observable value source" was discussed here #274

I made a similar proposal (with some issues); here is the synchronized version.

import kotlinx.coroutines.experimental.channels.Closed
import kotlinx.coroutines.experimental.channels.ConflatedChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel

private class SubscribableVariableImpl<T>(initialValue: T) : SubscribableVariable<T> {

    /**
     * Subscription list
     */
    private var _subscriptions: Array<Subscription>? = null


    @Volatile
    public override var value: T = initialValue
        set(newValue) {
            // lock value and notify value update
            synchronized(this) {
                field = newValue
                _subscriptions?.forEach { it.offer(newValue) }
            }
        }

    public override fun openSubscription(): ReceiveChannel<T> {
        val subscription = Subscription()
        synchronized(this) {
            _subscriptions = _subscriptions?.let { it + subscription } ?: arrayOf(subscription)
            // offer initial value
            subscription.offer(value)
        }
        return subscription
    }

    /**
     * Remove [subscription] from [_subscriptions] list
     */
    private fun removeSubscription(subscription: Subscription) {
        synchronized(this) {
            val oldSubscriptions = _subscriptions ?: error("Subscription not found")

            if (oldSubscriptions.size == 1) {
                if (oldSubscriptions[0] == subscription) {
                    _subscriptions = null
                    return
                } else {
                    error("Subscription not found")
                }
            }

            val newSubscriptions: Array<Subscription?> = arrayOfNulls(oldSubscriptions.size - 1)
            var newI = 0
            for (oldSubscription in oldSubscriptions) {
                if (oldSubscription != subscription) {
                    if (newI == newSubscriptions.size)
                        error("Subscription not found")
                    newSubscriptions[newI] = oldSubscription
                    newI++
                }
            }
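            // Every slot was filled by the loop above (the subscription occurs exactly once),
            // so no element is null and the unchecked cast is safe.
            _subscriptions = newSubscriptions as Array<Subscription>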
        }
    }

    /**
     * Private [ReceiveChannel] implementation
     */
    private inner class Subscription : ConflatedChannel<T>(), ReceiveChannel<T> {
        override fun onClosed(closed: Closed<T>) {
            removeSubscription(this)
            super.onClosed(closed)
        }
    }
}

@fvasco
Contributor

fvasco commented Jun 12, 2018

In the draft above, replacing Array with MutableList reduces both the code complexity and the allocation inside the synchronized blocks.

Moreover, the inner class Subscription is then no longer needed.
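
To make the MutableList suggestion concrete, here is a minimal sketch of what such a variant could look like. It is not the code from the draft: the class name `ObservableValue`, the `subscribe` function, and the `demoObservableValue` helper are hypothetical, and plain callbacks stand in for the ConflatedChannel-based subscriptions so that the example needs only the Kotlin standard library.

```kotlin
// Hypothetical sketch: all names here are illustrative and are not
// part of kotlinx.coroutines or of the draft above.
class ObservableValue<T>(initialValue: T) {
    private val lock = Any()
    private var current: T = initialValue

    // A growable list replaces the manually copied array, so adding or
    // removing a listener needs no hand-written copy loop.
    private val listeners: MutableList<(T) -> Unit> = ArrayList()

    var value: T
        get() = synchronized(lock) { current }
        set(newValue) {
            synchronized(lock) {
                current = newValue
                // Indexed loop, so no Iterator is allocated per update.
                for (i in listeners.indices) listeners[i](newValue)
            }
        }

    /** Registers [listener], hands it the current value, and returns an unsubscribe handle. */
    fun subscribe(listener: (T) -> Unit): () -> Unit {
        synchronized(lock) {
            listeners.add(listener)
            listener(current)
        }
        return { synchronized(lock) { listeners.remove(listener) } }
    }
}

// Small usage demo: collects every value the listener observes.
fun demoObservableValue(): List<Int> {
    val seen = mutableListOf<Int>()
    val source = ObservableValue(0)
    val unsubscribe = source.subscribe { seen.add(it) }
    source.value = 1
    source.value = 2
    unsubscribe()
    source.value = 3 // not observed: the listener is already removed
    return seen
}
```

In this sketch a listener must not unsubscribe from inside its own callback, because that would shrink the list while the indexed loop is running. Whether the real draft can drop the Subscription class depends on how channel closing is detected, which this example does not model.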

@elizarov
Contributor Author

@fvasco Does it pass all the ConflatedBroadcastChannel tests? If it does, then you can consider making a PR with the code, marking it as "Fixes #395" (this issue).

@gildor
Contributor

gildor commented Apr 8, 2019

Any updates on this? Maybe something changed after the release of the cold streams preview. Even now with Flow, to implement something like an "observable value source" you have to use a channel, with all its allocation problems. Maybe we could expect some optimization at least for channels created for Flow (the flowViaChannel() builder), because it can guarantee a single producer and a single consumer.

@elizarov
Contributor Author

No updates yet. We are now focused on two areas:

  1. Integrating flows
  2. Rewriting regular channels for better performance. However, the focus is on scalability under load and on throughput for server-side applications (think of competing with Go), not on allocation-freedom itself. They will allocate less but will not be free from allocations. Moreover, the first release is going to address only point-to-point channels. The rewrite of "BroadcastChannels" to the new, faster algorithms will have to wait.

The idea that I have in mind, though, is that for #1082, instead of turning "ConflatedBroadcastChannel" into the "DataFlow", we can reimplement "DataFlow" from scratch, maybe dropping some of its channel features, thus making it simpler and opening the road to a lock-based allocation-free implementation.
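
As a rough illustration of what "lock-based and allocation-free" can mean for a conflated value holder, the sketch below keeps all per-subscriber state in arrays that are allocated once, so neither an update nor a read allocates anything. This is an assumption-laden toy, not the DataFlow design and not how any kotlinx.coroutines class is implemented: the class `ConflatedState`, its fixed `capacity`, the polling API, and `demoConflation` are all hypothetical.

```kotlin
// Hypothetical sketch: names, the fixed capacity, and the polling API are
// assumptions made for illustration only.
class ConflatedState<T : Any>(initialValue: T, capacity: Int) {
    private val lock = Any()
    private var current: T = initialValue
    private var version = 1L // bumped on every update

    // Per-subscriber bookkeeping is preallocated, one entry per slot.
    private val inUse = BooleanArray(capacity)
    private val seen = LongArray(capacity)

    /** Replaces the value; allocates nothing. */
    fun update(newValue: T) {
        synchronized(lock) {
            current = newValue
            version++
        }
    }

    /** Claims a free slot and returns its index. */
    fun subscribe(): Int = synchronized(lock) {
        val slot = inUse.indexOfFirst { !it }
        check(slot >= 0) { "No free subscriber slot" }
        inUse[slot] = true
        seen[slot] = 0L // older than any version, so the first poll yields the current value
        slot
    }

    /** Returns the latest value if this slot has not seen it yet, else null. */
    fun poll(slot: Int): T? = synchronized(lock) {
        check(inUse[slot]) { "Slot $slot is not subscribed" }
        if (seen[slot] == version) {
            null
        } else {
            seen[slot] = version
            current
        }
    }

    /** Releases the slot so that it can be reused. */
    fun unsubscribe(slot: Int) {
        synchronized(lock) { inUse[slot] = false }
    }
}

// Usage demo: the intermediate value "b" is conflated away.
fun demoConflation(): List<String?> {
    val state = ConflatedState("a", capacity = 2)
    val slot = state.subscribe()
    val first = state.poll(slot)  // initial value
    state.update("b")
    state.update("c")
    val second = state.poll(slot) // only the latest value is visible
    val third = state.poll(slot)  // nothing new since the last poll
    state.unsubscribe(slot)
    return listOf(first, second, third)
}
```

The trade-off matches the one described in the thread: every operation takes a lock, so the structure is not lock-free, but the hot path touches only preallocated fields. A real implementation would also have to suspend and resume waiting collectors, which this polling sketch leaves out.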

@gildor
Contributor

gildor commented Apr 15, 2019

@elizarov Thanks for the update!
Yes, I understand that Flow now has higher priority. I also agree that reimplementing it as DataFlow for a more specific use case is probably a better approach than attempting to fix ConflatedBroadcastChannel.
I believe that it's a crucial part of kotlinx.coroutines, required for many UI use cases. Even now we already have a few attempts to implement Reactive Behavior/LiveData-like streams for reactive UI in MPP projects. It's also important for projects that would like to switch to kotlinx.coroutines (or start from scratch with it) from LiveData or RxJava.

elizarov added a commit that referenced this issue May 7, 2020
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1973
Fixes #395
@elizarov
Contributor Author

elizarov commented May 7, 2020

This issue will be fixed in #1974 by providing allocation-free capabilities similar to ConflatedBroadcastChannel.

elizarov added a commit that referenced this issue May 7, 2020
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes #1973
Fixes #395
Fixes #1816
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API
is simpler than channels APIs, the implementation of StateFlow is
simpler. It consumes and allocates less memory, while still providing
full deadlock-freedom (even though it is not lock-free internally).

Fixes Kotlin#1973
Fixes Kotlin#395
Fixes Kotlin#1816

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>