Skip to content

Commit

Permalink
Use small strings for entities and ref count secret paths
Browse files Browse the repository at this point in the history
Speeds things up considerably
  • Loading branch information
huntc committed Aug 14, 2023
1 parent 149090f commit cfca6b0
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Expand Up @@ -17,12 +17,14 @@ env_logger = "0.10.0"
git-version = "0.3.5"
hex = "0.4.3"
humantime = "2.1.0"
itoa = "1.0"
log = "0.4.17"
lru = "0.11.0"
postcard = { version = "1.0.6", default-features = false }
rand = "0.8"
scopeguard = "1.1"
serde = "1.0.151"
smol_str = "0.2.0"
streambed = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-confidant = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
streambed-logged = { git = "https://github.com/streambed/streambed-rs.git", rev = "4b7e1561666bc1860339d27f5abf9be246ef5ad9" }
Expand Down
1 change: 1 addition & 0 deletions akka-persistence-rs-commitlog/Cargo.toml
Expand Up @@ -9,6 +9,7 @@ async-trait = { workspace = true }
ciborium = { workspace = true, optional = true }
rand = { workspace = true }
serde = { workspace = true }
smol_str = { workspace = true }
streambed = { workspace = true }
streambed-logged = { workspace = true }
tokio-stream = { workspace = true }
Expand Down
12 changes: 6 additions & 6 deletions akka-persistence-rs-commitlog/src/lib.rs
Expand Up @@ -4,7 +4,7 @@ use akka_persistence_rs::{entity_manager::RecordAdapter, EntityId, Record};
use async_stream::stream;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};
use std::{io, marker::PhantomData, pin::Pin};
use std::{io, marker::PhantomData, pin::Pin, sync::Arc};
use streambed::{
commit_log::{CommitLog, ConsumerRecord, Key, ProducerRecord, Subscription, Topic, TopicRef},
secret_store::SecretStore,
Expand Down Expand Up @@ -34,7 +34,7 @@ where

/// Return a path to use for looking up secrets with respect to
/// an entity being encrypted/decrypted.
fn secret_path(&self, entity_id: &EntityId) -> String;
fn secret_path(&self, entity_id: &EntityId) -> Arc<str>;

#[cfg(feature = "cbor")]
async fn record(&self, mut record: ConsumerRecord) -> Option<Record<E>> {
Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
panic!("should not be called")
}

fn secret_path(&self, _entity_id: &EntityId) -> String {
fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
panic!("should not be called")
}

Expand All @@ -357,7 +357,7 @@ mod tests {
.headers
.into_iter()
.find(|header| header.key == "entity-id")?;
let entity_id = String::from_utf8(value).ok()?;
let entity_id = EntityId::from(std::str::from_utf8(&value).ok()?);
let value = String::from_utf8(record.value).ok()?;
let event = MyEvent { value };
Some(Record {
Expand All @@ -376,7 +376,7 @@ mod tests {
) -> Option<(ProducerRecord, Record<MyEvent>)> {
let headers = vec![Header {
key: "entity-id".to_string(),
value: record.entity_id.clone().into_bytes(),
value: record.entity_id.as_bytes().into(),
}];
Some((
ProducerRecord {
Expand Down Expand Up @@ -413,7 +413,7 @@ mod tests {

// Scaffolding

let entity_id = "some-entity".to_string();
let entity_id = EntityId::from("some-entity");

// Produce a stream given no prior persistence. Should return an empty stream.

Expand Down
1 change: 1 addition & 0 deletions akka-persistence-rs/Cargo.toml
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
[dependencies]
async-trait = { workspace = true }
lru = { workspace = true }
smol_str = { workspace = true }
tokio = { workspace = true, features = [
"sync",
"macros",
Expand Down
2 changes: 1 addition & 1 deletion akka-persistence-rs/src/entity_manager.rs
Expand Up @@ -434,7 +434,7 @@ mod tests {
assert_eq!(
temp_sensor_events_captured.recv().await.unwrap(),
Record {
entity_id: "id-1".to_string(),
entity_id: EntityId::from("id-1"),
event: TempEvent::Deregistered,
metadata: crate::RecordMetadata {
deletion_event: true
Expand Down
2 changes: 1 addition & 1 deletion akka-persistence-rs/src/lib.rs
Expand Up @@ -5,7 +5,7 @@ pub mod entity;
pub mod entity_manager;

/// Uniquely identifies an entity, or entity instance.
pub type EntityId = String;
pub type EntityId = smol_str::SmolStr;

/// A message encapsulates a command that is addressed to a specific entity.
#[derive(Debug, PartialEq)]
Expand Down
1 change: 1 addition & 0 deletions examples/iot-service/Cargo.toml
Expand Up @@ -12,6 +12,7 @@ env_logger = { workspace = true }
git-version = { workspace = true }
hex = { workspace = true }
humantime = { workspace = true }
itoa = { workspace = true }
log = { workspace = true }
postcard = { workspace = true, default-features = false, features = [
"use-std",
Expand Down
11 changes: 6 additions & 5 deletions examples/iot-service/src/temperature.rs
@@ -1,4 +1,4 @@
use std::{collections::VecDeque, num::NonZeroUsize};
use std::{collections::VecDeque, num::NonZeroUsize, sync::Arc};

use akka_persistence_rs::{
effect::{emit_event, reply, unhandled, EffectExt},
Expand Down Expand Up @@ -91,7 +91,7 @@ impl EventSourcedBehavior for Behavior {
// keys in any way required.

struct RecordMarshaler {
events_key_secret_path: String,
events_key_secret_path: Arc<str>,
secret_store: FileSecretStore,
}

Expand All @@ -118,14 +118,15 @@ impl CommitLogRecordMarshaler<Event> for RecordMarshaler {

fn to_entity_id(record: &streambed::commit_log::ConsumerRecord) -> Option<EntityId> {
let entity_id = (record.key & EVENT_ID_BIT_MASK) as u32;
Some(entity_id.to_string())
let mut buffer = itoa::Buffer::new();
Some(EntityId::from(buffer.format(entity_id)))
}

fn secret_store(&self) -> &Self::SecretStore {
&self.secret_store
}

fn secret_path(&self, _entity_id: &EntityId) -> String {
fn secret_path(&self, _entity_id: &EntityId) -> Arc<str> {
self.events_key_secret_path.clone()
}
}
Expand Down Expand Up @@ -153,7 +154,7 @@ pub async fn task(
let file_log_topic_adapter = CommitLogTopicAdapter::new(
commit_log,
RecordMarshaler {
events_key_secret_path,
events_key_secret_path: Arc::from(events_key_secret_path),
secret_store,
},
"iot-service",
Expand Down

0 comments on commit cfca6b0

Please sign in to comment.