diff --git a/CHANGELOG.md b/CHANGELOG.md
index 38d892bd..8b4dc2ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,18 @@
 # Moka Cache — Change Log
 
+## Version 0.8.6
+
+### Fixed
+
+- Fix a bug that caused `invalidate_all` and `invalidate_entries_if` of the
+  following caches to not invalidate entries inserted just before calling them
+  ([#155][gh-issue-0155]):
+    - `sync::Cache`
+    - `sync::SegmentedCache`
+    - `future::Cache`
+    - Experimental `dash::Cache`
+
+
 ## Version 0.8.5
 
 ### Added
@@ -368,6 +381,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
 
 [panic_in_quanta]: https://github.com/moka-rs/moka#integer-overflow-in-quanta-crate-on-some-x86_64-machines
 [resolving-error-on-32bit]: https://github.com/moka-rs/moka#compile-errors-on-some-32-bit-platforms
+[gh-issue-0155]: https://github.com/moka-rs/moka/issues/155/
 [gh-issue-0123]: https://github.com/moka-rs/moka/issues/123/
 [gh-issue-0119]: https://github.com/moka-rs/moka/issues/119/
 [gh-issue-0107]: https://github.com/moka-rs/moka/issues/107/
diff --git a/src/common/concurrent.rs b/src/common/concurrent.rs
index 2deefc8e..e1001818 100644
--- a/src/common/concurrent.rs
+++ b/src/common/concurrent.rs
@@ -81,6 +81,11 @@ impl<K> KeyDate<K> {
     pub(crate) fn last_modified(&self) -> Option<Instant> {
         self.entry_info.last_modified()
     }
+
+    // #[cfg(any(feature = "sync", feature = "future"))]
+    pub(crate) fn is_dirty(&self) -> bool {
+        self.entry_info.is_dirty()
+    }
 }
 
 pub(crate) struct KeyHashDate<K> {
@@ -212,10 +217,6 @@ impl<K, V> ValueEntry<K, V> {
                 write_order_q_node: other_nodes.write_order_q_node,
             }
         };
-        // To prevent this updated ValueEntry from being evicted by an expiration policy,
-        // set the max value to the timestamps. They will be replaced with the real
-        // timestamps when applying writes.
-        entry_info.reset_timestamps();
         Self {
             value,
             info: entry_info,
@@ -231,8 +232,16 @@ impl<K, V> ValueEntry<K, V> {
         self.info.is_admitted()
     }
 
-    pub(crate) fn set_is_admitted(&self, value: bool) {
-        self.info.set_is_admitted(value);
+    pub(crate) fn set_admitted(&self, value: bool) {
+        self.info.set_admitted(value);
+    }
+
+    pub(crate) fn is_dirty(&self) -> bool {
+        self.info.is_dirty()
+    }
+
+    pub(crate) fn set_dirty(&self, value: bool) {
+        self.info.set_dirty(value);
     }
 
     #[inline]
diff --git a/src/common/concurrent/atomic_time/atomic_time.rs b/src/common/concurrent/atomic_time/atomic_time.rs
index e6057573..45a85c2f 100644
--- a/src/common/concurrent/atomic_time/atomic_time.rs
+++ b/src/common/concurrent/atomic_time/atomic_time.rs
@@ -21,8 +21,10 @@ impl Default for AtomicInstant {
 
 // quanta v0.10.0 no longer provides `quanta::Instant::as_u64` method.
 impl AtomicInstant {
-    pub(crate) fn reset(&self) {
-        self.instant.store(std::u64::MAX, Ordering::Release);
+    pub(crate) fn new(timestamp: Instant) -> Self {
+        let ai = Self::default();
+        ai.set_instant(timestamp);
+        ai
     }
 
     pub(crate) fn is_set(&self) -> bool {
diff --git a/src/common/concurrent/atomic_time/atomic_time_compat.rs b/src/common/concurrent/atomic_time/atomic_time_compat.rs
index 880eff6e..2a65cc46 100644
--- a/src/common/concurrent/atomic_time/atomic_time_compat.rs
+++ b/src/common/concurrent/atomic_time/atomic_time_compat.rs
@@ -15,8 +15,10 @@ impl Default for AtomicInstant {
 }
 
 impl AtomicInstant {
-    pub(crate) fn reset(&self) {
-        *self.instant.write() = None;
+    pub(crate) fn new(timestamp: Instant) -> Self {
+        let ai = Self::default();
+        ai.set_instant(timestamp);
+        ai
     }
 
     pub(crate) fn is_set(&self) -> bool {
diff --git a/src/common/concurrent/entry_info.rs b/src/common/concurrent/entry_info.rs
index 25f6208e..6fe0a54d 100644
--- a/src/common/concurrent/entry_info.rs
+++ b/src/common/concurrent/entry_info.rs
@@ -4,7 +4,14 @@ use super::AccessTime;
 use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};
 
 pub(crate) struct EntryInfo {
+    /// `is_admitted` indicates that the entry has been admitted to the
+    /// cache. When `false`, it means the entry is _temporarily_ admitted to
+    /// the cache or has been evicted from it (so it should not have LRU nodes).
     is_admitted: AtomicBool,
+    /// `is_dirty` indicates that the entry has been inserted (or updated)
+    /// in the hash table, but the history of the insertion has not yet
+    /// been applied to the LRU deques and LFU estimator.
+    is_dirty: AtomicBool,
     last_accessed: AtomicInstant,
     last_modified: AtomicInstant,
     policy_weight: AtomicU32,
@@ -12,14 +19,15 @@
 
 impl EntryInfo {
     #[inline]
-    pub(crate) fn new(policy_weight: u32) -> Self {
+    pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self {
         #[cfg(feature = "unstable-debug-counters")]
         super::debug_counters::InternalGlobalDebugCounters::entry_info_created();
 
         Self {
             is_admitted: Default::default(),
-            last_accessed: Default::default(),
-            last_modified: Default::default(),
+            is_dirty: AtomicBool::new(true),
+            last_accessed: AtomicInstant::new(timestamp),
+            last_modified: AtomicInstant::new(timestamp),
             policy_weight: AtomicU32::new(policy_weight),
         }
     }
@@ -30,14 +38,18 @@
     }
 
     #[inline]
-    pub(crate) fn set_is_admitted(&self, value: bool) {
+    pub(crate) fn set_admitted(&self, value: bool) {
         self.is_admitted.store(value, Ordering::Release);
     }
 
     #[inline]
-    pub(crate) fn reset_timestamps(&self) {
-        self.last_accessed.reset();
-        self.last_modified.reset();
+    pub(crate) fn is_dirty(&self) -> bool {
+        self.is_dirty.load(Ordering::Acquire)
+    }
+
+    #[inline]
+    pub(crate) fn set_dirty(&self, value: bool) {
+        self.is_dirty.store(value, Ordering::Release);
     }
 
     #[inline]
@@ -78,3 +90,35 @@ impl AccessTime for EntryInfo {
         self.last_modified.set_instant(timestamp);
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::EntryInfo;
+
+    // Ignore this test by default as struct size may change in the future.
+    // #[ignore]
+    #[test]
+    fn check_struct_size() {
+        use std::mem::size_of;
+
+        // As of Rust 1.61.
+        let size = if cfg!(target_pointer_width = "64") {
+            if cfg!(feature = "quanta") {
+                24
+            } else {
+                72
+            }
+        } else if cfg!(target_pointer_width = "32") {
+            if cfg!(feature = "quanta") {
+                24
+            } else {
+                40
+            }
+        } else {
+            // ignore
+            return;
+        };
+
+        assert_eq!(size_of::<EntryInfo>(), size);
+    }
+}
diff --git a/src/dash/base_cache.rs b/src/dash/base_cache.rs
index 3aa47eb7..eb4ea84f 100644
--- a/src/dash/base_cache.rs
+++ b/src/dash/base_cache.rs
@@ -258,6 +258,7 @@ where
     #[inline]
     pub(crate) fn do_insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) -> WriteOp<K, V> {
+        let ts = self.inner.current_time_from_expiration_clock();
         let weight = self.inner.weigh(&key, &value);
         let mut insert_op = None;
         let mut update_op = None;
@@ -273,7 +274,7 @@
                 // prevent this new ValueEntry from being evicted by an expiration policy.
                 // 3. This method will update the policy_weight with the new weight.
                 let old_weight = entry.policy_weight();
-                *entry = self.new_value_entry_from(value.clone(), weight, entry);
+                *entry = self.new_value_entry_from(value.clone(), ts, weight, entry);
                 update_op = Some(WriteOp::Upsert {
                     key_hash: KeyHash::new(Arc::clone(&key), hash),
                     value_entry: TrioArc::clone(entry),
@@ -283,7 +284,7 @@
             })
             // Insert
            .or_insert_with(|| {
-                let entry = self.new_value_entry(value.clone(), weight);
+                let entry = self.new_value_entry(value.clone(), ts, weight);
                 insert_op = Some(WriteOp::Upsert {
                     key_hash: KeyHash::new(Arc::clone(&key), hash),
                     value_entry: TrioArc::clone(&entry),
@@ -301,8 +302,13 @@
     }
 
     #[inline]
-    fn new_value_entry(&self, value: V, policy_weight: u32) -> TrioArc<ValueEntry<K, V>> {
-        let info = TrioArc::new(EntryInfo::new(policy_weight));
+    fn new_value_entry(
+        &self,
+        value: V,
+        timestamp: Instant,
+        policy_weight: u32,
+    ) -> TrioArc<ValueEntry<K, V>> {
+        let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight));
         TrioArc::new(ValueEntry::new(value, info))
     }
 
@@ -310,10 +316,16 @@
     fn new_value_entry_from(
         &self,
         value: V,
+        timestamp: Instant,
         policy_weight: u32,
         other: &ValueEntry<K, V>,
     ) -> TrioArc<ValueEntry<K, V>> {
         let info = TrioArc::clone(other.entry_info());
+        // To prevent this updated ValueEntry from being evicted by an expiration policy,
+        // set the dirty flag to true. It will be reset to false when the write is applied.
+        info.set_dirty(true);
+        info.set_last_accessed(timestamp);
+        info.set_last_modified(timestamp);
         info.set_policy_weight(policy_weight);
         TrioArc::new(ValueEntry::new_from(value, info, other))
     }
@@ -773,7 +785,6 @@
         use WriteOp::*;
         let freq = self.frequency_sketch.read();
         let ch = &self.write_op_ch;
-        let ts = self.current_time_from_expiration_clock();
 
         for _ in 0..count {
             match ch.try_recv() {
@@ -782,9 +793,7 @@
                     value_entry: entry,
                     old_weight,
                     new_weight,
-                }) => {
-                    self.handle_upsert(kh, entry, old_weight, new_weight, ts, deqs, &freq, counters)
-                }
+                }) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters),
                 Ok(Remove(KvEntry { key: _key, entry })) => {
                     Self::handle_remove(deqs, entry, counters)
                 }
@@ -800,13 +809,11 @@
        entry: TrioArc<ValueEntry<K, V>>,
        old_weight: u32,
        new_weight: u32,
-       timestamp: Instant,
        deqs: &mut Deques<K>,
        freq: &FrequencySketch,
        counters: &mut EvictionCounters,
    ) {
-        entry.set_last_accessed(timestamp);
-        entry.set_last_modified(timestamp);
+        entry.set_dirty(false);
 
         if entry.is_admitted() {
             // The entry has been already admitted, so treat this as an update.
@@ -971,7 +978,7 @@
         if self.is_write_order_queue_enabled() {
             deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry);
         }
-        entry.set_is_admitted(true);
+        entry.set_admitted(true);
     }
 
     fn handle_remove(
@@ -980,7 +987,7 @@
         counters: &mut EvictionCounters,
     ) {
         if entry.is_admitted() {
-            entry.set_is_admitted(false);
+            entry.set_admitted(false);
             counters.saturating_sub(1, entry.policy_weight());
             // The following two unlink_* functions will unset the deq nodes.
             deqs.unlink_ao(&entry);
@@ -998,7 +1005,7 @@
         counters: &mut EvictionCounters,
     ) {
         if entry.is_admitted() {
-            entry.set_is_admitted(false);
+            entry.set_admitted(false);
             counters.saturating_sub(1, entry.policy_weight());
             // The following two unlink_* functions will unset the deq nodes.
             Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
@@ -1052,6 +1059,7 @@
         for _ in 0..batch_size {
             // Peek the front node of the deque and check if it is expired.
             let key = deq.peek_front().and_then(|node| {
+                // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
                 if is_expired_entry_ao(tti, va, &*node, now) {
                     Some(Arc::clone(node.element.key()))
                 } else {
@@ -1090,7 +1098,7 @@
         write_order_deq: &mut Deque<KeyDate<K>>,
     ) -> bool {
         if let Some(entry) = self.cache.get(key) {
-            if entry.last_accessed().is_none() {
+            if entry.is_dirty() {
                 // The key exists and the entry has been updated.
                 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
                 Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
@@ -1124,6 +1132,7 @@
         let va = &self.valid_after();
         for _ in 0..batch_size {
             let key = deqs.write_order.peek_front().and_then(|node| {
+                // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
                 if is_expired_entry_wo(ttl, va, &*node, now) {
                     Some(Arc::clone(node.element.key()))
                 } else {
@@ -1181,15 +1190,20 @@
             }
 
             let maybe_key_and_ts = deq.peek_front().map(|node| {
+                let entry_info = node.element.entry_info();
                 (
                     Arc::clone(node.element.key()),
-                    node.element.entry_info().last_modified(),
+                    entry_info.is_dirty(),
+                    entry_info.last_modified(),
                 )
             });
 
             let (key, ts) = match maybe_key_and_ts {
-                Some((key, Some(ts))) => (key, ts),
-                Some((key, None)) => {
+                Some((key, false, Some(ts))) => (key, ts),
+                // TODO: Remove the second pattern `Some((_key, false, None))` once we change
+                // `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to
+                // `Instant`.
+                Some((key, true, _)) | Some((key, false, None)) => {
                     if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) {
                         continue;
                     } else {
diff --git a/src/dash/cache.rs b/src/dash/cache.rs
index 90c92bb2..2d7832f1 100644
--- a/src/dash/cache.rs
+++ b/src/dash/cache.rs
@@ -818,7 +818,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();
 
diff --git a/src/future/cache.rs b/src/future/cache.rs
index 46e65cd9..5be95c7e 100644
--- a/src/future/cache.rs
+++ b/src/future/cache.rs
@@ -1374,7 +1374,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();
 
@@ -1392,6 +1395,19 @@
         assert!(cache.contains_key(&"d"));
     }
 
+    // This test is for https://github.com/moka-rs/moka/issues/155
+    #[tokio::test]
+    async fn invalidate_all_without_sync() {
+        let cache = Cache::new(1024);
+
+        assert_eq!(cache.get(&0), None);
+        cache.insert(0, 1).await;
+        assert_eq!(cache.get(&0), Some(1));
+
+        cache.invalidate_all();
+        assert_eq!(cache.get(&0), None);
+    }
+
     #[tokio::test]
     async fn invalidate_entries_if() -> Result<(), Box<dyn std::error::Error>> {
         use std::collections::HashSet;
diff --git a/src/sync/cache.rs b/src/sync/cache.rs
index df8a81c4..2349c794 100644
--- a/src/sync/cache.rs
+++ b/src/sync/cache.rs
@@ -1194,7 +1194,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();
 
diff --git a/src/sync/segment.rs b/src/sync/segment.rs
index c4417754..7d07eef6 100644
--- a/src/sync/segment.rs
+++ b/src/sync/segment.rs
@@ -874,7 +874,10 @@ mod tests {
         assert!(cache.contains_key(&"a"));
         assert!(cache.contains_key(&"b"));
         assert!(cache.contains_key(&"c"));
-        cache.sync();
+
+        // `cache.sync()` is no longer needed here before invalidating. The last
+        // modified timestamps of the entries were updated when they were inserted.
+        // https://github.com/moka-rs/moka/issues/155
         cache.invalidate_all();
         cache.sync();
 
diff --git a/src/sync_base/base_cache.rs b/src/sync_base/base_cache.rs
index 1876fded..6282150a 100644
--- a/src/sync_base/base_cache.rs
+++ b/src/sync_base/base_cache.rs
@@ -299,6 +299,7 @@ where
     #[inline]
     pub(crate) fn do_insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) -> WriteOp<K, V> {
+        let ts = self.inner.current_time_from_expiration_clock();
         let weight = self.inner.weigh(&key, &value);
         let op_cnt1 = Rc::new(AtomicU8::new(0));
         let op_cnt2 = Rc::clone(&op_cnt1);
@@ -318,7 +319,7 @@
             hash,
             // on_insert
             || {
-                let entry = self.new_value_entry(value.clone(), weight);
+                let entry = self.new_value_entry(value.clone(), ts, weight);
                 let cnt = op_cnt1.fetch_add(1, Ordering::Relaxed);
                 op1 = Some((
                     cnt,
@@ -339,7 +340,7 @@
                 // prevent this new ValueEntry from being evicted by an expiration policy.
                 // 3. This method will update the policy_weight with the new weight.
                 let old_weight = old_entry.policy_weight();
-                let entry = self.new_value_entry_from(value.clone(), weight, old_entry);
+                let entry = self.new_value_entry_from(value.clone(), ts, weight, old_entry);
                 let cnt = op_cnt2.fetch_add(1, Ordering::Relaxed);
                 op2 = Some((
                     cnt,
@@ -374,8 +375,13 @@
     }
 
     #[inline]
-    fn new_value_entry(&self, value: V, policy_weight: u32) -> TrioArc<ValueEntry<K, V>> {
-        let info = TrioArc::new(EntryInfo::new(policy_weight));
+    fn new_value_entry(
+        &self,
+        value: V,
+        timestamp: Instant,
+        policy_weight: u32,
+    ) -> TrioArc<ValueEntry<K, V>> {
+        let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight));
         TrioArc::new(ValueEntry::new(value, info))
     }
 
@@ -383,10 +389,16 @@
     fn new_value_entry_from(
         &self,
         value: V,
+        timestamp: Instant,
         policy_weight: u32,
         other: &ValueEntry<K, V>,
     ) -> TrioArc<ValueEntry<K, V>> {
         let info = TrioArc::clone(other.entry_info());
+        // To prevent this updated ValueEntry from being evicted by an expiration policy,
+        // set the dirty flag to true. It will be reset to false when the write is applied.
+        info.set_dirty(true);
+        info.set_last_accessed(timestamp);
+        info.set_last_modified(timestamp);
         info.set_policy_weight(policy_weight);
         TrioArc::new(ValueEntry::new_from(value, info, other))
     }
@@ -954,7 +966,6 @@
         use WriteOp::*;
         let freq = self.frequency_sketch.read();
         let ch = &self.write_op_ch;
-        let ts = self.current_time_from_expiration_clock();
 
         for _ in 0..count {
             match ch.try_recv() {
@@ -963,9 +974,7 @@
                     value_entry: entry,
                     old_weight,
                     new_weight,
-                }) => {
-                    self.handle_upsert(kh, entry, old_weight, new_weight, ts, deqs, &freq, counters)
-                }
+                }) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters),
                 Ok(Remove(KvEntry { key: _key, entry })) => {
                     Self::handle_remove(deqs, entry, counters)
                 }
@@ -981,13 +990,11 @@
        entry: TrioArc<ValueEntry<K, V>>,
        old_weight: u32,
        new_weight: u32,
-       timestamp: Instant,
        deqs: &mut Deques<K>,
        freq: &FrequencySketch,
        counters: &mut EvictionCounters,
    ) {
-        entry.set_last_accessed(timestamp);
-        entry.set_last_modified(timestamp);
+        entry.set_dirty(false);
 
         if entry.is_admitted() {
             // The entry has been already admitted, so treat this as an update.
@@ -1154,7 +1161,7 @@
         if self.is_write_order_queue_enabled() {
             deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry);
         }
-        entry.set_is_admitted(true);
+        entry.set_admitted(true);
     }
 
     fn handle_remove(
@@ -1163,7 +1170,7 @@
         counters: &mut EvictionCounters,
     ) {
         if entry.is_admitted() {
-            entry.set_is_admitted(false);
+            entry.set_admitted(false);
             counters.saturating_sub(1, entry.policy_weight());
             // The following two unlink_* functions will unset the deq nodes.
             deqs.unlink_ao(&entry);
@@ -1181,7 +1188,7 @@
         counters: &mut EvictionCounters,
     ) {
         if entry.is_admitted() {
-            entry.set_is_admitted(false);
+            entry.set_admitted(false);
             counters.saturating_sub(1, entry.policy_weight());
             // The following two unlink_* functions will unset the deq nodes.
             Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
@@ -1235,6 +1242,7 @@
         for _ in 0..batch_size {
             // Peek the front node of the deque and check if it is expired.
             let key_hash = deq.peek_front().and_then(|node| {
+                // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
                 if is_expired_entry_ao(tti, va, &*node, now) {
                     Some((Arc::clone(node.element.key()), node.element.hash()))
                 } else {
@@ -1274,7 +1282,7 @@
         write_order_deq: &mut Deque<KeyDate<K>>,
     ) -> bool {
         if let Some(entry) = self.cache.get(key, hash) {
-            if entry.last_accessed().is_none() {
+            if entry.is_dirty() {
                 // The key exists and the entry has been updated.
                 Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
                 Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
@@ -1308,6 +1316,7 @@
         let va = &self.valid_after();
         for _ in 0..batch_size {
             let key = deqs.write_order.peek_front().and_then(|node| {
+                // TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
                 if is_expired_entry_wo(ttl, va, &*node, now) {
                     Some(Arc::clone(node.element.key()))
                 } else {
@@ -1401,11 +1410,13 @@
 
         while len < batch_size {
             if let Some(kd) = iter.next() {
-                if let Some(ts) = kd.last_modified() {
-                    let key = kd.key();
-                    let hash = self.hash(key);
-                    candidates.push(KeyDateLite::new(key, hash, ts));
-                    len += 1;
+                if !kd.is_dirty() {
+                    if let Some(ts) = kd.last_modified() {
+                        let key = kd.key();
+                        let hash = self.hash(key);
+                        candidates.push(KeyDateLite::new(key, hash, ts));
+                        len += 1;
+                    }
                 }
             } else {
                 break;
@@ -1435,16 +1446,21 @@
             }
 
             let maybe_key_hash_ts = deq.peek_front().map(|node| {
+                let entry_info = node.element.entry_info();
                 (
                     Arc::clone(node.element.key()),
                     node.element.hash(),
-                    node.element.entry_info().last_modified(),
+                    entry_info.is_dirty(),
+                    entry_info.last_modified(),
                 )
             });
 
             let (key, hash, ts) = match maybe_key_hash_ts {
-                Some((key, hash, Some(ts))) => (key, hash, ts),
-                Some((key, hash, None)) => {
+                Some((key, hash, false, Some(ts))) => (key, hash, ts),
+                // TODO: Remove the second pattern `Some((_key, false, None))` once we change
+                // `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to
+                // `Instant`.
+                Some((key, hash, true, _)) | Some((key, hash, false, None)) => {
                     if self.try_skip_updated_entry(&key, hash, DEQ_NAME, deq, write_order_deq) {
                         continue;
                     } else {
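
The following standalone sketch is not part of the diff above. It is a minimal illustration of the behavior fixed by #155, written against the public `moka::sync::Cache` API as of this release; the capacity `1024` and the `u32` key/value types are arbitrary choices for the example. With timestamps now set at insert time (and the entry marked dirty until the write is applied), `invalidate_all` also covers an entry inserted immediately before the call, without any intermediate flush of the internal write-op channel.

use moka::sync::Cache;

fn main() {
    // An arbitrary small capacity for this sketch.
    let cache: Cache<u32, u32> = Cache::new(1024);

    cache.insert(0, 1);
    assert_eq!(cache.get(&0), Some(1));

    // Before the fix, this entry could survive `invalidate_all` because its
    // last-modified timestamp was only set when the pending write was applied
    // to the internal deques. Now the invalidation takes effect immediately.
    cache.invalidate_all();
    assert_eq!(cache.get(&0), None);
}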