Skip to content

Commit

Permalink
Support generic error types (#5)
Browse files Browse the repository at this point in the history
* Implement generic error solution

* Remove unnecessary complexity / LoadingError wrapper

* Cleanup CacheLoadingError + Add utility functions
Move internal stuff into CacheCommunicationError

* Implement #[from] in CacheCommunicationError

* Forward nested error

Co-authored-by: Zoey <Dessix@Dessix.net>

Co-authored-by: ByteAlex <bytealex@zerotwo.bot>
Co-authored-by: Zoey <Dessix@Dessix.net>
  • Loading branch information
3 people committed Jun 3, 2021
1 parent 8fbc17f commit 734bb54
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 135 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -14,6 +14,7 @@ license = "Apache-2.0"
[dependencies]
tokio = { version = "1.5.0", features = ["full"] }
futures = "0.3.14"
thiserror = "1.0"

# Optional feature based dependencies
lru = { version = "0.6.5", optional = true }
Expand Down
148 changes: 79 additions & 69 deletions src/cache_api.rs
@@ -1,78 +1,97 @@
use tokio::task::JoinHandle;
use std::hash::Hash;
use futures::Future;
use thiserror::Error;
use crate::internal_cache::{CacheAction, InternalCacheStore, CacheMessage};
use crate::backing::{CacheBacking, HashMapBacking};
use std::fmt::Debug;

#[derive(Debug, Clone)]
pub struct CacheLoadingError {
pub reason_phrase: String,
pub loading_error: Option<LoadingError>,
// todo: nested errors
#[derive(Error, Debug)]
pub enum CacheLoadingError<E: Debug> {
#[error(transparent)]
CommunicationError(CacheCommunicationError),
#[error("No data found")]
NoData(), // todo better handling here? eventually return loadingerror if possible
#[error("An error occurred when loading the entity from the loader function")]
LoadingError(E)
}

#[derive(Clone)]
pub struct ResultMeta<V> {
pub result: V,
pub cached: bool,
#[derive(Error, Debug)]
pub enum CacheCommunicationError {
#[error("An error occurred when trying to submit the cache request")]
TokioMpscSendError(),
#[error("An error occurred when trying to join the result future")]
FutureJoinError(#[from] tokio::task::JoinError),
#[error("An error occurred when waiting for the broadcaster response")]
TokioBroadcastRecvError(#[from] tokio::sync::broadcast::error::RecvError),
#[error("An error occurred when receiving the response")]
TokioOneshotRecvError(#[from] tokio::sync::oneshot::error::RecvError),
#[error("Lookups are looping, internal error")]
LookupLoop(),
}

#[derive(Debug, Clone)]
pub struct LoadingError {
error_code: i16,
reason_phrase: Option<String>,
}
impl<E: Debug> CacheLoadingError<E> {

impl LoadingError {
pub const LOADER_INTERNAL_ERROR: i16 = -1;

pub fn new(error_code: i16) -> LoadingError {
LoadingError {
error_code,
reason_phrase: None,
pub fn as_loading_error(&self) -> Option<&E> {
match self {
CacheLoadingError::LoadingError(error) => Some(error),
_ => None
}
}

pub fn with_reason_phrase(error_code: i16, reason_phrase: String) -> LoadingError {
LoadingError {
error_code,
reason_phrase: Some(reason_phrase),
pub fn into_loading_error(self) -> Option<E> {
match self {
CacheLoadingError::LoadingError(error) => Some(error),
_ => None
}
}

pub fn get_error_code(&self) -> i16 {
self.error_code
pub fn as_communication_error(&self) -> Option<&CacheCommunicationError> {
match self {
CacheLoadingError::CommunicationError(error) => Some(error),
_ => None
}
}

pub fn get_reason(&self) -> Option<String> {
self.reason_phrase.clone()
pub fn into_communication_error(self) -> Option<CacheCommunicationError> {
match self {
CacheLoadingError::CommunicationError(error) => Some(error),
_ => None
}
}
}

#[derive(Clone)]
pub struct ResultMeta<V> {
pub result: V,
pub cached: bool,
}

#[derive(Debug, Clone)]
pub enum CacheEntry<V> {
pub enum CacheEntry<V, E: Debug> {
Loaded(V),
Loading(tokio::sync::broadcast::Sender<Result<V, LoadingError>>),
Loading(tokio::sync::broadcast::Sender<Result<V, E>>),
}

#[derive(Debug)]
pub enum CacheResult<V> {
pub enum CacheResult<V, E: Debug> {
Found(V),
Loading(JoinHandle<Result<V, CacheLoadingError>>),
Loading(JoinHandle<Result<V, CacheLoadingError<E>>>),
None,
}

pub type CacheHandle = JoinHandle<()>;

#[derive(Debug, Clone)]
pub struct LoadingCache<K, V> {
tx: tokio::sync::mpsc::Sender<CacheMessage<K, V>>
pub struct LoadingCache<K, V, E: Debug> {
tx: tokio::sync::mpsc::Sender<CacheMessage<K, V, E>>
}

impl<
K: Eq + Hash + Clone + Send + 'static,
V: Clone + Sized + Send + 'static,
> LoadingCache<K, V> {
E: Clone + Sized + Send + Debug + 'static,
> LoadingCache<K, V, E> {
/// Creates a new instance of a LoadingCache with the default `HashMapBacking`
///
/// # Arguments
Expand All @@ -89,7 +108,7 @@ impl<
/// # Examples
///
/// ```
/// use cache_loader_async::cache_api::{LoadingCache, LoadingError};
/// use cache_loader_async::cache_api::LoadingCache;
/// use std::collections::HashMap;
/// async fn example() {
/// let static_db: HashMap<String, u32> =
Expand All @@ -100,7 +119,7 @@ impl<
/// let (cache, _) = LoadingCache::new(move |key: String| {
/// let db_clone = static_db.clone();
/// async move {
/// db_clone.get(&key).cloned().ok_or(LoadingError::new(1))
/// db_clone.get(&key).cloned().ok_or(1)
/// }
/// });
///
Expand All @@ -109,8 +128,8 @@ impl<
/// assert_eq!(result, 32);
/// }
/// ```
pub fn new<T, F>(loader: T) -> (LoadingCache<K, V>, CacheHandle)
where F: Future<Output=Result<V, LoadingError>> + Sized + Send + 'static,
pub fn new<T, F>(loader: T) -> (LoadingCache<K, V, E>, CacheHandle)
where F: Future<Output=Result<V, E>> + Sized + Send + 'static,
T: Fn(K) -> F + Send + 'static {
LoadingCache::with_backing(HashMapBacking::new(), loader)
}
Expand All @@ -132,7 +151,7 @@ impl<
/// # Examples
///
/// ```
/// use cache_loader_async::cache_api::{LoadingCache, LoadingError};
/// use cache_loader_async::cache_api::LoadingCache;
/// use std::collections::HashMap;
/// use cache_loader_async::backing::HashMapBacking;
/// async fn example() {
Expand All @@ -146,7 +165,7 @@ impl<
/// move |key: String| {
/// let db_clone = static_db.clone();
/// async move {
/// db_clone.get(&key).cloned().ok_or(LoadingError::new(1))
/// db_clone.get(&key).cloned().ok_or(1)
/// }
/// }
/// );
Expand All @@ -156,10 +175,10 @@ impl<
/// assert_eq!(result, 32);
/// }
/// ```
pub fn with_backing<T, F, B>(backing: B, loader: T) -> (LoadingCache<K, V>, CacheHandle)
where F: Future<Output=Result<V, LoadingError>> + Sized + Send + 'static,
pub fn with_backing<T, F, B>(backing: B, loader: T) -> (LoadingCache<K, V, E>, CacheHandle)
where F: Future<Output=Result<V, E>> + Sized + Send + 'static,
T: Fn(K) -> F + Send + 'static,
B: CacheBacking<K, CacheEntry<V>> + Send + 'static {
B: CacheBacking<K, CacheEntry<V, E>> + Send + 'static {
let (tx, rx) = tokio::sync::mpsc::channel(128);
let store = InternalCacheStore::new(backing, tx.clone(), loader);
let handle = store.run(rx);
Expand All @@ -179,13 +198,13 @@ impl<
/// Returns a Result with:
/// Ok - Value of type V
/// Err - Error of type CacheLoadingError
pub async fn get(&self, key: K) -> Result<V, CacheLoadingError> {
pub async fn get(&self, key: K) -> Result<V, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Get(key)).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError> {
pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Get(key)).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
}
Expand All @@ -204,7 +223,7 @@ impl<
/// Ok - Previous value of type V wrapped in an Option depending whether there was a previous
/// value
/// Err - Error of type CacheLoadingError
pub async fn set(&self, key: K, value: V) -> Result<Option<V>, CacheLoadingError> {
pub async fn set(&self, key: K, value: V) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Set(key, value)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -220,7 +239,7 @@ impl<
/// Returns a Result with:
/// Ok - Value of type Option<V>
/// Err - Error of type CacheLoadingError
pub async fn get_if_present(&self, key: K) -> Result<Option<V>, CacheLoadingError> {
pub async fn get_if_present(&self, key: K) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::GetIfPresent(key)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -236,7 +255,7 @@ impl<
/// Returns a Result with:
/// Ok - bool
/// Err - Error of type CacheLoadingError
pub async fn exists(&self, key: K) -> Result<bool, CacheLoadingError> {
pub async fn exists(&self, key: K) -> Result<bool, CacheLoadingError<E>> {
self.get_if_present(key).await
.map(|result| result.is_some())
}
Expand All @@ -253,7 +272,7 @@ impl<
/// Returns a Result with:
/// Ok - Value of type Option<V>
/// Err - Error of type CacheLoadingError
pub async fn remove(&self, key: K) -> Result<Option<V>, CacheLoadingError> {
pub async fn remove(&self, key: K) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Remove(key)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -277,21 +296,21 @@ impl<
/// Returns a Result with:
/// Ok - Value of type V which is the previously mapped value
/// Err - Error of type CacheLoadingError
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError>
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
where U: FnOnce(V) -> V + Send + 'static {
self.send_cache_action(CacheAction::Update(key, Box::new(update_fn))).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError>
pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
where U: FnMut(&mut V) -> () + Send + 'static {
self.send_cache_action(CacheAction::UpdateMut(key, Box::new(update_fn))).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError> {
async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError<E>> {
let (tx, rx) = tokio::sync::oneshot::channel();
match self.tx.send(CacheMessage {
action,
Expand All @@ -315,31 +334,22 @@ impl<
cached: false,
}))
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when trying to join loader future".to_owned(),
loading_error: None,
})
Err(err) => {
Err(CacheLoadingError::CommunicationError(CacheCommunicationError::FutureJoinError(err)))
}
}
}
CacheResult::None => { Ok(None) }
}
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when receiving cache response".to_owned(),
loading_error: None,
})
Err(err) => {
Err(CacheLoadingError::CommunicationError(CacheCommunicationError::TokioOneshotRecvError(err)))
}
}
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when trying to submit cache request".to_owned(),
loading_error: None,
})
Err(CacheLoadingError::CommunicationError(CacheCommunicationError::TokioMpscSendError()))
}
}
}
}
}

0 comments on commit 734bb54

Please sign in to comment.