From b350a9fe2255a3de79fc5d2f958d1e8dd6a392b9 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 25 Jun 2022 08:58:07 +0000 Subject: [PATCH 01/11] Update entry's timestamps immediately after insertion - `EntryInfo`'s timestamps are no longer optional and set when it is inserted. - Add `is_dirty` flag to the `EntryInfo` to indicate it was just inserted but has not been processed by the housekeeper. - Update the unit tests for `invalidate_all`; remove `cache.sync()` after insertions. --- src/common/concurrent.rs | 16 +++++--- .../concurrent/atomic_time/atomic_time.rs | 6 ++- src/common/concurrent/entry_info.rs | 39 +++++++++++++++---- src/dash/base_cache.rs | 39 ++++++++++++------- src/dash/cache.rs | 5 ++- src/future/cache.rs | 23 ++++++++++- src/sync/cache.rs | 5 ++- src/sync/segment.rs | 5 ++- src/sync_base/base_cache.rs | 39 ++++++++++++------- 9 files changed, 128 insertions(+), 49 deletions(-) diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 2deefc8e..f86cfb95 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -212,10 +212,6 @@ impl ValueEntry { write_order_q_node: other_nodes.write_order_q_node, } }; - // To prevent this updated ValueEntry from being evicted by an expiration policy, - // set the max value to the timestamps. They will be replaced with the real - // timestamps when applying writes. - entry_info.reset_timestamps(); Self { value, info: entry_info, @@ -231,8 +227,16 @@ impl ValueEntry { self.info.is_admitted() } - pub(crate) fn set_is_admitted(&self, value: bool) { - self.info.set_is_admitted(value); + pub(crate) fn set_admitted(&self, value: bool) { + self.info.set_admitted(value); + } + + pub(crate) fn is_dirty(&self) -> bool { + self.info.is_dirty() + } + + pub(crate) fn set_dirty(&self, value: bool) { + self.info.set_dirty(value); } #[inline] diff --git a/src/common/concurrent/atomic_time/atomic_time.rs b/src/common/concurrent/atomic_time/atomic_time.rs index e6057573..45a85c2f 100644 --- a/src/common/concurrent/atomic_time/atomic_time.rs +++ b/src/common/concurrent/atomic_time/atomic_time.rs @@ -21,8 +21,10 @@ impl Default for AtomicInstant { // quanta v0.10.0 no longer provides `quanta::Instant::as_u64` method. 
impl AtomicInstant { - pub(crate) fn reset(&self) { - self.instant.store(std::u64::MAX, Ordering::Release); + pub(crate) fn new(timestamp: Instant) -> Self { + let ai = Self::default(); + ai.set_instant(timestamp); + ai } pub(crate) fn is_set(&self) -> bool { diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs index 25f6208e..31d22d00 100644 --- a/src/common/concurrent/entry_info.rs +++ b/src/common/concurrent/entry_info.rs @@ -5,6 +5,7 @@ use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant}; pub(crate) struct EntryInfo { is_admitted: AtomicBool, + is_dirty: AtomicBool, last_accessed: AtomicInstant, last_modified: AtomicInstant, policy_weight: AtomicU32, @@ -12,14 +13,15 @@ pub(crate) struct EntryInfo { impl EntryInfo { #[inline] - pub(crate) fn new(policy_weight: u32) -> Self { + pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self { #[cfg(feature = "unstable-debug-counters")] super::debug_counters::InternalGlobalDebugCounters::entry_info_created(); Self { is_admitted: Default::default(), - last_accessed: Default::default(), - last_modified: Default::default(), + is_dirty: AtomicBool::new(true), + last_accessed: AtomicInstant::new(timestamp), + last_modified: AtomicInstant::new(timestamp), policy_weight: AtomicU32::new(policy_weight), } } @@ -30,14 +32,18 @@ impl EntryInfo { } #[inline] - pub(crate) fn set_is_admitted(&self, value: bool) { + pub(crate) fn set_admitted(&self, value: bool) { self.is_admitted.store(value, Ordering::Release); } #[inline] - pub(crate) fn reset_timestamps(&self) { - self.last_accessed.reset(); - self.last_modified.reset(); + pub(crate) fn is_dirty(&self) -> bool { + self.is_dirty.load(Ordering::Acquire) + } + + #[inline] + pub(crate) fn set_dirty(&self, value: bool) { + self.is_dirty.store(value, Ordering::Release); } #[inline] @@ -78,3 +84,22 @@ impl AccessTime for EntryInfo { self.last_modified.set_instant(timestamp); } } + +#[cfg(test)] +mod test { + use super::EntryInfo; + + // Ignore this test by default as struct size may change in the future. + // #[ignore] + #[test] + fn check_struct_size() { + use std::mem::size_of; + + // As of Rust 1.61. + if cfg!(target_pointer_width = "64") || cfg!(target_pointer_width = "32") { + assert_eq!(size_of::(), 24); + } else { + // ignore + } + } +} diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index 3aa47eb7..6e7b7fdf 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -258,6 +258,7 @@ where #[inline] pub(crate) fn do_insert_with_hash(&self, key: Arc, hash: u64, value: V) -> WriteOp { + let ts = self.inner.current_time_from_expiration_clock(); let weight = self.inner.weigh(&key, &value); let mut insert_op = None; let mut update_op = None; @@ -273,7 +274,7 @@ where // prevent this new ValueEntry from being evicted by an expiration policy. // 3. This method will update the policy_weight with the new weight. 
let old_weight = entry.policy_weight(); - *entry = self.new_value_entry_from(value.clone(), weight, entry); + *entry = self.new_value_entry_from(value.clone(), ts, weight, entry); update_op = Some(WriteOp::Upsert { key_hash: KeyHash::new(Arc::clone(&key), hash), value_entry: TrioArc::clone(entry), @@ -283,7 +284,7 @@ where }) // Insert .or_insert_with(|| { - let entry = self.new_value_entry(value.clone(), weight); + let entry = self.new_value_entry(value.clone(), ts, weight); insert_op = Some(WriteOp::Upsert { key_hash: KeyHash::new(Arc::clone(&key), hash), value_entry: TrioArc::clone(&entry), @@ -301,8 +302,13 @@ where } #[inline] - fn new_value_entry(&self, value: V, policy_weight: u32) -> TrioArc> { - let info = TrioArc::new(EntryInfo::new(policy_weight)); + fn new_value_entry( + &self, + value: V, + timestamp: Instant, + policy_weight: u32, + ) -> TrioArc> { + let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight)); TrioArc::new(ValueEntry::new(value, info)) } @@ -310,10 +316,16 @@ where fn new_value_entry_from( &self, value: V, + timestamp: Instant, policy_weight: u32, other: &ValueEntry, ) -> TrioArc> { let info = TrioArc::clone(other.entry_info()); + // To prevent this updated ValueEntry from being evicted by an expiration policy, + // set the dirty flag to true. It will be reset to false when the write is applied. + info.set_dirty(true); + info.set_last_accessed(timestamp); + info.set_last_modified(timestamp); info.set_policy_weight(policy_weight); TrioArc::new(ValueEntry::new_from(value, info, other)) } @@ -773,7 +785,6 @@ where use WriteOp::*; let freq = self.frequency_sketch.read(); let ch = &self.write_op_ch; - let ts = self.current_time_from_expiration_clock(); for _ in 0..count { match ch.try_recv() { @@ -782,9 +793,7 @@ where value_entry: entry, old_weight, new_weight, - }) => { - self.handle_upsert(kh, entry, old_weight, new_weight, ts, deqs, &freq, counters) - } + }) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters), Ok(Remove(KvEntry { key: _key, entry })) => { Self::handle_remove(deqs, entry, counters) } @@ -800,13 +809,13 @@ where entry: TrioArc>, old_weight: u32, new_weight: u32, - timestamp: Instant, deqs: &mut Deques, freq: &FrequencySketch, counters: &mut EvictionCounters, ) { - entry.set_last_accessed(timestamp); - entry.set_last_modified(timestamp); + // entry.set_last_accessed(timestamp); + // entry.set_last_modified(timestamp); + entry.set_dirty(false); if entry.is_admitted() { // The entry has been already admitted, so treat this as an update. @@ -971,7 +980,7 @@ where if self.is_write_order_queue_enabled() { deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry); } - entry.set_is_admitted(true); + entry.set_admitted(true); } fn handle_remove( @@ -980,7 +989,7 @@ where counters: &mut EvictionCounters, ) { if entry.is_admitted() { - entry.set_is_admitted(false); + entry.set_admitted(false); counters.saturating_sub(1, entry.policy_weight()); // The following two unlink_* functions will unset the deq nodes. deqs.unlink_ao(&entry); @@ -998,7 +1007,7 @@ where counters: &mut EvictionCounters, ) { if entry.is_admitted() { - entry.set_is_admitted(false); + entry.set_admitted(false); counters.saturating_sub(1, entry.policy_weight()); // The following two unlink_* functions will unset the deq nodes. 
             Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
@@ -1090,7 +1099,7 @@ where
         write_order_deq: &mut Deque>,
     ) -> bool {
         if let Some(entry) = self.cache.get(key) {
-            if entry.last_accessed().is_none() {
+            if entry.is_dirty() {
                 // The key exists and the entry has been updated.
                 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
                 Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
diff --git a/src/dash/cache.rs b/src/dash/cache.rs
index 90c92bb2..2d7832f1 100644
--- a/src/dash/cache.rs
+++ b/src/dash/cache.rs
@@ -818,7 +818,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();

diff --git a/src/future/cache.rs b/src/future/cache.rs
index 46e65cd9..d7878846 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -1374,7 +1374,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();

@@ -1392,6 +1395,24 @@ mod tests {
         assert!(cache.contains_key(&"d"));
     }

+    // This test is for https://github.com/moka-rs/moka/issues/155
+    #[tokio::test]
+    async fn invalidate_all_without_sync() {
+        let cache = Cache::new(1024);
+
+        assert_eq!(cache.get(&0), None);
+        cache.insert(0, 1).await;
+        assert_eq!(cache.get(&0), Some(1));
+
+        // use moka::sync::ConcurrentCacheExt;
+        // cache.sync();
+
+        // Timer::after(Duration::from_millis(520)).await;
+
+        cache.invalidate_all();
+        assert_eq!(cache.get(&0), None);
+    }
+
     #[tokio::test]
     async fn invalidate_entries_if() -> Result<(), Box> {
         use std::collections::HashSet;
diff --git a/src/sync/cache.rs b/src/sync/cache.rs
index df8a81c4..2349c794 100644
--- a/src/sync/cache.rs
+++ b/src/sync/cache.rs
@@ -1194,7 +1194,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();

diff --git a/src/sync/segment.rs b/src/sync/segment.rs
index c4417754..7d07eef6 100644
--- a/src/sync/segment.rs
+++ b/src/sync/segment.rs
@@ -874,7 +874,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+ // https://github.com/moka-rs/moka/issues/155 cache.invalidate_all(); cache.sync(); diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 1876fded..4a02d064 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -299,6 +299,7 @@ where #[inline] pub(crate) fn do_insert_with_hash(&self, key: Arc, hash: u64, value: V) -> WriteOp { + let ts = self.inner.current_time_from_expiration_clock(); let weight = self.inner.weigh(&key, &value); let op_cnt1 = Rc::new(AtomicU8::new(0)); let op_cnt2 = Rc::clone(&op_cnt1); @@ -318,7 +319,7 @@ where hash, // on_insert || { - let entry = self.new_value_entry(value.clone(), weight); + let entry = self.new_value_entry(value.clone(), ts, weight); let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed); op1 = Some(( cnt, @@ -339,7 +340,7 @@ where // prevent this new ValueEntry from being evicted by an expiration policy. // 3. This method will update the policy_weight with the new weight. let old_weight = old_entry.policy_weight(); - let entry = self.new_value_entry_from(value.clone(), weight, old_entry); + let entry = self.new_value_entry_from(value.clone(), ts, weight, old_entry); let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed); op2 = Some(( cnt, @@ -374,8 +375,13 @@ where } #[inline] - fn new_value_entry(&self, value: V, policy_weight: u32) -> TrioArc> { - let info = TrioArc::new(EntryInfo::new(policy_weight)); + fn new_value_entry( + &self, + value: V, + timestamp: Instant, + policy_weight: u32, + ) -> TrioArc> { + let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight)); TrioArc::new(ValueEntry::new(value, info)) } @@ -383,10 +389,16 @@ where fn new_value_entry_from( &self, value: V, + timestamp: Instant, policy_weight: u32, other: &ValueEntry, ) -> TrioArc> { let info = TrioArc::clone(other.entry_info()); + // To prevent this updated ValueEntry from being evicted by an expiration policy, + // set the dirty flag to true. It will be reset to false when the write is applied. + info.set_dirty(true); + info.set_last_accessed(timestamp); + info.set_last_modified(timestamp); info.set_policy_weight(policy_weight); TrioArc::new(ValueEntry::new_from(value, info, other)) } @@ -954,7 +966,6 @@ where use WriteOp::*; let freq = self.frequency_sketch.read(); let ch = &self.write_op_ch; - let ts = self.current_time_from_expiration_clock(); for _ in 0..count { match ch.try_recv() { @@ -963,9 +974,7 @@ where value_entry: entry, old_weight, new_weight, - }) => { - self.handle_upsert(kh, entry, old_weight, new_weight, ts, deqs, &freq, counters) - } + }) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters), Ok(Remove(KvEntry { key: _key, entry })) => { Self::handle_remove(deqs, entry, counters) } @@ -981,13 +990,13 @@ where entry: TrioArc>, old_weight: u32, new_weight: u32, - timestamp: Instant, deqs: &mut Deques, freq: &FrequencySketch, counters: &mut EvictionCounters, ) { - entry.set_last_accessed(timestamp); - entry.set_last_modified(timestamp); + // entry.set_last_accessed(timestamp); + // entry.set_last_modified(timestamp); + entry.set_dirty(false); if entry.is_admitted() { // The entry has been already admitted, so treat this as an update. 
@@ -1154,7 +1163,7 @@ where
         if self.is_write_order_queue_enabled() {
             deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry);
         }
-        entry.set_is_admitted(true);
+        entry.set_admitted(true);
     }

     fn handle_remove(
@@ -1163,7 +1172,7 @@ where
         counters: &mut EvictionCounters,
     ) {
         if entry.is_admitted() {
-            entry.set_is_admitted(false);
+            entry.set_admitted(false);
             counters.saturating_sub(1, entry.policy_weight());
             // The following two unlink_* functions will unset the deq nodes.
             deqs.unlink_ao(&entry);
@@ -1181,7 +1190,7 @@ where
         counters: &mut EvictionCounters,
     ) {
         if entry.is_admitted() {
-            entry.set_is_admitted(false);
+            entry.set_admitted(false);
             counters.saturating_sub(1, entry.policy_weight());
             // The following two unlink_* functions will unset the deq nodes.
             Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
@@ -1274,7 +1283,7 @@ where
         write_order_deq: &mut Deque>,
     ) -> bool {
         if let Some(entry) = self.cache.get(key, hash) {
-            if entry.last_accessed().is_none() {
+            if entry.is_dirty() {
                 // The key exists and the entry has been updated.
                 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
                 Deques::move_to_back_wo_in_deque(write_order_deq, &entry);

From 124183d2f1b58661266de0668ffde9400f8a9b26 Mon Sep 17 00:00:00 2001
From: Tatsuya Kawano
Date: Sat, 25 Jun 2022 10:22:28 +0000
Subject: [PATCH 02/11] Update entry's timestamps immediately after insertion

Add the `AtomicInstant::new` function to the `atomic_time_compat` module,
like we did for the `atomic_time` module in a previous commit.
---
 .../atomic_time/atomic_time_compat.rs |  6 ++++--
 src/common/concurrent/entry_info.rs   | 19 ++++++++++++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/src/common/concurrent/atomic_time/atomic_time_compat.rs b/src/common/concurrent/atomic_time/atomic_time_compat.rs
index 880eff6e..2a65cc46 100644
--- a/src/common/concurrent/atomic_time/atomic_time_compat.rs
+++ b/src/common/concurrent/atomic_time/atomic_time_compat.rs
@@ -15,8 +15,10 @@ impl Default for AtomicInstant {
 }

 impl AtomicInstant {
-    pub(crate) fn reset(&self) {
-        *self.instant.write() = None;
+    pub(crate) fn new(timestamp: Instant) -> Self {
+        let ai = Self::default();
+        ai.set_instant(timestamp);
+        ai
     }

     pub(crate) fn is_set(&self) -> bool {
diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs
index 31d22d00..9609dfbc 100644
--- a/src/common/concurrent/entry_info.rs
+++ b/src/common/concurrent/entry_info.rs
@@ -96,10 +96,23 @@ mod test {
         use std::mem::size_of;

         // As of Rust 1.61.
-        if cfg!(target_pointer_width = "64") || cfg!(target_pointer_width = "32") {
-            assert_eq!(size_of::(), 24);
+        let size = if cfg!(target_pointer_width = "64") {
+            if cfg!(feature = "quanta") {
+                24
+            } else {
+                72
+            }
+        } else if cfg!(target_pointer_width = "32") {
+            if cfg!(feature = "quanta") {
+                24
+            } else {
+                48
+            }
         } else {
             // ignore
-        }
+            return;
+        };
+
+        assert_eq!(size_of::(), size);
     }
 }

From 32d9e33771866819aea70da0213efe05c8cd8e6b Mon Sep 17 00:00:00 2001
From: Tatsuya Kawano
Date: Sat, 25 Jun 2022 10:36:33 +0000
Subject: [PATCH 03/11] Update entry's timestamps immediately after insertion

Add the `AtomicInstant::new` function to the `atomic_time_compat` module,
like we did for the `atomic_time` module in a previous commit.
--- src/common/concurrent/entry_info.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs index 9609dfbc..9de7b0f5 100644 --- a/src/common/concurrent/entry_info.rs +++ b/src/common/concurrent/entry_info.rs @@ -106,7 +106,7 @@ mod test { if cfg!(feature = "quanta") { 24 } else { - 48 + 40 } } else { // ignore From 68229d251513ef9c7aabb64d63743aea95e8686c Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 25 Jun 2022 10:36:45 +0000 Subject: [PATCH 04/11] Update the change log --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 38d892bd..f25d90d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Moka Cache — Change Log +## Version 0.8.6 + +### Fixed + +- Fix a bug caused `invalidate_all` will not invalidate entries inserted just + before calling it. ([#155](gh-issue-0155)) + + ## Version 0.8.5 ### Added @@ -368,6 +376,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25). [panic_in_quanta]: https://github.com/moka-rs/moka#integer-overflow-in-quanta-crate-on-some-x86_64-machines [resolving-error-on-32bit]: https://github.com/moka-rs/moka#compile-errors-on-some-32-bit-platforms +[gh-issue-0155]: https://github.com/moka-rs/moka/issues/155/ [gh-issue-0123]: https://github.com/moka-rs/moka/issues/123/ [gh-issue-0119]: https://github.com/moka-rs/moka/issues/119/ [gh-issue-0107]: https://github.com/moka-rs/moka/issues/107/ From 2d0b31c245c7ad8488620a2312880c434f41623b Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 25 Jun 2022 11:04:43 +0000 Subject: [PATCH 05/11] Update the change log --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f25d90d1..65691540 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ ### Fixed - Fix a bug caused `invalidate_all` will not invalidate entries inserted just - before calling it. ([#155](gh-issue-0155)) + before calling it. ([#155][gh-issue-0155]) ## Version 0.8.5 From 26b0e963d6ec54e470f6e603052a6c337b8d4307 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 25 Jun 2022 11:05:17 +0000 Subject: [PATCH 06/11] Update entry's timestamps immediately after insertion Remove unused statements. --- src/dash/base_cache.rs | 2 -- src/sync_base/base_cache.rs | 2 -- 2 files changed, 4 deletions(-) diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index 6e7b7fdf..a2adb903 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -813,8 +813,6 @@ where freq: &FrequencySketch, counters: &mut EvictionCounters, ) { - // entry.set_last_accessed(timestamp); - // entry.set_last_modified(timestamp); entry.set_dirty(false); if entry.is_admitted() { diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 4a02d064..9ac3dd4b 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -994,8 +994,6 @@ where freq: &FrequencySketch, counters: &mut EvictionCounters, ) { - // entry.set_last_accessed(timestamp); - // entry.set_last_modified(timestamp); entry.set_dirty(false); if entry.is_admitted() { From 264f99fa52f44b075f9d539d761bbb31db6faa22 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sat, 25 Jun 2022 11:55:23 +0000 Subject: [PATCH 07/11] Update entry's timestamps immediately after insertion Remove some source code comments. 
---
 src/future/cache.rs | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/src/future/cache.rs b/src/future/cache.rs
index d7878846..5be95c7e 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -1404,11 +1404,6 @@ mod tests {
         cache.insert(0, 1).await;
         assert_eq!(cache.get(&0), Some(1));

-        // use moka::sync::ConcurrentCacheExt;
-        // cache.sync();
-
-        // Timer::after(Duration::from_millis(520)).await;
-
         cache.invalidate_all();
         assert_eq!(cache.get(&0), None);
     }

From 01f75176f0c396b3b4f45956bf401bcc8d488c31 Mon Sep 17 00:00:00 2001
From: Tatsuya Kawano
Date: Sun, 26 Jun 2022 09:37:26 +0000
Subject: [PATCH 08/11] Update entry's timestamps immediately after insertion

- Update the change log.
- Add doc comments to some of the fields of `EntryInfo`.
---
 CHANGELOG.md                        | 9 +++++++--
 src/common/concurrent/entry_info.rs | 6 ++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65691540..8b4dc2ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,8 +4,13 @@

 ### Fixed

-- Fix a bug caused `invalidate_all` will not invalidate entries inserted just
-  before calling it. ([#155][gh-issue-0155])
+- Fix a bug that caused `invalidate_all` and `invalidate_entries_if` of the
+  following caches to not invalidate entries inserted just before calling them
+  ([#155][gh-issue-0155]):
+  - `sync::Cache`
+  - `sync::SegmentedCache`
+  - `future::Cache`
+  - Experimental `dash::Cache`


 ## Version 0.8.5

diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs
index 9de7b0f5..61c4e390 100644
--- a/src/common/concurrent/entry_info.rs
+++ b/src/common/concurrent/entry_info.rs
@@ -4,7 +4,13 @@ use super::AccessTime;
 use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};

 pub(crate) struct EntryInfo {
+    /// `is_admitted` indicates that the entry has been admitted to the
+    /// cache. When `false`, it means the entry is _temporarily_ admitted to
+    /// the cache or evicted from the cache (so it should not have LRU nodes).
     is_admitted: AtomicBool,
+    /// `is_dirty` indicates that the entry has been inserted (or updated)
+    /// in the hash table, but the history of the insertion has not yet 
+    /// been applied to the LRU deques and LFU estimator.
     is_dirty: AtomicBool,
     last_accessed: AtomicInstant,
     last_modified: AtomicInstant,

From fefc0d4572eb7d104609e44b3bc2fc0b921705b5 Mon Sep 17 00:00:00 2001
From: Tatsuya Kawano
Date: Sun, 26 Jun 2022 09:49:29 +0000
Subject: [PATCH 09/11] rustfmt

---
 src/common/concurrent/entry_info.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs
index 61c4e390..6fe0a54d 100644
--- a/src/common/concurrent/entry_info.rs
+++ b/src/common/concurrent/entry_info.rs
@@ -9,7 +9,7 @@ pub(crate) struct EntryInfo {
     /// the cache or evicted from the cache (so it should not have LRU nodes).
     is_admitted: AtomicBool,
     /// `is_dirty` indicates that the entry has been inserted (or updated)
-    /// in the hash table, but the history of the insertion has not yet 
+    /// in the hash table, but the history of the insertion has not yet
     /// been applied to the LRU deques and LFU estimator.
is_dirty: AtomicBool, last_accessed: AtomicInstant, From 518c8091afd572f466578cf2091b32d09343bdf7 Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 26 Jun 2022 13:09:31 +0000 Subject: [PATCH 10/11] Update entry's timestamps immediately after insertion - Update internal `evict_lru_entries` and `submit_invalidation_task` methods to check if entry is dirty. - Add some TODO comments about skipping dirty entries to internal `remove_expired_ao` and `remove_expired_wo` methods . --- src/common/concurrent.rs | 6 +++++- src/dash/base_cache.rs | 13 ++++++++++--- src/sync_base/base_cache.rs | 25 +++++++++++++++++-------- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index f86cfb95..49ad8911 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -77,10 +77,14 @@ impl KeyDate { &self.key } - // #[cfg(any(feature = "sync", feature = "future"))] pub(crate) fn last_modified(&self) -> Option { self.entry_info.last_modified() } + + #[cfg(any(feature = "sync", feature = "future"))] + pub(crate) fn is_dirty(&self) -> bool { + self.entry_info.is_dirty() + } } pub(crate) struct KeyHashDate { diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs index a2adb903..eb4ea84f 100644 --- a/src/dash/base_cache.rs +++ b/src/dash/base_cache.rs @@ -1059,6 +1059,7 @@ where for _ in 0..batch_size { // Peek the front node of the deque and check if it is expired. let key = deq.peek_front().and_then(|node| { + // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example. if is_expired_entry_ao(tti, va, &*node, now) { Some(Arc::clone(node.element.key())) } else { @@ -1131,6 +1132,7 @@ where let va = &self.valid_after(); for _ in 0..batch_size { let key = deqs.write_order.peek_front().and_then(|node| { + // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example. if is_expired_entry_wo(ttl, va, &*node, now) { Some(Arc::clone(node.element.key())) } else { @@ -1188,15 +1190,20 @@ where } let maybe_key_and_ts = deq.peek_front().map(|node| { + let entry_info = node.element.entry_info(); ( Arc::clone(node.element.key()), - node.element.entry_info().last_modified(), + entry_info.is_dirty(), + entry_info.last_modified(), ) }); let (key, ts) = match maybe_key_and_ts { - Some((key, Some(ts))) => (key, ts), - Some((key, None)) => { + Some((key, false, Some(ts))) => (key, ts), + // TODO: Remove the second pattern `Some((_key, false, None))` once we change + // `last_modified` and `last_accessed` in `EntryInfo` from `Option` to + // `Instant`. + Some((key, true, _)) | Some((key, false, None)) => { if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) { continue; } else { diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs index 9ac3dd4b..6282150a 100644 --- a/src/sync_base/base_cache.rs +++ b/src/sync_base/base_cache.rs @@ -1242,6 +1242,7 @@ where for _ in 0..batch_size { // Peek the front node of the deque and check if it is expired. let key_hash = deq.peek_front().and_then(|node| { + // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example. if is_expired_entry_ao(tti, va, &*node, now) { Some((Arc::clone(node.element.key()), node.element.hash())) } else { @@ -1315,6 +1316,7 @@ where let va = &self.valid_after(); for _ in 0..batch_size { let key = deqs.write_order.peek_front().and_then(|node| { + // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example. 
if is_expired_entry_wo(ttl, va, &*node, now) { Some(Arc::clone(node.element.key())) } else { @@ -1408,11 +1410,13 @@ where while len < batch_size { if let Some(kd) = iter.next() { - if let Some(ts) = kd.last_modified() { - let key = kd.key(); - let hash = self.hash(key); - candidates.push(KeyDateLite::new(key, hash, ts)); - len += 1; + if !kd.is_dirty() { + if let Some(ts) = kd.last_modified() { + let key = kd.key(); + let hash = self.hash(key); + candidates.push(KeyDateLite::new(key, hash, ts)); + len += 1; + } } } else { break; @@ -1442,16 +1446,21 @@ where } let maybe_key_hash_ts = deq.peek_front().map(|node| { + let entry_info = node.element.entry_info(); ( Arc::clone(node.element.key()), node.element.hash(), - node.element.entry_info().last_modified(), + entry_info.is_dirty(), + entry_info.last_modified(), ) }); let (key, hash, ts) = match maybe_key_hash_ts { - Some((key, hash, Some(ts))) => (key, hash, ts), - Some((key, hash, None)) => { + Some((key, hash, false, Some(ts))) => (key, hash, ts), + // TODO: Remove the second pattern `Some((_key, false, None))` once we change + // `last_modified` and `last_accessed` in `EntryInfo` from `Option` to + // `Instant`. + Some((key, hash, true, _)) | Some((key, hash, false, None)) => { if self.try_skip_updated_entry(&key, hash, DEQ_NAME, deq, write_order_deq) { continue; } else { From 029e1686ac8189e8443af10945b2e8508b968d2e Mon Sep 17 00:00:00 2001 From: Tatsuya Kawano Date: Sun, 26 Jun 2022 13:26:00 +0000 Subject: [PATCH 11/11] Update entry's timestamps immediately after insertion Fix compile errors with `KeyDate::is_dirty` method. --- src/common/concurrent.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs index 49ad8911..e1001818 100644 --- a/src/common/concurrent.rs +++ b/src/common/concurrent.rs @@ -77,11 +77,12 @@ impl KeyDate { &self.key } + // #[cfg(any(feature = "sync", feature = "future"))] pub(crate) fn last_modified(&self) -> Option { self.entry_info.last_modified() } - #[cfg(any(feature = "sync", feature = "future"))] + // #[cfg(any(feature = "sync", feature = "future"))] pub(crate) fn is_dirty(&self) -> bool { self.entry_info.is_dirty() }
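
Taken together, the series sets an entry's last_accessed/last_modified timestamps and its `is_dirty` flag at insertion time, so `invalidate_all` (and `invalidate_entries_if`) can see entries inserted immediately before the call, with no housekeeper run or `sync()` in between. Below is a minimal usage sketch of that user-visible behavior, the `sync::Cache` counterpart of the `invalidate_all_without_sync` test added for `future::Cache` above (both sit on the patched `sync_base::BaseCache`); the capacity value and the `u32` key/value types are illustrative assumptions, not part of the patch:

    use moka::sync::Cache;

    fn main() {
        let cache: Cache<u32, u32> = Cache::new(1024);

        // The entry gets its timestamps (and the is_dirty flag) when it is
        // inserted into the hash table, before the housekeeper has run.
        cache.insert(0, 1);
        assert_eq!(cache.get(&0), Some(1));

        // With this fix, no `sync()` is needed between the insert and the
        // invalidation for the just-inserted entry to be invalidated.
        cache.invalidate_all();
        assert_eq!(cache.get(&0), None);
    }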