Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove onLock clauses from ExchangeScope #169

Merged
merged 1 commit into from Sep 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions echo/api/echo.api
Expand Up @@ -85,9 +85,7 @@ public abstract interface class io/github/alexandrepiveteau/echo/projections/Two

public abstract interface class io/github/alexandrepiveteau/echo/protocol/ExchangeScope : kotlinx/coroutines/channels/ReceiveChannel, kotlinx/coroutines/channels/SendChannel {
public abstract fun getLog ()Lio/github/alexandrepiveteau/echo/core/log/MutableEventLog;
public abstract fun getOnEventLogLock ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnEventLogUpdate ()Lkotlinx/coroutines/selects/SelectClause0;
public abstract fun getOnMutableEventLogLock ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun lock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun mutate ()V
public abstract fun unlock ()V
Expand Down
Expand Up @@ -5,7 +5,6 @@ import io.github.alexandrepiveteau.echo.core.log.MutableEventLog
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectClause1

/**
* The scope in which an exchange is performed with another site. More specifically, it offers some
Expand All @@ -32,12 +31,6 @@ interface ExchangeScope<out I, in O> : ReceiveChannel<I>, SendChannel<O> {
/** Atomically marks that the [MutableEventLog] has been mutated. */
fun mutate()

/** A select clause that is made available when the log is available for reading. */
val onEventLogLock: SelectClause1<EventLog>

/** A select clause that is made available when the log is available for reading and writing. */
val onMutableEventLogLock: SelectClause1<MutableEventLog>

/**
* A [SelectClause0] that is made available when the event log is updated with some new content.
* This is typically used to ensure that the state is notified with the latest log state.
Expand Down
Expand Up @@ -5,8 +5,6 @@ import io.github.alexandrepiveteau.echo.core.buffer.mutableEventIdentifierGapBuf
import io.github.alexandrepiveteau.echo.core.causality.EventIdentifier
import io.github.alexandrepiveteau.echo.core.causality.SequenceNumber
import io.github.alexandrepiveteau.echo.core.causality.binarySearchBySite
import io.github.alexandrepiveteau.echo.core.log.isNotEmpty
import io.github.alexandrepiveteau.echo.core.log.mutableEventLogOf
import io.github.alexandrepiveteau.echo.protocol.Message.Incoming as I
import io.github.alexandrepiveteau.echo.protocol.Message.Outgoing as O
import kotlinx.coroutines.selects.select
Expand Down Expand Up @@ -46,7 +44,6 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
// The queue of all the messages that still have to be sent to the other side. Messages are sent
// in a FIFO fashion, and should simply be added to the queue.
val queue = ArrayDeque<O>(advertisements.size)
val events = mutableEventLogOf()

// We are done receiving whenever we've received a message from the other side telling us that
// no more events should be sent. Nevertheless, we should still make sure that all the events have
Expand All @@ -55,7 +52,7 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
var isDoneReceiving = false

// Repeat until the channel is closed.
while (!isDoneReceiving || events.isNotEmpty()) {
while (!isDoneReceiving) {

// If we are syncing in a one-shot fashion, terminate if we have already received all the events
// that we were expecting in a session.
Expand Down Expand Up @@ -90,14 +87,6 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
onSend(firstMsg) { queue.removeFirst() }
}

// Insert batches of events.
if (events.isNotEmpty()) {
onMutableEventLogLock { log ->
log.merge(events)
events.clear()
}
}

// Receive a message from the other side.
if (!isDoneReceiving) {
onReceiveCatching { v ->
Expand All @@ -106,18 +95,10 @@ internal suspend fun ExchangeScope<I, O>.awaitEvents(
if (!stopAfterAdvertised)
advertisements.push(EventIdentifier(msg.nextSeqno, msg.site))
}
is I.Events -> {

events.merge(
mutableEventLogOf(
*msg.events
.asSequence()
.map { (seqno, site, body) -> EventIdentifier(seqno, site) to body }
.toList()
.toTypedArray(),
),
)
}
is I.Events ->
withMutableEventLogLock {
msg.events.forEach { insert(it.seqno, it.site, it.data) }
}
is I.Ready -> error("Unexpected duplicate Ready.")
null -> isDoneReceiving = true
}
Expand Down
Expand Up @@ -17,7 +17,6 @@ import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.SelectClause0
import kotlinx.coroutines.selects.SelectClause1
import kotlinx.coroutines.selects.SelectInstance
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand Down Expand Up @@ -155,28 +154,6 @@ private class ExchangeScopeImpl<I, O>(
override fun unlock() = mutex.unlock()
override fun mutate() = mutation()

override val onEventLogLock =
object : SelectClause1<MutableEventLog> {
override fun <R> registerSelectClause1(
select: SelectInstance<R>,
block: suspend (MutableEventLog) -> R,
) = mutex.onLock.registerSelectClause2(select, null) { block(log).also { mutex.unlock() } }
}

override val onMutableEventLogLock =
object : SelectClause1<MutableEventLog> {
override fun <R> registerSelectClause1(
select: SelectInstance<R>,
block: suspend (MutableEventLog) -> R,
) =
mutex.onLock.registerSelectClause2(select, null) {
block(log).also {
mutate()
mutex.unlock()
}
}
}

override val onEventLogUpdate =
object : SelectClause0 {
override fun <R> registerSelectClause0(
Expand Down