Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimisations and benchmarking #6

Merged
merged 3 commits into from Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Expand Up @@ -12,16 +12,19 @@ async-trait = "0.1.72"
chrono = "0.4.23"
ciborium = "0.2"
clap = "4.0.29"
criterion = "0.4.0"
env_logger = "0.10.0"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
itoa = "1.0"
log = "0.4.17"
lru = "0.11.0"
postcard = { version = "1.0.6", default-features = false }
rand = "0.8"
scopeguard = "1.1"
serde = "1.0.151"
smol_str = "0.2.0"
streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
Expand All @@ -30,3 +33,7 @@ tokio = "1.23.0"
tokio-stream = "0.1.14"
tokio-util = "0.7.4"
warp = "0.3"

[profile.bench-debug]
inherits = "release"
debug = true
1 change: 1 addition & 0 deletions akka-persistence-rs-commitlog/Cargo.toml
Expand Up @@ -9,6 +9,7 @@ async-trait = { workspace = true }
ciborium = { workspace = true, optional = true }
rand = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
streambed = { workspace = true }
streambed-logged = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
26 changes: 15 additions & 11 deletions akka-persistence-rs-commitlog/src/lib.rs
Expand Up @@ -4,7 +4,7 @@ use akka_persistence_rs::{entity_manager::RecordAdapter, EntityId, Record};
use async_stream::stream;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
commit_log::{CommitLog, ConsumerRecord, Key, ProducerRecord, Subscription, Topic, TopicRef},
secret_store::SecretStore,
Expand Down Expand Up @@ -34,7 +34,7 @@ where

/// Return a path to use for looking up secrets with respect to
/// an entity being encrypted/decrypted.
fn secret_path(&self, entity_id: &EntityId) -> String;
fn secret_path(&self, entity_id: &EntityId) -> Arc<str>;

#[cfg(feature = "cbor")]
async fn record(&self, mut record: ConsumerRecord) -> Option<Record<E>> {
Expand Down Expand Up @@ -227,12 +227,10 @@ where

#[cfg(test)]
mod tests {
use std::{env, fs, time::Duration};
use std::{env, fs, num::NonZeroUsize, time::Duration};

use super::*;
use akka_persistence_rs::{
entity::EventSourcedBehavior, entity_manager::EntityManager, RecordMetadata,
};
use akka_persistence_rs::{entity::EventSourcedBehavior, entity_manager, RecordMetadata};
use serde::Deserialize;
use streambed::{
commit_log::Header,
Expand Down Expand Up @@ -350,7 +348,7 @@ mod tests {
panic!("should not be called")
}

fn secret_path(&self, _entity_id: &EntityId) -> String {
fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
panic!("should not be called")
}

Expand All @@ -359,7 +357,7 @@ mod tests {
.headers
.into_iter()
.find(|header| header.key == "entity-id")?;
let entity_id = String::from_utf8(value).ok()?;
let entity_id = EntityId::from(std::str::from_utf8(&value).ok()?);
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
Some(Record {
Expand All @@ -378,7 +376,7 @@ mod tests {
) -> Option<(ProducerRecord, Record<MyEvent>)> {
let headers = vec![Header {
key: "entity-id".to_string(),
value: record.entity_id.clone().into_bytes(),
value: record.entity_id.as_bytes().into(),
}];
Some((
ProducerRecord {
Expand Down Expand Up @@ -415,7 +413,7 @@ mod tests {

// Scaffolding

let entity_id = "some-entity".to_string();
let entity_id = EntityId::from("some-entity");

// Produce a stream given no prior persistence. Should return an empty stream.

Expand Down Expand Up @@ -503,6 +501,12 @@ mod tests {

let (_, my_command_receiver) = mpsc::channel(10);

EntityManager::new(my_behavior, file_log_topic_adapter, my_command_receiver);
entity_manager::run(
my_behavior,
file_log_topic_adapter,
my_command_receiver,
NonZeroUsize::new(1).unwrap(),
)
.await;
}
}
6 changes: 6 additions & 0 deletions akka-persistence-rs/Cargo.toml
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { workspace = true }
lru = { workspace = true }
smol_str = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"macros",
Expand All @@ -16,5 +17,10 @@ tokio = { workspace = true, features = [
tokio-stream = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
env_logger = { workspace = true }
test-log = { workspace = true }

[[bench]]
name = "benches"
harness = false
18 changes: 18 additions & 0 deletions akka-persistence-rs/benches/README.md
@@ -0,0 +1,18 @@
Benchmarking
===

To invoke a benchmark:

```
cd akka-persistence-rs
cargo bench
```

The above will compare with any previous benchmark run. Thus, you can checkout a commit, run the benchmark, then
re-run having applied any changes you have made.

To instrument on OS X (requires `cargo install cargo-instruments` and the Xcode dev tools):

```
cargo instruments --bench "benches" --profile "bench-debug" -t "time" -- --bench
```
112 changes: 112 additions & 0 deletions akka-persistence-rs/benches/benches.rs
@@ -0,0 +1,112 @@
use std::{io, num::NonZeroUsize, pin::Pin, sync::Arc};

use akka_persistence_rs::{
effect::{emit_event, Effect, EffectExt},
entity::{Context, EventSourcedBehavior},
entity_manager::{self, RecordAdapter},
EntityId, Message, Record,
};
use async_trait::async_trait;
use criterion::{criterion_group, criterion_main, Criterion};
use tokio::sync::{mpsc, Notify};
use tokio_stream::Stream;

const NUM_EVENTS: usize = 10_000;

#[derive(Default)]
struct State;

struct Command;

struct Event;

struct Behavior;

impl EventSourcedBehavior for Behavior {
type State = State;
type Command = Command;
type Event = Event;

fn for_command(
_context: &Context,
_state: &Self::State,
_command: Self::Command,
) -> Box<dyn Effect<Self>> {
emit_event(Event).boxed()
}

fn on_event(_context: &Context, _state: &mut Self::State, _event: &Self::Event) {}
}

struct Adapter {
event_count: usize,
events_processed: Arc<Notify>,
}

#[async_trait]
impl RecordAdapter<Event> for Adapter {
async fn produce_initial(
&mut self,
) -> io::Result<Pin<Box<dyn Stream<Item = Record<Event>> + Send + 'async_trait>>> {
Ok(Box::pin(tokio_stream::empty()))
}

async fn produce(
&mut self,
_entity_id: &EntityId,
) -> io::Result<Pin<Box<dyn Stream<Item = Record<Event>> + Send + 'async_trait>>> {
Ok(Box::pin(tokio_stream::empty()))
}

async fn process(&mut self, record: Record<Event>) -> io::Result<Record<Event>> {
self.event_count += 1;
if self.event_count == NUM_EVENTS {
self.events_processed.notify_one();
self.event_count = 0;
}
Ok(record)
}
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("produce events", move |b| {
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let events_processed = Arc::new(Notify::new());
let (sender, receiver) = mpsc::channel(10);
let _ = rt.spawn(entity_manager::run(
Behavior,
Adapter {
event_count: 0,
events_processed: events_processed.clone(),
},
receiver,
NonZeroUsize::new(1).unwrap(),
));

b.to_async(&rt).iter(|| {
let task_events_processed = events_processed.clone();
let task_sender = sender.clone();
async move {
tokio::spawn(async move {
for _ in 0..NUM_EVENTS {
let _ = task_sender
.send(Message::new("id-1".to_string(), Command))
.await;
}
task_events_processed.notified().await;
})
.await
}
})
});
}

criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = criterion_benchmark
}
criterion_main!(benches);