Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support generic error types #5

Merged
merged 6 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ license = "Apache-2.0"
[dependencies]
tokio = { version = "1.5.0", features = ["full"] }
futures = "0.3.14"
thiserror = "1.0"

# Optional feature based dependencies
lru = { version = "0.6.5", optional = true }
Expand Down
146 changes: 78 additions & 68 deletions src/cache_api.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,97 @@
use tokio::task::JoinHandle;
use std::hash::Hash;
use futures::Future;
use thiserror::Error;
use crate::internal_cache::{CacheAction, InternalCacheStore, CacheMessage};
use crate::backing::{CacheBacking, HashMapBacking};
use std::fmt::Debug;

#[derive(Debug, Clone)]
pub struct CacheLoadingError {
pub reason_phrase: String,
pub loading_error: Option<LoadingError>,
// todo: nested errors
}

#[derive(Clone)]
pub struct ResultMeta<V> {
pub result: V,
pub cached: bool,
#[derive(Error, Debug)]
pub enum CacheLoadingError<E: Debug> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying from #3 for contextual proximity:

Potentially, a few helpers for the CacheLoadingError type may be warranted, such as into_loading_error(self) -> Option<E> / as_loading_error(&self) -> Option<&E> helpers.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I implemented these utility functions in my new commit

#[error("Cache communication error")]
ByteAlex marked this conversation as resolved.
Show resolved Hide resolved
CommunicationError(CacheCommunicationError),
#[error("No data found")]
NoData(), // todo better handling here? eventually return loadingerror if possible
#[error("An error occurred when loading the entity from the loader function")]
LoadingError(E)
}

#[derive(Debug, Clone)]
pub struct LoadingError {
error_code: i16,
reason_phrase: Option<String>,
#[derive(Error, Debug)]
pub enum CacheCommunicationError {
#[error("An error occurred when trying to submit the cache request")]
TokioMpscSendError(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially, tokio errors may be worth grouping together. Do you intend for the user to ever "handle" one of these?

I'd consider a second error type, CacheCommunicationsError or similar, which then contains details on whether it was a Send issue, a failure to join futures, etc.

Generally, I'd expect the user is to be interested in: LoadingError, LookupLoop, and NoData (Is this one actually expected to be seen by the user at all, or is it internally used to trigger a load?). Beyond that- everything else strikes me as "I probably just want to panic if it's one of these".

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one indeed makes sense. I think the only error actually relevant to the user is the LoadingError.
In any other case something is odd with the cache.
LookupLoop happens when the cache state doesnt mutate at all, and it would have to do the same lookup again.
NoData is when you call an update function, but the loader doesn't return success, then there's "NoData" to update/mutate.
The most frequent error case will be the LoadingError though, eventually NoData will be interesting too.
The others are actually really edge cases.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in new commit, thanks for the suggestion!

#[error("An error occurred when trying to join the result future")]
FutureJoinError(#[from] tokio::task::JoinError),
#[error("An error occurred when waiting for the broadcaster response")]
TokioBroadcastRecvError(#[from] tokio::sync::broadcast::error::RecvError),
#[error("An error occurred when receiving the response")]
TokioOneshotRecvError(#[from] tokio::sync::oneshot::error::RecvError),
#[error("Lookups are looping, internal error")]
LookupLoop(),
}

impl LoadingError {
pub const LOADER_INTERNAL_ERROR: i16 = -1;
impl<E: Debug> CacheLoadingError<E> {

pub fn new(error_code: i16) -> LoadingError {
LoadingError {
error_code,
reason_phrase: None,
pub fn as_loading_error(&self) -> Option<&E> {
match self {
CacheLoadingError::LoadingError(error) => Some(error),
_ => None
}
}

pub fn with_reason_phrase(error_code: i16, reason_phrase: String) -> LoadingError {
LoadingError {
error_code,
reason_phrase: Some(reason_phrase),
pub fn into_loading_error(self) -> Option<E> {
match self {
CacheLoadingError::LoadingError(error) => Some(error),
_ => None
}
}

pub fn get_error_code(&self) -> i16 {
self.error_code
pub fn as_communication_error(&self) -> Option<&CacheCommunicationError> {
match self {
CacheLoadingError::CommunicationError(error) => Some(error),
_ => None
}
}

pub fn get_reason(&self) -> Option<String> {
self.reason_phrase.clone()
pub fn into_communication_error(self) -> Option<CacheCommunicationError> {
match self {
CacheLoadingError::CommunicationError(error) => Some(error),
_ => None
}
}
}

#[derive(Clone)]
pub struct ResultMeta<V> {
pub result: V,
pub cached: bool,
}

#[derive(Debug, Clone)]
pub enum CacheEntry<V> {
pub enum CacheEntry<V, E: Debug> {
Loaded(V),
Loading(tokio::sync::broadcast::Sender<Result<V, LoadingError>>),
Loading(tokio::sync::broadcast::Sender<Result<V, E>>),
}

#[derive(Debug)]
pub enum CacheResult<V> {
pub enum CacheResult<V, E: Debug> {
Found(V),
Loading(JoinHandle<Result<V, CacheLoadingError>>),
Loading(JoinHandle<Result<V, CacheLoadingError<E>>>),
None,
}

pub type CacheHandle = JoinHandle<()>;

#[derive(Debug, Clone)]
pub struct LoadingCache<K, V> {
tx: tokio::sync::mpsc::Sender<CacheMessage<K, V>>
pub struct LoadingCache<K, V, E: Debug> {
tx: tokio::sync::mpsc::Sender<CacheMessage<K, V, E>>
}

impl<
K: Eq + Hash + Clone + Send + 'static,
V: Clone + Sized + Send + 'static,
> LoadingCache<K, V> {
E: Clone + Sized + Send + Debug + 'static,
> LoadingCache<K, V, E> {
/// Creates a new instance of a LoadingCache with the default `HashMapBacking`
///
/// # Arguments
Expand All @@ -89,7 +108,7 @@ impl<
/// # Examples
///
/// ```
/// use cache_loader_async::cache_api::{LoadingCache, LoadingError};
/// use cache_loader_async::cache_api::LoadingCache;
/// use std::collections::HashMap;
/// async fn example() {
/// let static_db: HashMap<String, u32> =
Expand All @@ -100,7 +119,7 @@ impl<
/// let (cache, _) = LoadingCache::new(move |key: String| {
/// let db_clone = static_db.clone();
/// async move {
/// db_clone.get(&key).cloned().ok_or(LoadingError::new(1))
/// db_clone.get(&key).cloned().ok_or(1)
/// }
/// });
///
Expand All @@ -109,8 +128,8 @@ impl<
/// assert_eq!(result, 32);
/// }
/// ```
pub fn new<T, F>(loader: T) -> (LoadingCache<K, V>, CacheHandle)
where F: Future<Output=Result<V, LoadingError>> + Sized + Send + 'static,
pub fn new<T, F>(loader: T) -> (LoadingCache<K, V, E>, CacheHandle)
where F: Future<Output=Result<V, E>> + Sized + Send + 'static,
T: Fn(K) -> F + Send + 'static {
LoadingCache::with_backing(HashMapBacking::new(), loader)
}
Expand All @@ -132,7 +151,7 @@ impl<
/// # Examples
///
/// ```
/// use cache_loader_async::cache_api::{LoadingCache, LoadingError};
/// use cache_loader_async::cache_api::LoadingCache;
/// use std::collections::HashMap;
/// use cache_loader_async::backing::HashMapBacking;
/// async fn example() {
Expand All @@ -146,7 +165,7 @@ impl<
/// move |key: String| {
/// let db_clone = static_db.clone();
/// async move {
/// db_clone.get(&key).cloned().ok_or(LoadingError::new(1))
/// db_clone.get(&key).cloned().ok_or(1)
/// }
/// }
/// );
Expand All @@ -156,10 +175,10 @@ impl<
/// assert_eq!(result, 32);
/// }
/// ```
pub fn with_backing<T, F, B>(backing: B, loader: T) -> (LoadingCache<K, V>, CacheHandle)
where F: Future<Output=Result<V, LoadingError>> + Sized + Send + 'static,
pub fn with_backing<T, F, B>(backing: B, loader: T) -> (LoadingCache<K, V, E>, CacheHandle)
where F: Future<Output=Result<V, E>> + Sized + Send + 'static,
T: Fn(K) -> F + Send + 'static,
B: CacheBacking<K, CacheEntry<V>> + Send + 'static {
B: CacheBacking<K, CacheEntry<V, E>> + Send + 'static {
let (tx, rx) = tokio::sync::mpsc::channel(128);
let store = InternalCacheStore::new(backing, tx.clone(), loader);
let handle = store.run(rx);
Expand All @@ -179,13 +198,13 @@ impl<
/// Returns a Result with:
/// Ok - Value of type V
/// Err - Error of type CacheLoadingError
pub async fn get(&self, key: K) -> Result<V, CacheLoadingError> {
pub async fn get(&self, key: K) -> Result<V, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Get(key)).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError> {
pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Get(key)).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
}
Expand All @@ -204,7 +223,7 @@ impl<
/// Ok - Previous value of type V wrapped in an Option depending whether there was a previous
/// value
/// Err - Error of type CacheLoadingError
pub async fn set(&self, key: K, value: V) -> Result<Option<V>, CacheLoadingError> {
pub async fn set(&self, key: K, value: V) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Set(key, value)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -220,7 +239,7 @@ impl<
/// Returns a Result with:
/// Ok - Value of type Option<V>
/// Err - Error of type CacheLoadingError
pub async fn get_if_present(&self, key: K) -> Result<Option<V>, CacheLoadingError> {
pub async fn get_if_present(&self, key: K) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::GetIfPresent(key)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -236,7 +255,7 @@ impl<
/// Returns a Result with:
/// Ok - bool
/// Err - Error of type CacheLoadingError
pub async fn exists(&self, key: K) -> Result<bool, CacheLoadingError> {
pub async fn exists(&self, key: K) -> Result<bool, CacheLoadingError<E>> {
self.get_if_present(key).await
.map(|result| result.is_some())
}
Expand All @@ -253,7 +272,7 @@ impl<
/// Returns a Result with:
/// Ok - Value of type Option<V>
/// Err - Error of type CacheLoadingError
pub async fn remove(&self, key: K) -> Result<Option<V>, CacheLoadingError> {
pub async fn remove(&self, key: K) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Remove(key)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -277,21 +296,21 @@ impl<
/// Returns a Result with:
/// Ok - Value of type V which is the previously mapped value
/// Err - Error of type CacheLoadingError
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError>
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
where U: FnOnce(V) -> V + Send + 'static {
self.send_cache_action(CacheAction::Update(key, Box::new(update_fn))).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError>
pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
where U: FnMut(&mut V) -> () + Send + 'static {
self.send_cache_action(CacheAction::UpdateMut(key, Box::new(update_fn))).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError> {
async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError<E>> {
let (tx, rx) = tokio::sync::oneshot::channel();
match self.tx.send(CacheMessage {
action,
Expand All @@ -315,30 +334,21 @@ impl<
cached: false,
}))
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when trying to join loader future".to_owned(),
loading_error: None,
})
Err(err) => {
Err(CacheLoadingError::CommunicationError(CacheCommunicationError::FutureJoinError(err)))
}
}
}
CacheResult::None => { Ok(None) }
}
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when receiving cache response".to_owned(),
loading_error: None,
})
Err(err) => {
Err(CacheLoadingError::CommunicationError(CacheCommunicationError::TokioOneshotRecvError(err)))
}
}
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when trying to submit cache request".to_owned(),
loading_error: None,
})
Err(CacheLoadingError::CommunicationError(CacheCommunicationError::TokioMpscSendError()))
}
}
}
Expand Down