Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend dialogue storages with RocksDB #753

Merged
merged 3 commits into from Oct 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Expand Up @@ -8,11 +8,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `teloxide::dispatching::repls::CommandReplExt`, `teloxide::prelude::CommandReplExt` ([issue #740](https://github.com/teloxide/teloxide/issues/740))
- The `rocksdb-storage` feature -- enables the RocksDB support ([PR #753](https://github.com/teloxide/teloxide/pull/753))
- `teloxide::dispatching::repls::CommandReplExt`, `teloxide::prelude::CommandReplExt` ([issue #740](https://github.com/teloxide/teloxide/issues/740))

### Deprecated

- `teloxide::dispatching::repls::{commands_repl, commands_repl_with_listener}`, `teloxide::utils::command::BotCommands::ty` (use `CommandReplExt` instead)
- `teloxide::dispatching::repls::{commands_repl, commands_repl_with_listener}`, `teloxide::utils::command::BotCommands::ty` (use `CommandReplExt` instead)

## 0.11.0 - 2022-10-07

Expand Down
5 changes: 5 additions & 0 deletions Cargo.toml
Expand Up @@ -19,6 +19,7 @@ webhooks-axum = ["webhooks", "axum", "tower", "tower-http"]

sqlite-storage = ["sqlx"]
redis-storage = ["redis"]
rocksdb-storage = ["rocksdb"]
cbor-serializer = ["serde_cbor"]
bincode-serializer = ["bincode"]

Expand All @@ -42,6 +43,7 @@ full = [
"webhooks-axum",
"sqlite-storage",
"redis-storage",
"rocksdb-storage",
"cbor-serializer",
"bincode-serializer",
"macros",
Expand Down Expand Up @@ -92,6 +94,9 @@ sqlx = { version = "0.6", optional = true, default-features = false, features =
"sqlite",
] }
redis = { version = "0.21", features = ["tokio-comp"], optional = true }
rocksdb = { version = "0.19", optional = true, default-features = false, features = [
"lz4",
] }
serde_cbor = { version = "0.11", optional = true }
bincode = { version = "1.3", optional = true }
axum = { version = "0.5.13", optional = true }
Expand Down
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -31,10 +31,11 @@

- **Feature-rich.** You can use both long polling and webhooks, configure an underlying HTTPS client, set a custom URL of a Telegram API server, do graceful shutdown, and much more.

- **Simple dialogues.** Our dialogues subsystem is simple and easy-to-use, and, furthermore, is agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis] and [Sqlite].
- **Simple dialogues.** Our dialogues subsystem is simple and easy-to-use, and, furthermore, is agnostic of how/where dialogues are stored. For example, you can just replace a one line to achieve [persistence]. Out-of-the-box storages include [Redis], [RocksDB] and [Sqlite].

[persistence]: https://en.wikipedia.org/wiki/Persistence_(computer_science)
[Redis]: https://redis.io/
[RocksDB]: https://rocksdb.org/
[Sqlite]: https://www.sqlite.org

- **Strongly typed commands.** Define bot commands as an `enum` and teloxide will parse them automatically — just like JSON structures in [`serde-json`] and command-line arguments in [`structopt`].
Expand Down
8 changes: 8 additions & 0 deletions src/dispatching/dialogue/storage.rs
Expand Up @@ -9,6 +9,9 @@ mod redis_storage;
#[cfg(feature = "sqlite-storage")]
mod sqlite_storage;

#[cfg(feature = "rocksdb-storage")]
mod rocksdb_storage;

use futures::future::BoxFuture;
use teloxide_core::types::ChatId;

Expand All @@ -25,6 +28,9 @@ use std::sync::Arc;
#[cfg(feature = "sqlite-storage")]
pub use sqlite_storage::{SqliteStorage, SqliteStorageError};

#[cfg(feature = "rocksdb-storage")]
pub use rocksdb_storage::{RocksDbStorage, RocksDbStorageError};

/// A storage with an erased error type.
pub type ErasedStorage<D> =
dyn Storage<D, Error = Box<dyn std::error::Error + Send + Sync>> + Send + Sync;
Expand All @@ -41,10 +47,12 @@ pub type ErasedStorage<D> =
///
/// - [`InMemStorage`] -- a storage based on [`std::collections::HashMap`].
/// - [`RedisStorage`] -- a Redis-based storage.
/// - [`RocksDbStorage`] -- a RocksDB-based persistent storage.
/// - [`SqliteStorage`] -- an SQLite-based persistent storage.
///
/// [`InMemStorage`]: crate::dispatching::dialogue::InMemStorage
/// [`RedisStorage`]: crate::dispatching::dialogue::RedisStorage
/// [`RocksDbStorage`]: crate::dispatching::dialogue::RocksDbStorage
/// [`SqliteStorage`]: crate::dispatching::dialogue::SqliteStorage
pub trait Storage<D> {
type Error;
Expand Down
113 changes: 113 additions & 0 deletions src/dispatching/dialogue/storage/rocksdb_storage.rs
@@ -0,0 +1,113 @@
use super::{serializer::Serializer, Storage};
use futures::future::BoxFuture;
use rocksdb::{DBCompressionType, DBWithThreadMode, MultiThreaded};
use serde::{de::DeserializeOwned, Serialize};
use std::{
convert::Infallible,
fmt::{Debug, Display},
str,
sync::Arc,
};
use teloxide_core::types::ChatId;
use thiserror::Error;

/// A persistent dialogue storage based on [RocksDb](http://rocksdb.org/).
pub struct RocksDbStorage<S> {
db: DBWithThreadMode<MultiThreaded>,
serializer: S,
}

/// An error returned from [`RocksDbStorage`].
#[derive(Debug, Error)]
pub enum RocksDbStorageError<SE>
where
SE: Debug + Display,
{
#[error("dialogue serialization error: {0}")]
SerdeError(SE),

#[error("RocksDb error: {0}")]
RocksDbError(#[from] rocksdb::Error),

/// Returned from [`RocksDbStorage::remove_dialogue`].
#[error("row not found")]
DialogueNotFound,
}

impl<S> RocksDbStorage<S> {
pub async fn open(
path: &str,
serializer: S,
options: Option<rocksdb::Options>,
) -> Result<Arc<Self>, RocksDbStorageError<Infallible>> {
let options = match options {
Some(opts) => opts,
None => {
let mut opts = rocksdb::Options::default();
opts.set_compression_type(DBCompressionType::Lz4);
opts.create_if_missing(true);
opts
}
};

let db = DBWithThreadMode::<MultiThreaded>::open(&options, path)?;
Ok(Arc::new(Self { db, serializer }))
}
}

impl<S, D> Storage<D> for RocksDbStorage<S>
where
S: Send + Sync + Serializer<D> + 'static,
D: Send + Serialize + DeserializeOwned + 'static,
<S as Serializer<D>>::Error: Debug + Display,
{
type Error = RocksDbStorageError<<S as Serializer<D>>::Error>;

/// Returns [`RocksDbStorageError::DialogueNotFound`] if a dialogue does not
/// exist.
fn remove_dialogue(
self: Arc<Self>,
ChatId(chat_id): ChatId,
) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move {
let key = chat_id.to_le_bytes();

if self.db.get(&key)?.is_none() {
return Err(RocksDbStorageError::DialogueNotFound);
}

self.db.delete(&key).unwrap();

Ok(())
})
}

fn update_dialogue(
self: Arc<Self>,
ChatId(chat_id): ChatId,
dialogue: D,
) -> BoxFuture<'static, Result<(), Self::Error>> {
Box::pin(async move {
let d =
self.serializer.serialize(&dialogue).map_err(RocksDbStorageError::SerdeError)?;

let key = chat_id.to_le_bytes();
self.db.put(&key, &d)?;

Ok(())
})
}

fn get_dialogue(
self: Arc<Self>,
ChatId(chat_id): ChatId,
) -> BoxFuture<'static, Result<Option<D>, Self::Error>> {
Box::pin(async move {
let key = chat_id.to_le_bytes();
self.db
.get(&key)?
.map(|d| self.serializer.deserialize(&d).map_err(RocksDbStorageError::SerdeError))
.transpose()
})
}
}
95 changes: 95 additions & 0 deletions tests/rocksdb.rs
@@ -0,0 +1,95 @@
use std::{
fmt::{Debug, Display},
fs,
sync::Arc,
};
use teloxide::{
dispatching::dialogue::{RocksDbStorage, RocksDbStorageError, Serializer, Storage},
types::ChatId,
};

#[tokio::test(flavor = "multi_thread")]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a particular reason to use flavor = "multi_thread"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly, I have just copied tests from tests/sqlite.rs, but can't find any arguments for it. Should I remove it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, if this already existed then this can be left to a followup PR. I think we could have a function like test_storage() (or a number of functions anyway...) so that we test all storages in the same exact way.

async fn test_rocksdb_json() {
fs::remove_dir_all("./test_db1").ok();
fs::create_dir("./test_db1").unwrap();
let storage = RocksDbStorage::open(
"./test_db1/test_db1.rocksdb",
teloxide::dispatching::dialogue::serializer::Json,
None,
)
.await
.unwrap();
test_rocksdb(storage).await;
fs::remove_dir_all("./test_db1").unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_rocksdb_bincode() {
fs::remove_dir_all("./test_db2").ok();
fs::create_dir("./test_db2").unwrap();
let storage = RocksDbStorage::open(
"./test_db2/test_db2.rocksdb",
teloxide::dispatching::dialogue::serializer::Bincode,
None,
)
.await
.unwrap();
test_rocksdb(storage).await;
fs::remove_dir_all("./test_db2").unwrap();
}

#[tokio::test(flavor = "multi_thread")]
async fn test_rocksdb_cbor() {
fs::remove_dir_all("./test_db3").ok();
fs::create_dir("./test_db3").unwrap();
let storage = RocksDbStorage::open(
"./test_db3/test_db3.rocksdb",
teloxide::dispatching::dialogue::serializer::Cbor,
None,
)
.await
.unwrap();
test_rocksdb(storage).await;
fs::remove_dir_all("./test_db3").unwrap();
}

type Dialogue = String;

macro_rules! test_dialogues {
($storage:expr, $_0:expr, $_1:expr, $_2:expr) => {
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(1)).await.unwrap(), $_0);
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(11)).await.unwrap(), $_1);
assert_eq!(Arc::clone(&$storage).get_dialogue(ChatId(256)).await.unwrap(), $_2);
};
}

async fn test_rocksdb<S>(storage: Arc<RocksDbStorage<S>>)
where
S: Send + Sync + Serializer<Dialogue> + 'static,
<S as Serializer<Dialogue>>::Error: Debug + Display,
{
test_dialogues!(storage, None, None, None);

Arc::clone(&storage).update_dialogue(ChatId(1), "ABC".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(ChatId(11), "DEF".to_owned()).await.unwrap();
Arc::clone(&storage).update_dialogue(ChatId(256), "GHI".to_owned()).await.unwrap();

test_dialogues!(
storage,
Some("ABC".to_owned()),
Some("DEF".to_owned()),
Some("GHI".to_owned())
);

Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap();
Arc::clone(&storage).remove_dialogue(ChatId(11)).await.unwrap();
Arc::clone(&storage).remove_dialogue(ChatId(256)).await.unwrap();

test_dialogues!(storage, None, None, None);

// Check that a try to remove a non-existing dialogue results in an error.
assert!(matches!(
Arc::clone(&storage).remove_dialogue(ChatId(1)).await.unwrap_err(),
RocksDbStorageError::DialogueNotFound
));
}