Skip to content

Commit

Permalink
Merge pull request #6 from lightbend/optimise
Browse files Browse the repository at this point in the history
Optimisations and benchmarking
  • Loading branch information
huntc committed Aug 14, 2023
2 parents 016ff27 + 88c657f commit 12f4f48
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 181 deletions.
7 changes: 7 additions & 0 deletions Cargo.toml
Expand Up @@ -12,16 +12,19 @@ async-trait = "0.1.72"
chrono = "0.4.23"
ciborium = "0.2"
clap = "4.0.29"
criterion = "0.4.0"
env_logger = "0.10.0"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
itoa = "1.0"
log = "0.4.17"
lru = "0.11.0"
postcard = { version = "1.0.6", default-features = false }
rand = "0.8"
scopeguard = "1.1"
serde = "1.0.151"
smol_str = "0.2.0"
streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
Expand All @@ -30,3 +33,7 @@ tokio = "1.23.0"
tokio-stream = "0.1.14"
tokio-util = "0.7.4"
warp = "0.3"

[profile.bench-debug]
inherits = "release"
debug = true
1 change: 1 addition & 0 deletions akka-persistence-rs-commitlog/Cargo.toml
Expand Up @@ -9,6 +9,7 @@ async-trait = { workspace = true }
ciborium = { workspace = true, optional = true }
rand = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
streambed = { workspace = true }
streambed-logged = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
26 changes: 15 additions & 11 deletions akka-persistence-rs-commitlog/src/lib.rs
Expand Up @@ -4,7 +4,7 @@ use akka_persistence_rs::{entity_manager::RecordAdapter, EntityId, Record};
use async_stream::stream;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
commit_log::{CommitLog, ConsumerRecord, Key, ProducerRecord, Subscription, Topic, TopicRef},
secret_store::SecretStore,
Expand Down Expand Up @@ -34,7 +34,7 @@ where

/// Return a path to use for looking up secrets with respect to
/// an entity being encrypted/decrypted.
fn secret_path(&self, entity_id: &EntityId) -> String;
fn secret_path(&self, entity_id: &EntityId) -> Arc<str>;

#[cfg(feature = "cbor")]
async fn record(&self, mut record: ConsumerRecord) -> Option<Record<E>> {
Expand Down Expand Up @@ -227,12 +227,10 @@ where

#[cfg(test)]
mod tests {
use std::{env, fs, time::Duration};
use std::{env, fs, num::NonZeroUsize, time::Duration};

use super::*;
use akka_persistence_rs::{
entity::EventSourcedBehavior, entity_manager::EntityManager, RecordMetadata,
};
use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager, RecordMetadata};
use serde::Deserialize;
use streambed::{
commit_log::Header,
Expand Down Expand Up @@ -350,7 +348,7 @@ mod tests {
panic!("should not be called")
}

fn secret_path(&self, _entity_id: &EntityId) -> String {
fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
panic!("should not be called")
}

Expand All @@ -359,7 +357,7 @@ mod tests {
.headers
.into_iter()
.find(|header| header.key == "entity-id")?;
let entity_id = String::from_utf8(value).ok()?;
let entity_id = EntityId::from(std::str::from_utf8(&value).ok()?);
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
Some(Record {
Expand All @@ -378,7 +376,7 @@ mod tests {
) -> Option<(ProducerRecord, Record<MyEvent>)> {
let headers = vec![Header {
key: "entity-id".to_string(),
value: record.entity_id.clone().into_bytes(),
value: record.entity_id.as_bytes().into(),
}];
Some((
ProducerRecord {
Expand Down Expand Up @@ -415,7 +413,7 @@ mod tests {

// Scaffolding

let entity_id = "some-entity".to_string();
let entity_id = EntityId::from("some-entity");

// Produce a stream given no prior persistence. Should return an empty stream.

Expand Down Expand Up @@ -503,6 +501,12 @@ mod tests {

let (_, my_command_receiver) = mpsc::channel(10);

EntityManager::new(my_behavior, file_log_topic_adapter, my_command_receiver);
entity_manager::run(
my_behavior,
file_log_topic_adapter,
my_command_receiver,
NonZeroUsize::new(1).unwrap(),
)
.await;
}
}
6 changes: 6 additions & 0 deletions akka-persistence-rs/Cargo.toml
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { workspace = true }
lru = { workspace = true }
smol_str = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"macros",
Expand All @@ -16,5 +17,10 @@ tokio = { workspace = true, features = [
tokio-stream = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
env_logger = { workspace = true }
test-log = { workspace = true }

[[bench]]
name = "benches"
harness = false
18 changes: 18 additions & 0 deletions akka-persistence-rs/benches/README.md
@@ -0,0 +1,18 @@
Benchmarking
===

To invoke a benchmark:

```
cd akka-persistence-rs
cargo bench
```

The above will compare with any previous benchmark run. Thus, you can checkout a commit, run the benchmark, then
re-run having applied any changes you have made.

To instrument on OS X (requires `cargo install cargo-instruments` and the Xcode dev tools):

```
cargo instruments --bench "benches" --profile "bench-debug" -t "time" -- --bench
```
112 changes: 112 additions & 0 deletions akka-persistence-rs/benches/benches.rs
@@ -0,0 +1,112 @@
use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc};

use akka_persistence_rs::{
effect::{emit_event, Effect, EffectExt},
entity::{Context, EventSourcedBehavior},
entity_manager::{self, RecordAdapter},
EntityId, Message, Record,
};
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::sync::{mpsc, Notify};
use tokio_stream::Stream;

const NUM_EVENTS: usize = 10_000;

#[derive(Default)]
struct State;

struct Command;

struct Event;

struct Behavior;

impl EventSourcedBehavior for Behavior {
type State = State;
type Command = Command;
type Event = Event;

fn for_command(
_context: &Context,
_state: &Self::State,
_command: Self::Command,
) -> Box<dyn Effect<Self>> {
emit_event(Event).boxed()
}

fn on_event(_context: &Context, _state: &mut Self::State, _event: &Self::Event) {}
}

struct Adapter {
event_count: usize,
events_processed: Arc<Notify>,
}

#[async_trait]
impl RecordAdapter<Event> for Adapter {
async fn produce_initial(
&mut self,
) -> io::Result<Pin<Box<dyn Stream<Item = Record<Event>> + Send + 'async_trait>>> {
Ok(Box::pin(tokio_stream::empty()))
}

async fn produce(
&mut self,
_entity_id: &EntityId,
) -> io::Result<Pin<Box<dyn Stream<Item = Record<Event>> + Send + 'async_trait>>> {
Ok(Box::pin(tokio_stream::empty()))
}

async fn process(&mut self, record: Record<Event>) -> io::Result<Record<Event>> {
self.event_count += 1;
if self.event_count == NUM_EVENTS {
self.events_processed.notify_one();
self.event_count = 0;
}
Ok(record)
}
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("produce events", move |b| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let events_processed = Arc::new(Notify::new());
let (sender, receiver) = mpsc::channel(10);
let _ = rt.spawn(entity_manager::run(
Behavior,
Adapter {
event_count: 0,
events_processed: events_processed.clone(),
},
receiver,
NonZeroUsize::new(1).unwrap(),
));

b.to_async(&rt).iter(|| {
let task_events_processed = events_processed.clone();
let task_sender = sender.clone();
async move {
tokio::spawn(async move {
for _ in 0..NUM_EVENTS {
let _ = task_sender
.send(Message::new("id-1".to_string(), Command))
.await;
}
task_events_processed.notified().await;
})
.await
}
})
});
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = criterion_benchmark
}
criterion_main!(benches);

0 comments on commit 12f4f48

Please sign in to comment.