Skip to content

Commit

Permalink
Make ConflatedBroadcastChannel allocation-free
Browse files Browse the repository at this point in the history
  • Loading branch information
fvasco committed Jul 25, 2018
1 parent 0cf99bc commit 0c6de5a
Showing 1 changed file with 99 additions and 177 deletions.
Expand Up @@ -16,11 +16,15 @@

package kotlinx.coroutines.experimental.channels

import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlinx.coroutines.experimental.internal.ReentrantLock
import kotlinx.coroutines.experimental.internal.Symbol
import kotlinx.coroutines.experimental.internal.subscriberList
import kotlinx.coroutines.experimental.internal.withLock
import kotlinx.coroutines.experimental.internalAnnotations.JvmField
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance

/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
Expand All @@ -33,44 +37,22 @@ import kotlinx.coroutines.experimental.selects.*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
*
* This implementation is fully lock-free. In this implementation
* This implementation is synchronized. In this implementation
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
* number of subscribers.
*/
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Creates an instance of this class that already holds a value.
*
* It is as a shortcut to creating an instance with a default constructor and
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
*/
constructor(value: E) : this() {
_state.lazySet(State<E>(value, null))
}

private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
private val _updating = atomic(0)

private companion object {
@JvmField
val CLOSED = Closed(null)

@JvmField
val UNDEFINED = Symbol("UNDEFINED")

@JvmField
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
public constructor(value: E) : this() {
_state = value
}

private class State<E>(
@JvmField val value: Any?, // UNDEFINED | E
@JvmField val subscribers: Array<Subscriber<E>>?
)
private val _lock = ReentrantLock()

private class Closed(@JvmField val closeCause: Throwable?) {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
}
@Volatile
private var _state: Any? = UNDEFINED

private var _subscribers = subscriberList<Subscriber>()

/**
* The most recently sent element to this channel.
Expand All @@ -80,184 +62,124 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
@Suppress("UNCHECKED_CAST")
public val value: E get() {
_state.loop { state ->
when (state) {
is Closed -> throw state.valueException
is State<*> -> {
if (state.value === UNDEFINED) throw IllegalStateException("No value")
return state.value as E
}
else -> error("Invalid state $state")
}
public val value: E
get() {
val state = _state
if (state === UNDEFINED) throw IllegalStateException("No value")
(state as? Closed)?.run { throw valueException }
return state as E
}
}

/**
* The most recently sent element to this channel or `null` when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close].
*/
@Suppress("UNCHECKED_CAST")
public val valueOrNull: E? get() {
val state = _state.value
when (state) {
is Closed -> return null
is State<*> -> {
if (state.value === UNDEFINED) return null
return state.value as E
public val valueOrNull: E?
get() {
val state = _state
return when {
state === UNDEFINED -> null
state is Closed -> state.closeCause?.let { throw it }
else -> state as E
}
else -> error("Invalid state $state")
}
}

public override val isClosedForSend: Boolean get() = _state.value is Closed
public override val isFull: Boolean get() = false

@Suppress("UNCHECKED_CAST")
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber<E>(this)
_state.loop { state ->
when (state) {
is Closed -> {
subscriber.close(state.closeCause)
return subscriber
}
is State<*> -> {
if (state.value !== UNDEFINED)
subscriber.offerInternal(state.value as E)
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
if (_state.compareAndSet(state, update))
return subscriber
}
else -> error("Invalid state $state")
}
}
}
public override val isClosedForSend: Boolean get() = _state is Closed

@Suppress("UNCHECKED_CAST")
private fun closeSubscriber(subscriber: Subscriber<E>) {
_state.loop { state ->
when (state) {
is Closed -> return
is State<*> -> {
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
if (_state.compareAndSet(state, update))
return
public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect(null)) return
offerInternal(param)?.let {
select.resumeSelectCancellableWithException(it.sendException)
return
}
else -> error("Invalid state $state")
block.startCoroutineUndispatched(receiver = this@ConflatedBroadcastChannel, completion = select.completion)
}
}
}

private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
if (list == null) return Array(1) { subscriber }
return list + subscriber
}

@Suppress("UNCHECKED_CAST")
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
val n = list.size
val i = list.indexOf(subscriber)
check(i >= 0)
if (n == 1) return null
val update = arrayOfNulls<Subscriber<E>>(n - 1)
arraycopy(list, 0, update, 0, i)
arraycopy(list, i + 1, update, i, n - i - 1)
return update as Array<Subscriber<E>>
}
public override fun cancel(cause: Throwable?): Boolean = close(cause)

@Suppress("UNCHECKED_CAST")
public override fun close(cause: Throwable?): Boolean {
_state.loop { state ->
when (state) {
is Closed -> return false
is State<*> -> {
val update = if (cause == null) CLOSED else Closed(cause)
if (_state.compareAndSet(state, update)) {
(state as State<E>).subscribers?.forEach { it.close(cause) }
return true
}
}
else -> error("Invalid state $state")
}
_lock.withLock {
if (_state is Closed) return false
_state = Closed(cause)

// dispose all subscribers
_subscribers.forEach { it.close(cause) }
_subscribers.clear()
return true
}
}

/**
* Closes this broadcast channel. Same as [close].
*/
public override fun cancel(cause: Throwable?): Boolean = close(cause)

/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation never suspends.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
public override suspend fun send(element: E) {
offerInternal(element)?.let { throw it.sendException }
}

/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation always returns `true`.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
public override fun offer(element: E): Boolean {
offerInternal(element)?.let { throw it.sendException }
return true
}

@Suppress("UNCHECKED_CAST")
private fun offerInternal(element: E): Closed? {
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
// before that offer (we lost) and that offer overwrote us and conflated our offer.
if (!_updating.compareAndSet(0, 1)) return null
try {
_state.loop { state ->
when (state) {
is Closed -> return state
is State<*> -> {
val update = State(element, (state as State<E>).subscribers)
if (_state.compareAndSet(state, update)) {
// Note: Using offerInternal here to ignore the case when this subscriber was
// already concurrently closed (assume the close had conflated our offer for this
// particular subscriber).
state.subscribers?.forEach { it.offerInternal(element) }
return null
}
}
else -> error("Invalid state $state")
}
if (_lock.tryLock()) {
try {
(_state as? Closed)?.let { return it }

_state = element
_subscribers.forEach { it.tryOffer(element) }
return null
} finally {
_lock.unlock()
}
} finally {
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
} else {
return _state as? Closed
}
}

public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
registerSelectSend(select, param, block)
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber()
_subscribers.add(subscriber)

do {
val state = _state
@Suppress("UNCHECKED_CAST")
when {
state is Closed -> subscriber.close(state.closeCause)
state !== UNDEFINED -> subscriber.tryOffer(state as E)
}
}
// manage offerInternal/close contention
} while (_state !== state)

private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect(null)) return
offerInternal(element)?.let {
select.resumeSelectCancellableWithException(it.sendException)
return
}
block.startCoroutineUndispatched(receiver = this, completion = select.completion)
if (subscriber.isClosedForSend) _subscribers.remove(subscriber)
return subscriber
}

public override suspend fun send(element: E) {
offerInternal(element)?.run { throw sendException }
}

private class Subscriber<E>(
private val broadcastChannel: ConflatedBroadcastChannel<E>
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
private inner class Subscriber : ConflatedChannel<E>() {

/**
* Offer an element without throw exception
*/
fun tryOffer(element: E) {
super.offerInternal(element)
}

override fun cancel(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.closeSubscriber(this)
}
super.cancel(cause).also { closed ->
if (closed) this@ConflatedBroadcastChannel._subscribers.remove(this)
}
}

public override fun offerInternal(element: E): Any = super.offerInternal(element)
private class Closed(@JvmField val closeCause: Throwable?) {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
}

private companion object {
@JvmField
val UNDEFINED = Symbol("UNDEFINED")
}
}
}

0 comments on commit 0c6de5a

Please sign in to comment.