Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make ConflatedBroadcastChannel allocation-free (#395) #402

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,17 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kotlinx.coroutines.experimental.channels
Expand All @@ -8,6 +20,15 @@ import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.intrinsics.*
import kotlinx.coroutines.experimental.selects.*
import kotlinx.coroutines.experimental.internal.ReentrantLock
import kotlinx.coroutines.experimental.internal.Symbol
import kotlinx.coroutines.experimental.internal.subscriberList
import kotlinx.coroutines.experimental.internal.withLock
import kotlinx.coroutines.experimental.internalAnnotations.JvmField
import kotlinx.coroutines.experimental.internalAnnotations.Volatile
import kotlinx.coroutines.experimental.intrinsics.startCoroutineUndispatched
import kotlinx.coroutines.experimental.selects.SelectClause2
import kotlinx.coroutines.experimental.selects.SelectInstance

/**
* Broadcasts the most recently sent element (aka [value]) to all [openSubscription] subscribers.
Expand All @@ -20,46 +41,24 @@ import kotlinx.coroutines.experimental.selects.*
* A secondary constructor can be used to create an instance of this class that already holds a value.
* This channel is also created by `BroadcastChannel(Channel.CONFLATED)` factory function invocation.
*
* This implementation is fully lock-free. In this implementation
* This implementation is synchronized. In this implementation
* [opening][openSubscription] and [closing][ReceiveChannel.cancel] subscription takes O(N) time, where N is the
* number of subscribers.
*/
public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
/**
* Creates an instance of this class that already holds a value.
*
* It is as a shortcut to creating an instance with a default constructor and
* immediately sending an element: `ConflatedBroadcastChannel().apply { offer(value) }`.
*/
constructor(value: E) : this() {
_state.lazySet(State<E>(value, null))

public constructor(value: E) : this() {
_state = value
}

private val _state = atomic<Any>(INITIAL_STATE) // State | Closed
private val _updating = atomic(0)
// State transitions: null -> handler -> HANDLER_INVOKED
private val onCloseHandler = atomic<Any?>(null)
private val _lock = ReentrantLock()

private companion object {
@JvmField
val CLOSED = Closed(null)
@Volatile
private var _state: Any? = UNDEFINED

@JvmField
val UNDEFINED = Symbol("UNDEFINED")
private val _subscribers = subscriberList<Subscriber>()

@JvmField
val INITIAL_STATE = State<Any?>(UNDEFINED, null)
}

private class State<E>(
@JvmField val value: Any?, // UNDEFINED | E
@JvmField val subscribers: Array<Subscriber<E>>?
)

private class Closed(@JvmField val closeCause: Throwable?) {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
}
private val _onCloseHandlers = subscriberList<Handler>()

/**
* The most recently sent element to this channel.
Expand All @@ -69,211 +68,140 @@ public class ConflatedBroadcastChannel<E>() : BroadcastChannel<E> {
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*/
@Suppress("UNCHECKED_CAST")
public val value: E get() {
_state.loop { state ->
when (state) {
is Closed -> throw state.valueException
is State<*> -> {
if (state.value === UNDEFINED) throw IllegalStateException("No value")
return state.value as E
}
else -> error("Invalid state $state")
}
public val value: E
get() {
val state = _state
if (state === UNDEFINED) throw IllegalStateException("No value")
(state as? Closed)?.run { throw valueException }
return state as E
}
}

/**
* The most recently sent element to this channel or `null` when this class is constructed without
* initial value and no value was sent yet or if it was [closed][close].
*/
@Suppress("UNCHECKED_CAST")
public val valueOrNull: E? get() {
val state = _state.value
when (state) {
is Closed -> return null
is State<*> -> {
if (state.value === UNDEFINED) return null
return state.value as E
public val valueOrNull: E?
get() {
val state = _state
return when {
state === UNDEFINED -> null
state is Closed -> state.closeCause?.let { throw it }
else -> state as E
}
else -> error("Invalid state $state")
}
}

public override val isClosedForSend: Boolean get() = _state.value is Closed
public override val isFull: Boolean get() = false

@Suppress("UNCHECKED_CAST")
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber(this)
_state.loop { state ->
when (state) {
is Closed -> {
subscriber.close(state.closeCause)
return subscriber
}
is State<*> -> {
if (state.value !== UNDEFINED)
subscriber.offerInternal(state.value as E)
val update = State(state.value, addSubscriber((state as State<E>).subscribers, subscriber))
if (_state.compareAndSet(state, update))
return subscriber
}
else -> error("Invalid state $state")
}
}
}
public override val isClosedForSend: Boolean get() = _state is Closed

@Suppress("UNCHECKED_CAST")
private fun closeSubscriber(subscriber: Subscriber<E>) {
_state.loop { state ->
when (state) {
is Closed -> return
is State<*> -> {
val update = State(state.value, removeSubscriber((state as State<E>).subscribers!!, subscriber))
if (_state.compareAndSet(state, update))
return
public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect(null)) return
offerInternal(param)?.let {
select.resumeSelectCancellableWithException(it.sendException)
return
}
else -> error("Invalid state $state")
block.startCoroutineUnintercepted(receiver = this@ConflatedBroadcastChannel, completion = select.completion)
}
}
}

private fun addSubscriber(list: Array<Subscriber<E>>?, subscriber: Subscriber<E>): Array<Subscriber<E>> {
if (list == null) return Array(1) { subscriber }
return list + subscriber
}

@Suppress("UNCHECKED_CAST")
private fun removeSubscriber(list: Array<Subscriber<E>>, subscriber: Subscriber<E>): Array<Subscriber<E>>? {
val n = list.size
val i = list.indexOf(subscriber)
check(i >= 0)
if (n == 1) return null
val update = arrayOfNulls<Subscriber<E>>(n - 1)
arraycopy(list, 0, update, 0, i)
arraycopy(list, i + 1, update, i, n - i - 1)
return update as Array<Subscriber<E>>
}
public override fun cancel(cause: Throwable?): Boolean = close(cause)

@Suppress("UNCHECKED_CAST")
public override fun close(cause: Throwable?): Boolean {
_state.loop { state ->
when (state) {
is Closed -> return false
is State<*> -> {
val update = if (cause == null) CLOSED else Closed(cause)
if (_state.compareAndSet(state, update)) {
(state as State<E>).subscribers?.forEach { it.close(cause) }
invokeOnCloseHandler(cause)
return true
}
}
else -> error("Invalid state $state")
}
_lock.withLock {
if (_state is Closed) return false
_state = Closed(cause)

// dispose all subscribers
_subscribers.forEach { it.close(cause) }
_subscribers.clear()

_onCloseHandlers.forEach { it.invoke(cause) }
_onCloseHandlers.clear()

return true
}
}

private fun invokeOnCloseHandler(cause: Throwable?) {
val handler = onCloseHandler.value
if (handler !== null && handler !== HANDLER_INVOKED
&& onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
(handler as Handler)(cause)
}
public override fun offer(element: E): Boolean {
offerInternal(element)?.let { throw it.sendException }
return true
}

override fun invokeOnClose(handler: Handler) {
// Intricate dance for concurrent invokeOnClose and close
if (!onCloseHandler.compareAndSet(null, handler)) {
val value = onCloseHandler.value
if (value === HANDLER_INVOKED) {
throw IllegalStateException("Another handler was already registered and successfully invoked")
} else {
throw IllegalStateException("Another handler was already registered: $value")
private fun offerInternal(element: E): Closed? {
if (_lock.tryLock()) {
try {
(_state as? Closed)?.let { return it }

_state = element
_subscribers.forEach { it.tryOffer(element) }
return null
} finally {
_lock.unlock()
}
} else {
val state = _state.value
if (state is Closed && onCloseHandler.compareAndSet(handler, HANDLER_INVOKED)) {
(handler)(state.closeCause)
}
return _state as? Closed
}
}

/**
* Closes this broadcast channel. Same as [close].
*/
public override fun cancel(cause: Throwable?): Boolean = close(cause)
public override fun openSubscription(): ReceiveChannel<E> {
val subscriber = Subscriber()
_subscribers.add(subscriber)

do {
val state = _state
@Suppress("UNCHECKED_CAST")
when {
state is Closed -> subscriber.close(state.closeCause)
state !== UNDEFINED -> subscriber.tryOffer(state as E)
}
// manage offerInternal/close contention
} while (_state !== state)

/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation never suspends.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
public override suspend fun send(element: E) {
offerInternal(element)?.let { throw it.sendException }
if (subscriber.isClosedForSend) _subscribers.remove(subscriber)
return subscriber
}

/**
* Sends the value to all subscribed receives and stores this value as the most recent state for
* future subscribers. This implementation always returns `true`.
* It throws exception if the channel [isClosedForSend] (see [close] for details).
*/
public override fun offer(element: E): Boolean {
offerInternal(element)?.let { throw it.sendException }
return true
public override suspend fun send(element: E) {
offerInternal(element)?.run { throw sendException }
}

@Suppress("UNCHECKED_CAST")
private fun offerInternal(element: E): Closed? {
// If some other thread is updating the state in its offer operation we assume that our offer had linearized
// before that offer (we lost) and that offer overwrote us and conflated our offer.
if (!_updating.compareAndSet(0, 1)) return null
try {
_state.loop { state ->
when (state) {
is Closed -> return state
is State<*> -> {
val update = State(element, (state as State<E>).subscribers)
if (_state.compareAndSet(state, update)) {
// Note: Using offerInternal here to ignore the case when this subscriber was
// already concurrently closed (assume the close had conflated our offer for this
// particular subscriber).
state.subscribers?.forEach { it.offerInternal(element) }
return null
}
}
else -> error("Invalid state $state")
}
override fun invokeOnClose(handler: Handler) {
_lock.withLock {
val state = _state
if (state is Closed) {
handler(state.closeCause)
} else {
_onCloseHandlers += handler
}
} finally {
_updating.value = 0 // reset the updating flag to zero even when something goes wrong
}
}

public override val onSend: SelectClause2<E, SendChannel<E>>
get() = object : SelectClause2<E, SendChannel<E>> {
override fun <R> registerSelectClause2(select: SelectInstance<R>, param: E, block: suspend (SendChannel<E>) -> R) {
registerSelectSend(select, param, block)
}
}
private inner class Subscriber : ConflatedChannel<E>() {

private fun <R> registerSelectSend(select: SelectInstance<R>, element: E, block: suspend (SendChannel<E>) -> R) {
if (!select.trySelect(null)) return
offerInternal(element)?.let {
select.resumeSelectCancellableWithException(it.sendException)
return
/**
* Offer an element without throw exception
*/
fun tryOffer(element: E) {
super.offerInternal(element)
}
block.startCoroutineUnintercepted(receiver = this, completion = select.completion)
}

@Suppress("DEPRECATION")
private class Subscriber<E>(
private val broadcastChannel: ConflatedBroadcastChannel<E>
) : ConflatedChannel<E>(), ReceiveChannel<E>, SubscriptionReceiveChannel<E> {
override fun cancel(cause: Throwable?): Boolean =
close(cause).also { closed ->
if (closed) broadcastChannel.closeSubscriber(this)
override fun cancel(cause: Throwable?): Boolean =
super.cancel(cause).also { closed ->
if (closed) this@ConflatedBroadcastChannel._subscribers.remove(this)
}
}

public override fun offerInternal(element: E): Any = super.offerInternal(element)
}
private class Closed(@JvmField val closeCause: Throwable?) {
val sendException: Throwable get() = closeCause ?: ClosedSendChannelException(DEFAULT_CLOSE_MESSAGE)
val valueException: Throwable get() = closeCause ?: IllegalStateException(DEFAULT_CLOSE_MESSAGE)
}

private companion object {
@JvmField
val UNDEFINED = Symbol("UNDEFINED")
}
}