Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #753 from xamgore/rocksdb-storage
Extend dialogue storages with RocksDB
- Loading branch information
Showing
6 changed files
with
226 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
use super::{serializer::Serializer, Storage}; | ||
use futures::future::BoxFuture; | ||
use rocksdb::{DBCompressionType, DBWithThreadMode, MultiThreaded}; | ||
use serde::{de::DeserializeOwned, Serialize}; | ||
use std::{ | ||
convert::Infallible, | ||
fmt::{Debug, Display}, | ||
str, | ||
sync::Arc, | ||
}; | ||
use teloxide_core::types::ChatId; | ||
use thiserror::Error; | ||
|
||
/// A persistent dialogue storage based on [RocksDb](http://rocksdb.org/). | ||
pub struct RocksDbStorage<S> { | ||
db: DBWithThreadMode<MultiThreaded>, | ||
serializer: S, | ||
} | ||
|
||
/// An error returned from [`RocksDbStorage`]. | ||
#[derive(Debug, Error)] | ||
pub enum RocksDbStorageError<SE> | ||
where | ||
SE: Debug + Display, | ||
{ | ||
#[error("dialogue serialization error: {0}")] | ||
SerdeError(SE), | ||
|
||
#[error("RocksDb error: {0}")] | ||
RocksDbError(#[from] rocksdb::Error), | ||
|
||
/// Returned from [`RocksDbStorage::remove_dialogue`]. | ||
#[error("row not found")] | ||
DialogueNotFound, | ||
} | ||
|
||
impl<S> RocksDbStorage<S> { | ||
pub async fn open( | ||
path: &str, | ||
serializer: S, | ||
options: Option<rocksdb::Options>, | ||
) -> Result<Arc<Self>, RocksDbStorageError<Infallible>> { | ||
let options = match options { | ||
Some(opts) => opts, | ||
None => { | ||
let mut opts = rocksdb::Options::default(); | ||
opts.set_compression_type(DBCompressionType::Lz4); | ||
opts.create_if_missing(true); | ||
opts | ||
} | ||
}; | ||
|
||
let db = DBWithThreadMode::<MultiThreaded>::open(&options, path)?; | ||
Ok(Arc::new(Self { db, serializer })) | ||
} | ||
} | ||
|
||
impl<S, D> Storage<D> for RocksDbStorage<S> | ||
where | ||
S: Send + Sync + Serializer<D> + 'static, | ||
D: Send + Serialize + DeserializeOwned + 'static, | ||
<S as Serializer<D>>::Error: Debug + Display, | ||
{ | ||
type Error = RocksDbStorageError<<S as Serializer<D>>::Error>; | ||
|
||
/// Returns [`RocksDbStorageError::DialogueNotFound`] if a dialogue does not | ||
/// exist. | ||
fn remove_dialogue( | ||
self: Arc<Self>, | ||
ChatId(chat_id): ChatId, | ||
) -> BoxFuture<'static, Result<(), Self::Error>> { | ||
Box::pin(async move { | ||
let key = chat_id.to_le_bytes(); | ||
|
||
if self.db.get(&key)?.is_none() { | ||
return Err(RocksDbStorageError::DialogueNotFound); | ||
} | ||
|
||
self.db.delete(&key).unwrap(); | ||
|
||
Ok(()) | ||
}) | ||
} | ||
|
||
fn update_dialogue( | ||
self: Arc<Self>, | ||
ChatId(chat_id): ChatId, | ||
dialogue: D, | ||
) -> BoxFuture<'static, Result<(), Self::Error>> { | ||
Box::pin(async move { | ||
let d = | ||
self.serializer.serialize(&dialogue).map_err(RocksDbStorageError::SerdeError)?; | ||
|
||
let key = chat_id.to_le_bytes(); | ||
self.db.put(&key, &d)?; | ||
|
||
Ok(()) | ||
}) | ||
} | ||
|
||
fn get_dialogue( | ||
self: Arc<Self>, | ||
ChatId(chat_id): ChatId, | ||
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> { | ||
Box::pin(async move { | ||
let key = chat_id.to_le_bytes(); | ||
self.db | ||
.get(&key)? | ||
.map(|d| self.serializer.deserialize(&d).map_err(RocksDbStorageError::SerdeError)) | ||
.transpose() | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
use std::{ | ||
fmt::{Debug, Display}, | ||
fs, | ||
sync::Arc, | ||
}; | ||
use teloxide::{ | ||
dispatching::dialogue::{RocksDbStorage, RocksDbStorageError, Serializer, Storage}, | ||
types::ChatId, | ||
}; | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_rocksdb_json() { | ||
fs::remove_dir_all("./test_db1").ok(); | ||
fs::create_dir("./test_db1").unwrap(); | ||
let storage = RocksDbStorage::open( | ||
"./test_db1/test_db1.rocksdb", | ||
teloxide::dispatching::dialogue::serializer::Json, | ||
None, | ||
) | ||
.await | ||
.unwrap(); | ||
test_rocksdb(storage).await; | ||
fs::remove_dir_all("./test_db1").unwrap(); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_rocksdb_bincode() { | ||
fs::remove_dir_all("./test_db2").ok(); | ||
fs::create_dir("./test_db2").unwrap(); | ||
let storage = RocksDbStorage::open( | ||
"./test_db2/test_db2.rocksdb", | ||
teloxide::dispatching::dialogue::serializer::Bincode, | ||
None, | ||
) | ||
.await | ||
.unwrap(); | ||
test_rocksdb(storage).await; | ||
fs::remove_dir_all("./test_db2").unwrap(); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_rocksdb_cbor() { | ||
fs::remove_dir_all("./test_db3").ok(); | ||
fs::create_dir("./test_db3").unwrap(); | ||
let storage = RocksDbStorage::open( | ||
"./test_db3/test_db3.rocksdb", | ||
teloxide::dispatching::dialogue::serializer::Cbor, | ||
None, | ||
) | ||
.await | ||
.unwrap(); | ||
test_rocksdb(storage).await; | ||
fs::remove_dir_all("./test_db3").unwrap(); | ||
} | ||
|
||
type Dialogue = String; | ||
|
||
macro_rules! test_dialogues { | ||
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => { | ||
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(1)).await.unwrap(), $_0); | ||
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(11)).await.unwrap(), $_1); | ||
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(256)).await.unwrap(), $_2); | ||
}; | ||
} | ||
|
||
async fn test_rocksdb<S>(storage: Arc<RocksDbStorage<S>>) | ||
where | ||
S: Send + Sync + Serializer<Dialogue> + 'static, | ||
<S as Serializer<Dialogue>>::Error: Debug + Display, | ||
{ | ||
test_dialogues!(storage, None, None, None); | ||
|
||
Arc::clone(&storage).update_dialogue(ChatId(1), "ABC".to_owned()).await.unwrap(); | ||
Arc::clone(&storage).update_dialogue(ChatId(11), "DEF".to_owned()).await.unwrap(); | ||
Arc::clone(&storage).update_dialogue(ChatId(256), "GHI".to_owned()).await.unwrap(); | ||
|
||
test_dialogues!( | ||
storage, | ||
Some("ABC".to_owned()), | ||
Some("DEF".to_owned()), | ||
Some("GHI".to_owned()) | ||
); | ||
|
||
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap(); | ||
Arc::clone(&storage).remove_dialogue(ChatId(11)).await.unwrap(); | ||
Arc::clone(&storage).remove_dialogue(ChatId(256)).await.unwrap(); | ||
|
||
test_dialogues!(storage, None, None, None); | ||
|
||
// Check that a try to remove a non-existing dialogue results in an error. | ||
assert!(matches!( | ||
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap_err(), | ||
RocksDbStorageError::DialogueNotFound | ||
)); | ||
} |