diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f6c5e262..b1b8937a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,11 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - - `teloxide::dispatching::repls::CommandReplExt`, `teloxide::prelude::CommandReplExt` ([issue #740](https://github.com/teloxide/teloxide/issues/740)) +- The `rocksdb-storage` feature -- enables the RocksDB support ([PR #753](https://github.com/teloxide/teloxide/pull/753)) +- `teloxide::dispatching::repls::CommandReplExt`, `teloxide::prelude::CommandReplExt` ([issue #740](https://github.com/teloxide/teloxide/issues/740)) ### Deprecated - - `teloxide::dispatching::repls::{commands_repl, commands_repl_with_listener}`, `teloxide::utils::command::BotCommands::ty` (use `CommandReplExt` instead) +- `teloxide::dispatching::repls::{commands_repl, commands_repl_with_listener}`, `teloxide::utils::command::BotCommands::ty` (use `CommandReplExt` instead) ## 0.11.0 - 2022-10-07 diff --git a/Cargo.toml b/Cargo.toml index 65e5135ae..8eb1b3515 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ webhooks-axum = ["webhooks", "axum", "tower", "tower-http"] sqlite-storage = ["sqlx"] redis-storage = ["redis"] +rocksdb-storage = ["rocksdb"] cbor-serializer = ["serde_cbor"] bincode-serializer = ["bincode"] @@ -42,6 +43,7 @@ full = [ "webhooks-axum", "sqlite-storage", "redis-storage", + "rocksdb-storage", "cbor-serializer", "bincode-serializer", "macros", @@ -92,6 +94,9 @@ sqlx = { version = "0.6", optional = true, default-features = false, features = "sqlite", ] } redis = { version = "0.21", features = ["tokio-comp"], optional = true } +rocksdb = { version = "0.19", optional = true, default-features = false, features = [ + "lz4", +] } serde_cbor = { version = "0.11", optional = true } bincode = { version = "1.3", optional = true } axum = { version = "0.5.13", optional = true } diff --git a/README.md b/README.md index 17f5b9046..b1e528a13 100644 --- a/README.md +++ b/README.md @@ -31,10 +31,11 @@ - **Feature-rich.** You can use both long polling and webhooks, configure an underlying HTTPS client, set a custom URL of a Telegram API server, do graceful shutdown, and much more. - - **Simple dialogues.** Our dialogues subsystem is simple and easy-to-use, and, furthermore, is agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis] and [Sqlite]. + - **Simple dialogues.** Our dialogues subsystem is simple and easy-to-use, and, furthermore, is agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis], [RocksDB] and [Sqlite]. [persistence]: https://en.wikipedia.org/wiki/Persistence_(computer_science) [Redis]: https://redis.io/ +[RocksDB]: https://rocksdb.org/ [Sqlite]: https://www.sqlite.org - **Strongly typed commands.** Define bot commands as an `enum` and teloxide will parse them automatically — just like JSON structures in [`serde-json`] and command-line arguments in [`structopt`]. diff --git a/src/dispatching/dialogue/storage.rs b/src/dispatching/dialogue/storage.rs index b78cd72e0..e8d2c1587 100644 --- a/src/dispatching/dialogue/storage.rs +++ b/src/dispatching/dialogue/storage.rs @@ -9,6 +9,9 @@ mod redis_storage; #[cfg(feature = "sqlite-storage")] mod sqlite_storage; +#[cfg(feature = "rocksdb-storage")] +mod rocksdb_storage; + use futures::future::BoxFuture; use teloxide_core::types::ChatId; @@ -25,6 +28,9 @@ use std::sync::Arc; #[cfg(feature = "sqlite-storage")] pub use sqlite_storage::{SqliteStorage, SqliteStorageError}; +#[cfg(feature = "rocksdb-storage")] +pub use rocksdb_storage::{RocksDbStorage, RocksDbStorageError}; + /// A storage with an erased error type. pub type ErasedStorage = dyn Storage> + Send + Sync; @@ -41,10 +47,12 @@ pub type ErasedStorage = /// /// - [`InMemStorage`] -- a storage based on [`std::collections::HashMap`]. /// - [`RedisStorage`] -- a Redis-based storage. +/// - [`RocksDbStorage`] -- a RocksDB-based persistent storage. /// - [`SqliteStorage`] -- an SQLite-based persistent storage. /// /// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage /// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage +/// [`RocksDbStorage`]: crate::dispatching::dialogue::RocksDbStorage /// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage pub trait Storage { type Error; diff --git a/src/dispatching/dialogue/storage/rocksdb_storage.rs b/src/dispatching/dialogue/storage/rocksdb_storage.rs new file mode 100644 index 000000000..d01d8fd9b --- /dev/null +++ b/src/dispatching/dialogue/storage/rocksdb_storage.rs @@ -0,0 +1,113 @@ +use super::{serializer::Serializer, Storage}; +use futures::future::BoxFuture; +use rocksdb::{DBCompressionType, DBWithThreadMode, MultiThreaded}; +use serde::{de::DeserializeOwned, Serialize}; +use std::{ + convert::Infallible, + fmt::{Debug, Display}, + str, + sync::Arc, +}; +use teloxide_core::types::ChatId; +use thiserror::Error; + +/// A persistent dialogue storage based on [RocksDb](http://rocksdb.org/). +pub struct RocksDbStorage { + db: DBWithThreadMode, + serializer: S, +} + +/// An error returned from [`RocksDbStorage`]. +#[derive(Debug, Error)] +pub enum RocksDbStorageError +where + SE: Debug + Display, +{ + #[error("dialogue serialization error: {0}")] + SerdeError(SE), + + #[error("RocksDb error: {0}")] + RocksDbError(#[from] rocksdb::Error), + + /// Returned from [`RocksDbStorage::remove_dialogue`]. + #[error("row not found")] + DialogueNotFound, +} + +impl RocksDbStorage { + pub async fn open( + path: &str, + serializer: S, + options: Option, + ) -> Result, RocksDbStorageError> { + let options = match options { + Some(opts) => opts, + None => { + let mut opts = rocksdb::Options::default(); + opts.set_compression_type(DBCompressionType::Lz4); + opts.create_if_missing(true); + opts + } + }; + + let db = DBWithThreadMode::::open(&options, path)?; + Ok(Arc::new(Self { db, serializer })) + } +} + +impl Storage for RocksDbStorage +where + S: Send + Sync + Serializer + 'static, + D: Send + Serialize + DeserializeOwned + 'static, + >::Error: Debug + Display, +{ + type Error = RocksDbStorageError<>::Error>; + + /// Returns [`RocksDbStorageError::DialogueNotFound`] if a dialogue does not + /// exist. + fn remove_dialogue( + self: Arc, + ChatId(chat_id): ChatId, + ) -> BoxFuture<'static, Result<(), Self::Error>> { + Box::pin(async move { + let key = chat_id.to_le_bytes(); + + if self.db.get(&key)?.is_none() { + return Err(RocksDbStorageError::DialogueNotFound); + } + + self.db.delete(&key).unwrap(); + + Ok(()) + }) + } + + fn update_dialogue( + self: Arc, + ChatId(chat_id): ChatId, + dialogue: D, + ) -> BoxFuture<'static, Result<(), Self::Error>> { + Box::pin(async move { + let d = + self.serializer.serialize(&dialogue).map_err(RocksDbStorageError::SerdeError)?; + + let key = chat_id.to_le_bytes(); + self.db.put(&key, &d)?; + + Ok(()) + }) + } + + fn get_dialogue( + self: Arc, + ChatId(chat_id): ChatId, + ) -> BoxFuture<'static, Result, Self::Error>> { + Box::pin(async move { + let key = chat_id.to_le_bytes(); + self.db + .get(&key)? + .map(|d| self.serializer.deserialize(&d).map_err(RocksDbStorageError::SerdeError)) + .transpose() + }) + } +} diff --git a/tests/rocksdb.rs b/tests/rocksdb.rs new file mode 100644 index 000000000..7366a2629 --- /dev/null +++ b/tests/rocksdb.rs @@ -0,0 +1,95 @@ +use std::{ + fmt::{Debug, Display}, + fs, + sync::Arc, +}; +use teloxide::{ + dispatching::dialogue::{RocksDbStorage, RocksDbStorageError, Serializer, Storage}, + types::ChatId, +}; + +#[tokio::test(flavor = "multi_thread")] +async fn test_rocksdb_json() { + fs::remove_dir_all("./test_db1").ok(); + fs::create_dir("./test_db1").unwrap(); + let storage = RocksDbStorage::open( + "./test_db1/test_db1.rocksdb", + teloxide::dispatching::dialogue::serializer::Json, + None, + ) + .await + .unwrap(); + test_rocksdb(storage).await; + fs::remove_dir_all("./test_db1").unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_rocksdb_bincode() { + fs::remove_dir_all("./test_db2").ok(); + fs::create_dir("./test_db2").unwrap(); + let storage = RocksDbStorage::open( + "./test_db2/test_db2.rocksdb", + teloxide::dispatching::dialogue::serializer::Bincode, + None, + ) + .await + .unwrap(); + test_rocksdb(storage).await; + fs::remove_dir_all("./test_db2").unwrap(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_rocksdb_cbor() { + fs::remove_dir_all("./test_db3").ok(); + fs::create_dir("./test_db3").unwrap(); + let storage = RocksDbStorage::open( + "./test_db3/test_db3.rocksdb", + teloxide::dispatching::dialogue::serializer::Cbor, + None, + ) + .await + .unwrap(); + test_rocksdb(storage).await; + fs::remove_dir_all("./test_db3").unwrap(); +} + +type Dialogue = String; + +macro_rules! test_dialogues { + ($storage:expr, $_0:expr, $_1:expr, $_2:expr) => { + assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(1)).await.unwrap(), $_0); + assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(11)).await.unwrap(), $_1); + assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(256)).await.unwrap(), $_2); + }; +} + +async fn test_rocksdb(storage: Arc>) +where + S: Send + Sync + Serializer + 'static, + >::Error: Debug + Display, +{ + test_dialogues!(storage, None, None, None); + + Arc::clone(&storage).update_dialogue(ChatId(1), "ABC".to_owned()).await.unwrap(); + Arc::clone(&storage).update_dialogue(ChatId(11), "DEF".to_owned()).await.unwrap(); + Arc::clone(&storage).update_dialogue(ChatId(256), "GHI".to_owned()).await.unwrap(); + + test_dialogues!( + storage, + Some("ABC".to_owned()), + Some("DEF".to_owned()), + Some("GHI".to_owned()) + ); + + Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap(); + Arc::clone(&storage).remove_dialogue(ChatId(11)).await.unwrap(); + Arc::clone(&storage).remove_dialogue(ChatId(256)).await.unwrap(); + + test_dialogues!(storage, None, None, None); + + // Check that a try to remove a non-existing dialogue results in an error. + assert!(matches!( + Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap_err(), + RocksDbStorageError::DialogueNotFound + )); +}