Skip to content

Commit

Permalink
Merge pull request #169 from alexandrepiveteau/no-onlock
Browse files Browse the repository at this point in the history
Remove `onLock` clauses from `ExchangeScope`
  • Loading branch information
alexandrepiveteau committed Sep 13, 2021
2 parents 61c48bc + e36f02f commit 0033f8d
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 56 deletions.
2 changes: 0 additions & 2 deletions echo/api/echo.api
Expand Up @@ -85,9 +85,7 @@ public abstract interface class io/github/alexandrepiveteau/echo/projections/Two

public abstract interface class io/github/alexandrepiveteau/echo/protocol/ExchangeScope : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
public abstract fun getLog ()Lio/github/alexandrepiveteau/echo/core/log/MutableEventLog;
public abstract fun getOnEventLogLock ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnEventLogUpdate ()Lkotlinx/coroutines/selects/SelectClause0;
public abstract fun getOnMutableEventLogLock ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun lock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun mutate ()V
public abstract fun unlock ()V
Expand Down
Expand Up @@ -5,7 +5,6 @@ import io.github.alexandrepiveteau.echo.core.log.MutableEventLog
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectClause1

/**
* The scope in which an exchange is performed with another site. More specifically, it offers some
Expand All @@ -32,12 +31,6 @@ interface ExchangeScope<out I, in O> : ReceiveChannel<I>, SendChannel<O> {
/** Atomically marks that the [MutableEventLog] has been mutated. */
fun mutate()

/** A select clause that is made available when the log is available for reading. */
val onEventLogLock: SelectClause1<EventLog>

/** A select clause that is made available when the log is available for reading and writing. */
val onMutableEventLogLock: SelectClause1<MutableEventLog>

/**
* A [SelectClause0] that is made available when the event log is updated with some new content.
* This is typically used to ensure that the state is notified with the latest log state.
Expand Down
Expand Up @@ -5,8 +5,6 @@ import io.github.alexandrepiveteau.echo.core.buffer.mutableEventIdentifierGapBuf
import io.github.alexandrepiveteau.echo.core.causality.EventIdentifier
import io.github.alexandrepiveteau.echo.core.causality.SequenceNumber
import io.github.alexandrepiveteau.echo.core.causality.binarySearchBySite
import io.github.alexandrepiveteau.echo.core.log.isNotEmpty
import io.github.alexandrepiveteau.echo.core.log.mutableEventLogOf
import io.github.alexandrepiveteau.echo.protocol.Message.Incoming as I
import io.github.alexandrepiveteau.echo.protocol.Message.Outgoing as O
import kotlinx.coroutines.selects.select
Expand Down Expand Up @@ -46,7 +44,6 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
// The queue of all the messages that still have to be sent to the other side. Messages are sent
// in a FIFO fashion, and should simply be added to the queue.
val queue = ArrayDeque<O>(advertisements.size)
val events = mutableEventLogOf()

// We are done receiving whenever we've received a message from the other side telling us that
// no more events should be sent. Nevertheless, we should still make sure that all the events have
Expand All @@ -55,7 +52,7 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
var isDoneReceiving = false

// Repeat until the channel is closed.
while (!isDoneReceiving || events.isNotEmpty()) {
while (!isDoneReceiving) {

// If we are syncing in a one-shot fashion, terminate if we have already received all the events
// that we were expecting in a session.
Expand Down Expand Up @@ -90,14 +87,6 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
onSend(firstMsg) { queue.removeFirst() }
}

// Insert batches of events.
if (events.isNotEmpty()) {
onMutableEventLogLock { log ->
log.merge(events)
events.clear()
}
}

// Receive a message from the other side.
if (!isDoneReceiving) {
onReceiveCatching { v ->
Expand All @@ -106,18 +95,10 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
if (!stopAfterAdvertised)
advertisements.push(EventIdentifier(msg.nextSeqno, msg.site))
}
is I.Events -> {

events.merge(
mutableEventLogOf(
*msg.events
.asSequence()
.map { (seqno, site, body) -> EventIdentifier(seqno, site) to body }
.toList()
.toTypedArray(),
),
)
}
is I.Events ->
withMutableEventLogLock {
msg.events.forEach { insert(it.seqno, it.site, it.data) }
}
is I.Ready -> error("Unexpected duplicate Ready.")
null -> isDoneReceiving = true
}
Expand Down
Expand Up @@ -17,7 +17,6 @@ import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand Down Expand Up @@ -155,28 +154,6 @@ private class ExchangeScopeImpl<I, O>(
override fun unlock() = mutex.unlock()
override fun mutate() = mutation()

override val onEventLogLock =
object : SelectClause1<MutableEventLog> {
override fun <R> registerSelectClause1(
select: SelectInstance<R>,
block: suspend (MutableEventLog) -> R,
) = mutex.onLock.registerSelectClause2(select, null) { block(log).also { mutex.unlock() } }
}

override val onMutableEventLogLock =
object : SelectClause1<MutableEventLog> {
override fun <R> registerSelectClause1(
select: SelectInstance<R>,
block: suspend (MutableEventLog) -> R,
) =
mutex.onLock.registerSelectClause2(select, null) {
block(log).also {
mutate()
mutex.unlock()
}
}
}

override val onEventLogUpdate =
object : SelectClause0 {
override fun <R> registerSelectClause0(
Expand Down

0 comments on commit 0033f8d

Please sign in to comment.