Add ExecutionContexts as a parameter to MapDb.
* Add ExecutionContexts as a parameter to MapDb.

For use with ScalaTest's SerialExecutionContext, to make test ordering
more deterministic.

Defined a new `ComputeAndBlockingExecutionContexts` interface to pass in
to MapDb, made QuineDispatchers a subclass of it, and then added a
test-only `FromSingleExecutionContext` implementation that uses the same
ExecutionContext for both.

Used in MapDbPersistorSpec.
MapDbPersistorTests times out if I try to use SerialExecutionContext -
maybe it's blocking some threads or otherwise relies on multithreading?

* Use SerialExecutionContext in RocksDbPersistorSpec too

* Switched MapDb and RocksDb to use parasitic ExecutionContext in tests

scalatest/scalatest#2314 (comment)

* Also convert HistoricalQueryTests to use parasitic ExecutionContext

GitOrigin-RevId: d83efbcd7126be0a2ad9f6294a03ab52a5e1c54c
LeifW authored and thatbot-copy[bot] committed Mar 7, 2024
1 parent 377e9fd commit 93ec7fa
Showing 9 changed files with 58 additions and 22 deletions.
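
For context on the parasitic ExecutionContext adopted in the test changes below: since Scala 2.13, `ExecutionContext.parasitic` runs each submitted task directly on the calling thread, so callbacks chained onto an already-completed Future execute synchronously, with no thread hand-off for a test to race against. A minimal standalone sketch (not part of this diff):

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

object ParasiticEcDemo extends App {
  // The parasitic EC runs the `map` callback inline on the calling thread,
  // so the resulting Future is already completed on the next line.
  val f: Future[Int] = Future.successful(1).map(_ + 1)(ExecutionContext.parasitic)
  assert(f.value.contains(Success(2)))
}
```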
@@ -1,20 +1,27 @@
package com.thatdot.quine.util

import scala.concurrent.ExecutionContext

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.dispatch.MessageDispatcher
import org.apache.pekko.dispatch.{Dispatchers, MessageDispatcher}

import com.typesafe.scalalogging.LazyLogging

import com.thatdot.quine.util.QuineDispatchers._

abstract class ComputeAndBlockingExecutionContexts {
def nodeDispatcherEC: ExecutionContext
def blockingDispatcherEC: ExecutionContext
}

/** Initializes and maintains the canonical reference to each of the dispatchers Quine uses.
* Similar to pekko-typed's DispatcherSelector
*
* See quine-core's `reference.conf` for definitions and documentation of the dispatchers
*
* @param system the actorsystem for which the dispatchers will be retrieved
*/
class QuineDispatchers(system: ActorSystem) extends LazyLogging {
class QuineDispatchers(system: ActorSystem) extends ComputeAndBlockingExecutionContexts with LazyLogging {
val shardDispatcherEC: MessageDispatcher =
system.dispatchers.lookup(shardDispatcherName)
val nodeDispatcherEC: MessageDispatcher =
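
A note on why the existing MessageDispatcher fields can satisfy the new trait's abstract ExecutionContext members (based on Pekko's API, where MessageDispatcher mixes in ExecutionContextExecutor; a sketch, not part of this diff):

```scala
import scala.concurrent.ExecutionContext

import org.apache.pekko.dispatch.MessageDispatcher

object DispatcherWidening {
  // Compiles because MessageDispatcher <: ExecutionContextExecutor <: ExecutionContext,
  // so QuineDispatchers' MessageDispatcher vals implement the trait's ExecutionContext members.
  def widen(d: MessageDispatcher): ExecutionContext = d
}
```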
@@ -0,0 +1,13 @@
package com.thatdot.quine.util

import scala.concurrent.ExecutionContext

/** Use the same EC for both of them. Intended for use with ScalaTest's SerialExecutionContext
* @param executionContext
*/
class FromSingleExecutionContext(executionContext: ExecutionContext) extends ComputeAndBlockingExecutionContexts {

val nodeDispatcherEC: ExecutionContext = executionContext

val blockingDispatcherEC: ExecutionContext = executionContext
}
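
A minimal usage sketch for the new class (hypothetical test code, not part of this diff): both members point at the one ExecutionContext passed in, which is what lets compute and blocking work interleave deterministically under a serial or parasitic EC.

```scala
import scala.concurrent.ExecutionContext

import com.thatdot.quine.util.FromSingleExecutionContext

object FromSingleEcSketch {
  val testECs = new FromSingleExecutionContext(ExecutionContext.parasitic)
  // "Node" (compute) and blocking work share a single ExecutionContext in tests.
  assert(testECs.nodeDispatcherEC eq testECs.blockingDispatcherEC)
}
```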
@@ -9,18 +9,19 @@ import org.apache.pekko.stream.Materializer
import com.codahale.metrics.MetricRegistry

import com.thatdot.quine.graph.NamespaceId
import com.thatdot.quine.util.QuineDispatchers
import com.thatdot.quine.util.{ComputeAndBlockingExecutionContexts, QuineDispatchers}

abstract class AbstractMapDbPrimePersistor(
writeAheadLog: Boolean,
commitInterval: FiniteDuration,
metricRegistry: MetricRegistry,
persistenceConfig: PersistenceConfig,
bloomFilterSize: Option[Long] = None
bloomFilterSize: Option[Long] = None,
executionContexts: ComputeAndBlockingExecutionContexts
)(implicit materializer: Materializer)
extends UnifiedPrimePersistor(persistenceConfig, bloomFilterSize) {

private val quineDispatchers = new QuineDispatchers(materializer.system)
//private val quineDispatchers = new QuineDispatchers(materializer.system)
private val interval = Option.when(writeAheadLog)(commitInterval)
def dbForPath(dbPath: MapDbPersistor.DbPath) =
new MapDbPersistor(
@@ -30,7 +31,7 @@ abstract class AbstractMapDbPrimePersistor(
interval,
persistenceConfig,
metricRegistry,
quineDispatchers,
executionContexts,
materializer.system.scheduler
)

@@ -41,14 +42,16 @@ class TempMapDbPrimePersistor(
commitInterval: FiniteDuration,
metricRegistry: MetricRegistry,
persistenceConfig: PersistenceConfig,
bloomFilterSize: Option[Long] = None
bloomFilterSize: Option[Long],
executionContexts: ComputeAndBlockingExecutionContexts
)(implicit materializer: Materializer)
extends AbstractMapDbPrimePersistor(
writeAheadLog,
commitInterval,
metricRegistry,
persistenceConfig,
bloomFilterSize
bloomFilterSize,
executionContexts
) {

protected def agentCreator(persistenceConfig: PersistenceConfig, namespace: NamespaceId): PersistenceAgent =
@@ -66,14 +69,16 @@ class PersistedMapDbPrimePersistor(
commitInterval: FiniteDuration,
metricRegistry: MetricRegistry,
persistenceConfig: PersistenceConfig,
bloomFilterSize: Option[Long] = None
bloomFilterSize: Option[Long],
executionContexts: ComputeAndBlockingExecutionContexts
)(implicit materializer: Materializer)
extends AbstractMapDbPrimePersistor(
writeAheadLog,
commitInterval,
metricRegistry,
persistenceConfig,
bloomFilterSize
bloomFilterSize,
executionContexts
) {

private val parentDir = basePath.getAbsoluteFile.getParentFile
@@ -40,7 +40,7 @@ import com.thatdot.quine.persistor.codecs.{
StandingQueryCodec
}
import com.thatdot.quine.util.PekkoStreams.distinctConsecutive
import com.thatdot.quine.util.QuineDispatchers
import com.thatdot.quine.util.{ComputeAndBlockingExecutionContexts, QuineDispatchers}

/** Embedded persistence implementation based on MapDB
*
@@ -76,7 +76,7 @@ final class MapDbPersistor(
transactionCommitInterval: Option[FiniteDuration] = None,
val persistenceConfig: PersistenceConfig = PersistenceConfig(),
metricRegistry: MetricRegistry = new NoopMetricRegistry(),
quineDispatchers: QuineDispatchers,
executionContexts: ComputeAndBlockingExecutionContexts,
scheduler: Scheduler
) extends PersistenceAgent {

@@ -85,7 +85,7 @@
val nodeEventTotalSize: Counter =
metricRegistry.counter(MetricRegistry.name("map-db-persistor", "journal-event-total-size"))

import quineDispatchers.{blockingDispatcherEC, nodeDispatcherEC}
import executionContexts.{blockingDispatcherEC, nodeDispatcherEC}

// TODO: Consider: should the concurrencyScale parameter equal the thread pool size in `pekko.quine.persistor-blocking-dispatcher.thread-pool-executor.fixed-pool-size ? Or a multiple of...?
// TODO: don't hardcode magical values - config them
@@ -1,10 +1,11 @@
package com.thatdot.quine.persistor

import scala.concurrent.ExecutionContext
import scala.concurrent.duration.DurationInt

import com.codahale.metrics.{MetricRegistry, NoopMetricRegistry}
import com.codahale.metrics.NoopMetricRegistry

import com.thatdot.quine.util.QuineDispatchers
import com.thatdot.quine.util.FromSingleExecutionContext

class MapDbPersistorSpec extends PersistenceAgentSpec {

@@ -14,6 +15,8 @@ class MapDbPersistorSpec extends PersistenceAgentSpec {
numberPartitions = 1,
commitInterval = 1.second, // NB this is unused while `writeAheadLog = false
metricRegistry = new NoopMetricRegistry(),
persistenceConfig = PersistenceConfig()
persistenceConfig = PersistenceConfig(),
bloomFilterSize = None,
executionContexts = new FromSingleExecutionContext(ExecutionContext.parasitic)
)
}
@@ -1,10 +1,12 @@
package com.thatdot.quine.persistor

import scala.concurrent.ExecutionContext

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer

import com.thatdot.quine.graph.HistoricalQueryTests
import com.thatdot.quine.util.QuineDispatchers
import com.thatdot.quine.util.{FromSingleExecutionContext, QuineDispatchers}

class MapDbPersistorTests extends HistoricalQueryTests {

@@ -16,7 +18,7 @@ class MapDbPersistorTests extends HistoricalQueryTests {
filePath = MapDbPersistor.InMemoryDb,
ns,
persistenceConfig = pc,
quineDispatchers = new QuineDispatchers(system),
executionContexts = new FromSingleExecutionContext(ExecutionContext.parasitic),
scheduler = system.scheduler
)
)(Materializer.matFromSystem(system))
@@ -3,6 +3,8 @@ package com.thatdot.quine.persistor
import java.nio.file.Files
import java.util.Properties

import scala.concurrent.ExecutionContext

import org.apache.pekko.actor.CoordinatedShutdown

import org.apache.commons.io.FileUtils
@@ -27,7 +29,7 @@ class RocksDbPersistorSpec extends PersistenceAgentSpec {
syncWrites = false,
dbOptionProperties = new Properties(),
PersistenceConfig(),
ioDispatcher = new QuineDispatchers(system).blockingDispatcherEC
ioDispatcher = ExecutionContext.parasitic
)
} else {
new StatelessPrimePersistor(PersistenceConfig(), None, new EmptyPersistor(_, _))
@@ -3,6 +3,8 @@ package com.thatdot.quine.persistor
import java.nio.file.Files
import java.util.Properties

import scala.concurrent.ExecutionContext

import org.apache.pekko.actor.{ActorSystem, CoordinatedShutdown}
import org.apache.pekko.stream.Materializer

@@ -27,7 +29,7 @@ class RocksDbPersistorTests extends HistoricalQueryTests {
dbOptionProperties = new Properties(),
persistenceConfig = PersistenceConfig(),
bloomFilterSize = None,
ioDispatcher = new QuineDispatchers(system).blockingDispatcherEC
ioDispatcher = ExecutionContext.parasitic
)(Materializer.matFromSystem(system))
} else {
new StatelessPrimePersistor(PersistenceConfig(), None, new EmptyPersistor(_, _))(
@@ -56,7 +56,8 @@ object PersistenceBuilder extends LazyLogging {
m.commitInterval,
Metrics,
persistenceConfig,
m.bloomFilterSize
m.bloomFilterSize,
quineDispatchers
)
case None =>
new TempMapDbPrimePersistor(
Expand All @@ -65,7 +66,8 @@ object PersistenceBuilder extends LazyLogging {
m.commitInterval,
Metrics,
persistenceConfig,
m.bloomFilterSize
m.bloomFilterSize,
quineDispatchers
)
}

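
To summarize the wiring after this change (a sketch under the assumption that an ActorSystem is in scope): production code such as PersistenceBuilder keeps passing the real QuineDispatchers, which now implements the trait, while tests substitute a FromSingleExecutionContext.

```scala
import scala.concurrent.ExecutionContext

import org.apache.pekko.actor.ActorSystem

import com.thatdot.quine.util.{ComputeAndBlockingExecutionContexts, FromSingleExecutionContext, QuineDispatchers}

object WiringSketch {
  // Production (as in PersistenceBuilder above): distinct compute and blocking dispatchers.
  def forProduction(system: ActorSystem): ComputeAndBlockingExecutionContexts =
    new QuineDispatchers(system)

  // Tests (as in the specs above): a single EC, e.g. the parasitic one, for both roles.
  def forTests: ComputeAndBlockingExecutionContexts =
    new FromSingleExecutionContext(ExecutionContext.parasitic)
}
```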
