Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support generic error types #5

Merged
merged 6 commits into from
Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ license = "Apache-2.0"
[dependencies]
tokio = { version = "1.5.0", features = ["full"] }
futures = "0.3.14"
thiserror = "1.0"

# Optional feature based dependencies
lru = { version = "0.6.5", optional = true }
Expand Down
125 changes: 53 additions & 72 deletions src/cache_api.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
use tokio::task::JoinHandle;
use std::hash::Hash;
use futures::Future;
use thiserror::Error;
use crate::internal_cache::{CacheAction, InternalCacheStore, CacheMessage};
use crate::backing::{CacheBacking, HashMapBacking};
use std::fmt::Debug;

#[derive(Debug, Clone)]
pub struct CacheLoadingError {
pub reason_phrase: String,
pub loading_error: Option<LoadingError>,
// todo: nested errors
#[derive(Error, Debug)]
pub enum CacheLoadingError<E: Debug> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copying from #3 for contextual proximity:

Potentially, a few helpers for the CacheLoadingError type may be warranted, such as into_loading_error(self) -> Option<E> / as_loading_error(&self) -> Option<&E> helpers.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I implemented these utility functions in my new commit

#[error("An error occurred when trying to submit the cache request")]
TokioMpscSendError(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potentially, tokio errors may be worth grouping together. Do you intend for the user to ever "handle" one of these?

I'd consider a second error type, CacheCommunicationsError or similar, which then contains details on whether it was a Send issue, a failure to join futures, etc.

Generally, I'd expect the user is to be interested in: LoadingError, LookupLoop, and NoData (Is this one actually expected to be seen by the user at all, or is it internally used to trigger a load?). Beyond that- everything else strikes me as "I probably just want to panic if it's one of these".

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one indeed makes sense. I think the only error actually relevant to the user is the LoadingError.
In any other case something is odd with the cache.
LookupLoop happens when the cache state doesnt mutate at all, and it would have to do the same lookup again.
NoData is when you call an update function, but the loader doesn't return success, then there's "NoData" to update/mutate.
The most frequent error case will be the LoadingError though, eventually NoData will be interesting too.
The others are actually really edge cases.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented in new commit, thanks for the suggestion!

#[error("An error occurred when trying to join the result future")]
FutureJoinError(tokio::task::JoinError),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using #[from] for each of these to automatically generate Into implementations:

Suggested change
FutureJoinError(tokio::task::JoinError),
FutureJoinError(#[from] tokio::task::JoinError),

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Your suggested change doesn't compile.
I'm not quite sure, but it seems the derive generates some code which clashes with the code generated from the #[from].
I'll check if its a simple thing, or if I just implement the Into traits myself.

Copy link
Contributor

@Dessix Dessix Jun 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting- if you post the compilation error, I can suggest what may be breaking.

It's also possible that the issue is because thiserror struggles with Generic Parameters: dtolnay/thiserror#79

#[error("An error occurred when waiting for the broadcaster response")]
TokioBroadcastRecvError(tokio::sync::broadcast::error::RecvError),
#[error("An error occurred when receiving the response")]
TokioOneshotRecvError(tokio::sync::oneshot::error::RecvError),
#[error("Lookups are looping, internal error")]
LookupLoop(),
#[error("No data found")]
NoData(),
#[error("An error occurred when loading the entity from the loader function")]
LoadingError(LoadingError<E>)
}

#[derive(Clone)]
Expand All @@ -17,62 +30,39 @@ pub struct ResultMeta<V> {
pub cached: bool,
}

#[derive(Debug, Clone)]
pub struct LoadingError {
error_code: i16,
reason_phrase: Option<String>,
}

impl LoadingError {
pub const LOADER_INTERNAL_ERROR: i16 = -1;

pub fn new(error_code: i16) -> LoadingError {
LoadingError {
error_code,
reason_phrase: None,
}
}

pub fn with_reason_phrase(error_code: i16, reason_phrase: String) -> LoadingError {
LoadingError {
error_code,
reason_phrase: Some(reason_phrase),
}
}

pub fn get_error_code(&self) -> i16 {
self.error_code
}

pub fn get_reason(&self) -> Option<String> {
self.reason_phrase.clone()
}
#[derive(Error, Debug, Clone)]
pub enum LoadingError<E: Debug> {
#[error("The loader got cancelled")]
LoadCancelled(),
#[error("A custom loader error occurred")]
LoadError(E)
}

#[derive(Debug, Clone)]
pub enum CacheEntry<V> {
pub enum CacheEntry<V, E: Debug> {
Loaded(V),
Loading(tokio::sync::broadcast::Sender<Result<V, LoadingError>>),
Loading(tokio::sync::broadcast::Sender<Result<V, LoadingError<E>>>),
}

#[derive(Debug)]
pub enum CacheResult<V> {
pub enum CacheResult<V, E: Debug> {
Found(V),
Loading(JoinHandle<Result<V, CacheLoadingError>>),
Loading(JoinHandle<Result<V, CacheLoadingError<E>>>),
None,
}

pub type CacheHandle = JoinHandle<()>;

#[derive(Debug, Clone)]
pub struct LoadingCache<K, V> {
tx: tokio::sync::mpsc::Sender<CacheMessage<K, V>>
pub struct LoadingCache<K, V, E: Debug> {
tx: tokio::sync::mpsc::Sender<CacheMessage<K, V, E>>
}

impl<
K: Eq + Hash + Clone + Send + 'static,
V: Clone + Sized + Send + 'static,
> LoadingCache<K, V> {
E: Clone + Sized + Send + Debug + 'static,
> LoadingCache<K, V, E> {
/// Creates a new instance of a LoadingCache with the default `HashMapBacking`
///
/// # Arguments
Expand Down Expand Up @@ -100,7 +90,7 @@ impl<
/// let (cache, _) = LoadingCache::new(move |key: String| {
/// let db_clone = static_db.clone();
/// async move {
/// db_clone.get(&key).cloned().ok_or(LoadingError::new(1))
/// db_clone.get(&key).cloned().ok_or(LoadingError::LoadError(1))
/// }
/// });
///
Expand All @@ -109,8 +99,8 @@ impl<
/// assert_eq!(result, 32);
/// }
/// ```
pub fn new<T, F>(loader: T) -> (LoadingCache<K, V>, CacheHandle)
where F: Future<Output=Result<V, LoadingError>> + Sized + Send + 'static,
pub fn new<T, F>(loader: T) -> (LoadingCache<K, V, E>, CacheHandle)
where F: Future<Output=Result<V, LoadingError<E>>> + Sized + Send + 'static,
T: Fn(K) -> F + Send + 'static {
LoadingCache::with_backing(HashMapBacking::new(), loader)
}
Expand Down Expand Up @@ -146,7 +136,7 @@ impl<
/// move |key: String| {
/// let db_clone = static_db.clone();
/// async move {
/// db_clone.get(&key).cloned().ok_or(LoadingError::new(1))
/// db_clone.get(&key).cloned().ok_or(LoadingError::LoadError(1))
/// }
/// }
/// );
Expand All @@ -156,10 +146,10 @@ impl<
/// assert_eq!(result, 32);
/// }
/// ```
pub fn with_backing<T, F, B>(backing: B, loader: T) -> (LoadingCache<K, V>, CacheHandle)
where F: Future<Output=Result<V, LoadingError>> + Sized + Send + 'static,
pub fn with_backing<T, F, B>(backing: B, loader: T) -> (LoadingCache<K, V, E>, CacheHandle)
where F: Future<Output=Result<V, LoadingError<E>>> + Sized + Send + 'static,
T: Fn(K) -> F + Send + 'static,
B: CacheBacking<K, CacheEntry<V>> + Send + 'static {
B: CacheBacking<K, CacheEntry<V, E>> + Send + 'static {
let (tx, rx) = tokio::sync::mpsc::channel(128);
let store = InternalCacheStore::new(backing, tx.clone(), loader);
let handle = store.run(rx);
Expand All @@ -179,13 +169,13 @@ impl<
/// Returns a Result with:
/// Ok - Value of type V
/// Err - Error of type CacheLoadingError
pub async fn get(&self, key: K) -> Result<V, CacheLoadingError> {
pub async fn get(&self, key: K) -> Result<V, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Get(key)).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError> {
pub async fn get_with_meta(&self, key: K) -> Result<ResultMeta<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Get(key)).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
}
Expand All @@ -204,7 +194,7 @@ impl<
/// Ok - Previous value of type V wrapped in an Option depending whether there was a previous
/// value
/// Err - Error of type CacheLoadingError
pub async fn set(&self, key: K, value: V) -> Result<Option<V>, CacheLoadingError> {
pub async fn set(&self, key: K, value: V) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Set(key, value)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -220,7 +210,7 @@ impl<
/// Returns a Result with:
/// Ok - Value of type Option<V>
/// Err - Error of type CacheLoadingError
pub async fn get_if_present(&self, key: K) -> Result<Option<V>, CacheLoadingError> {
pub async fn get_if_present(&self, key: K) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::GetIfPresent(key)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -236,7 +226,7 @@ impl<
/// Returns a Result with:
/// Ok - bool
/// Err - Error of type CacheLoadingError
pub async fn exists(&self, key: K) -> Result<bool, CacheLoadingError> {
pub async fn exists(&self, key: K) -> Result<bool, CacheLoadingError<E>> {
self.get_if_present(key).await
.map(|result| result.is_some())
}
Expand All @@ -253,7 +243,7 @@ impl<
/// Returns a Result with:
/// Ok - Value of type Option<V>
/// Err - Error of type CacheLoadingError
pub async fn remove(&self, key: K) -> Result<Option<V>, CacheLoadingError> {
pub async fn remove(&self, key: K) -> Result<Option<V>, CacheLoadingError<E>> {
self.send_cache_action(CacheAction::Remove(key)).await
.map(|opt_meta| opt_meta.map(|meta| meta.result))
}
Expand All @@ -277,21 +267,21 @@ impl<
/// Returns a Result with:
/// Ok - Value of type V which is the previously mapped value
/// Err - Error of type CacheLoadingError
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError>
pub async fn update<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
where U: FnOnce(V) -> V + Send + 'static {
self.send_cache_action(CacheAction::Update(key, Box::new(update_fn))).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError>
pub async fn update_mut<U>(&self, key: K, update_fn: U) -> Result<V, CacheLoadingError<E>>
where U: FnMut(&mut V) -> () + Send + 'static {
self.send_cache_action(CacheAction::UpdateMut(key, Box::new(update_fn))).await
.map(|opt_result| opt_result.expect("Get should always return either V or CacheLoadingError"))
.map(|meta| meta.result)
}

async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError> {
async fn send_cache_action(&self, action: CacheAction<K, V>) -> Result<Option<ResultMeta<V>>, CacheLoadingError<E>> {
let (tx, rx) = tokio::sync::oneshot::channel();
match self.tx.send(CacheMessage {
action,
Expand All @@ -315,30 +305,21 @@ impl<
cached: false,
}))
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when trying to join loader future".to_owned(),
loading_error: None,
})
Err(err) => {
Err(CacheLoadingError::FutureJoinError(err))
}
}
}
CacheResult::None => { Ok(None) }
}
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when receiving cache response".to_owned(),
loading_error: None,
})
Err(err) => {
Err(CacheLoadingError::TokioOneshotRecvError(err))
}
}
}
Err(_) => {
Err(CacheLoadingError {
reason_phrase: "Error when trying to submit cache request".to_owned(),
loading_error: None,
})
Err(CacheLoadingError::TokioMpscSendError())
}
}
}
Expand Down