Skip to content

Commit

Permalink
Merge pull request #156 from moka-rs/update-timestamps-immediately
Browse files Browse the repository at this point in the history
Set entry's timestamps immediately after insertion
  • Loading branch information
tatsuya6502 committed Jun 27, 2022
2 parents 59542ca + 029e168 commit 8f61b35
Show file tree
Hide file tree
Showing 11 changed files with 188 additions and 62 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,18 @@
# Moka Cache — Change Log

## Version 0.8.6

### Fixed

- Fix a bug caused `invalidate_all` and `invalidate_entries_if` of the following
caches will not invalidate entries inserted just before calling them
([#155][gh-issue-0155]):
- `sync::Cache`
- `sync::SegmentedCache`
- `future::Cache`
- Experimental `dash::Cache`


## Version 0.8.5

### Added
Expand Down Expand Up @@ -368,6 +381,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).
[panic_in_quanta]: https://github.com/moka-rs/moka#integer-overflow-in-quanta-crate-on-some-x86_64-machines
[resolving-error-on-32bit]: https://github.com/moka-rs/moka#compile-errors-on-some-32-bit-platforms

[gh-issue-0155]: https://github.com/moka-rs/moka/issues/155/
[gh-issue-0123]: https://github.com/moka-rs/moka/issues/123/
[gh-issue-0119]: https://github.com/moka-rs/moka/issues/119/
[gh-issue-0107]: https://github.com/moka-rs/moka/issues/107/
Expand Down
21 changes: 15 additions & 6 deletions src/common/concurrent.rs
Expand Up @@ -81,6 +81,11 @@ impl<K> KeyDate<K> {
pub(crate) fn last_modified(&self) -> Option<Instant> {
self.entry_info.last_modified()
}

// #[cfg(any(feature = "sync", feature = "future"))]
pub(crate) fn is_dirty(&self) -> bool {
self.entry_info.is_dirty()
}
}

pub(crate) struct KeyHashDate<K> {
Expand Down Expand Up @@ -212,10 +217,6 @@ impl<K, V> ValueEntry<K, V> {
write_order_q_node: other_nodes.write_order_q_node,
}
};
// To prevent this updated ValueEntry from being evicted by an expiration policy,
// set the max value to the timestamps. They will be replaced with the real
// timestamps when applying writes.
entry_info.reset_timestamps();
Self {
value,
info: entry_info,
Expand All @@ -231,8 +232,16 @@ impl<K, V> ValueEntry<K, V> {
self.info.is_admitted()
}

pub(crate) fn set_is_admitted(&self, value: bool) {
self.info.set_is_admitted(value);
pub(crate) fn set_admitted(&self, value: bool) {
self.info.set_admitted(value);
}

pub(crate) fn is_dirty(&self) -> bool {
self.info.is_dirty()
}

pub(crate) fn set_dirty(&self, value: bool) {
self.info.set_dirty(value);
}

#[inline]
Expand Down
6 changes: 4 additions & 2 deletions src/common/concurrent/atomic_time/atomic_time.rs
Expand Up @@ -21,8 +21,10 @@ impl Default for AtomicInstant {
// quanta v0.10.0 no longer provides `quanta::Instant::as_u64` method.

impl AtomicInstant {
pub(crate) fn reset(&self) {
self.instant.store(std::u64::MAX, Ordering::Release);
pub(crate) fn new(timestamp: Instant) -> Self {
let ai = Self::default();
ai.set_instant(timestamp);
ai
}

pub(crate) fn is_set(&self) -> bool {
Expand Down
6 changes: 4 additions & 2 deletions src/common/concurrent/atomic_time/atomic_time_compat.rs
Expand Up @@ -15,8 +15,10 @@ impl Default for AtomicInstant {
}

impl AtomicInstant {
pub(crate) fn reset(&self) {
*self.instant.write() = None;
pub(crate) fn new(timestamp: Instant) -> Self {
let ai = Self::default();
ai.set_instant(timestamp);
ai
}

pub(crate) fn is_set(&self) -> bool {
Expand Down
58 changes: 51 additions & 7 deletions src/common/concurrent/entry_info.rs
Expand Up @@ -4,22 +4,30 @@ use super::AccessTime;
use crate::common::{concurrent::atomic_time::AtomicInstant, time::Instant};

pub(crate) struct EntryInfo {
/// `is_admitted` indicates that the entry has been admitted to the
/// cache. When `false`, it means the entry is _temporary_ admitted to
/// the cache or evicted from the cache (so it should not have LRU nodes).
is_admitted: AtomicBool,
/// `is_dirty` indicates that the entry has been inserted (or updated)
/// in the hash table, but the history of the insertion has not yet
/// been applied to the LRU deques and LFU estimator.
is_dirty: AtomicBool,
last_accessed: AtomicInstant,
last_modified: AtomicInstant,
policy_weight: AtomicU32,
}

impl EntryInfo {
#[inline]
pub(crate) fn new(policy_weight: u32) -> Self {
pub(crate) fn new(timestamp: Instant, policy_weight: u32) -> Self {
#[cfg(feature = "unstable-debug-counters")]
super::debug_counters::InternalGlobalDebugCounters::entry_info_created();

Self {
is_admitted: Default::default(),
last_accessed: Default::default(),
last_modified: Default::default(),
is_dirty: AtomicBool::new(true),
last_accessed: AtomicInstant::new(timestamp),
last_modified: AtomicInstant::new(timestamp),
policy_weight: AtomicU32::new(policy_weight),
}
}
Expand All @@ -30,14 +38,18 @@ impl EntryInfo {
}

#[inline]
pub(crate) fn set_is_admitted(&self, value: bool) {
pub(crate) fn set_admitted(&self, value: bool) {
self.is_admitted.store(value, Ordering::Release);
}

#[inline]
pub(crate) fn reset_timestamps(&self) {
self.last_accessed.reset();
self.last_modified.reset();
pub(crate) fn is_dirty(&self) -> bool {
self.is_dirty.load(Ordering::Acquire)
}

#[inline]
pub(crate) fn set_dirty(&self, value: bool) {
self.is_dirty.store(value, Ordering::Release);
}

#[inline]
Expand Down Expand Up @@ -78,3 +90,35 @@ impl AccessTime for EntryInfo {
self.last_modified.set_instant(timestamp);
}
}

#[cfg(test)]
mod test {
use super::EntryInfo;

// Ignore this test by default as struct size may change in the future.
// #[ignore]
#[test]
fn check_struct_size() {
use std::mem::size_of;

// As of Rust 1.61.
let size = if cfg!(target_pointer_width = "64") {
if cfg!(feature = "quanta") {
24
} else {
72
}
} else if cfg!(target_pointer_width = "32") {
if cfg!(feature = "quanta") {
24
} else {
40
}
} else {
// ignore
return;
};

assert_eq!(size_of::<EntryInfo>(), size);
}
}
50 changes: 32 additions & 18 deletions src/dash/base_cache.rs
Expand Up @@ -258,6 +258,7 @@ where

#[inline]
pub(crate) fn do_insert_with_hash(&self, key: Arc<K>, hash: u64, value: V) -> WriteOp<K, V> {
let ts = self.inner.current_time_from_expiration_clock();
let weight = self.inner.weigh(&key, &value);
let mut insert_op = None;
let mut update_op = None;
Expand All @@ -273,7 +274,7 @@ where
// prevent this new ValueEntry from being evicted by an expiration policy.
// 3. This method will update the policy_weight with the new weight.
let old_weight = entry.policy_weight();
*entry = self.new_value_entry_from(value.clone(), weight, entry);
*entry = self.new_value_entry_from(value.clone(), ts, weight, entry);
update_op = Some(WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(entry),
Expand All @@ -283,7 +284,7 @@ where
})
// Insert
.or_insert_with(|| {
let entry = self.new_value_entry(value.clone(), weight);
let entry = self.new_value_entry(value.clone(), ts, weight);
insert_op = Some(WriteOp::Upsert {
key_hash: KeyHash::new(Arc::clone(&key), hash),
value_entry: TrioArc::clone(&entry),
Expand All @@ -301,19 +302,30 @@ where
}

#[inline]
fn new_value_entry(&self, value: V, policy_weight: u32) -> TrioArc<ValueEntry<K, V>> {
let info = TrioArc::new(EntryInfo::new(policy_weight));
fn new_value_entry(
&self,
value: V,
timestamp: Instant,
policy_weight: u32,
) -> TrioArc<ValueEntry<K, V>> {
let info = TrioArc::new(EntryInfo::new(timestamp, policy_weight));
TrioArc::new(ValueEntry::new(value, info))
}

#[inline]
fn new_value_entry_from(
&self,
value: V,
timestamp: Instant,
policy_weight: u32,
other: &ValueEntry<K, V>,
) -> TrioArc<ValueEntry<K, V>> {
let info = TrioArc::clone(other.entry_info());
// To prevent this updated ValueEntry from being evicted by an expiration policy,
// set the dirty flag to true. It will be reset to false when the write is applied.
info.set_dirty(true);
info.set_last_accessed(timestamp);
info.set_last_modified(timestamp);
info.set_policy_weight(policy_weight);
TrioArc::new(ValueEntry::new_from(value, info, other))
}
Expand Down Expand Up @@ -773,7 +785,6 @@ where
use WriteOp::*;
let freq = self.frequency_sketch.read();
let ch = &self.write_op_ch;
let ts = self.current_time_from_expiration_clock();

for _ in 0..count {
match ch.try_recv() {
Expand All @@ -782,9 +793,7 @@ where
value_entry: entry,
old_weight,
new_weight,
}) => {
self.handle_upsert(kh, entry, old_weight, new_weight, ts, deqs, &freq, counters)
}
}) => self.handle_upsert(kh, entry, old_weight, new_weight, deqs, &freq, counters),
Ok(Remove(KvEntry { key: _key, entry })) => {
Self::handle_remove(deqs, entry, counters)
}
Expand All @@ -800,13 +809,11 @@ where
entry: TrioArc<ValueEntry<K, V>>,
old_weight: u32,
new_weight: u32,
timestamp: Instant,
deqs: &mut Deques<K>,
freq: &FrequencySketch,
counters: &mut EvictionCounters,
) {
entry.set_last_accessed(timestamp);
entry.set_last_modified(timestamp);
entry.set_dirty(false);

if entry.is_admitted() {
// The entry has been already admitted, so treat this as an update.
Expand Down Expand Up @@ -971,7 +978,7 @@ where
if self.is_write_order_queue_enabled() {
deqs.push_back_wo(KeyDate::new(key, entry.entry_info()), entry);
}
entry.set_is_admitted(true);
entry.set_admitted(true);
}

fn handle_remove(
Expand All @@ -980,7 +987,7 @@ where
counters: &mut EvictionCounters,
) {
if entry.is_admitted() {
entry.set_is_admitted(false);
entry.set_admitted(false);
counters.saturating_sub(1, entry.policy_weight());
// The following two unlink_* functions will unset the deq nodes.
deqs.unlink_ao(&entry);
Expand All @@ -998,7 +1005,7 @@ where
counters: &mut EvictionCounters,
) {
if entry.is_admitted() {
entry.set_is_admitted(false);
entry.set_admitted(false);
counters.saturating_sub(1, entry.policy_weight());
// The following two unlink_* functions will unset the deq nodes.
Deques::unlink_ao_from_deque(ao_deq_name, ao_deq, &entry);
Expand Down Expand Up @@ -1052,6 +1059,7 @@ where
for _ in 0..batch_size {
// Peek the front node of the deque and check if it is expired.
let key = deq.peek_front().and_then(|node| {
// TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
if is_expired_entry_ao(tti, va, &*node, now) {
Some(Arc::clone(node.element.key()))
} else {
Expand Down Expand Up @@ -1090,7 +1098,7 @@ where
write_order_deq: &mut Deque<KeyDate<K>>,
) -> bool {
if let Some(entry) = self.cache.get(key) {
if entry.last_accessed().is_none() {
if entry.is_dirty() {
// The key exists and the entry has been updated.
Deques::move_to_back_ao_in_deque(deq_name, deq, &entry);
Deques::move_to_back_wo_in_deque(write_order_deq, &entry);
Expand Down Expand Up @@ -1124,6 +1132,7 @@ where
let va = &self.valid_after();
for _ in 0..batch_size {
let key = deqs.write_order.peek_front().and_then(|node| {
// TODO: Skip the entry if it is dirty. See `evict_lru_entries` method as an example.
if is_expired_entry_wo(ttl, va, &*node, now) {
Some(Arc::clone(node.element.key()))
} else {
Expand Down Expand Up @@ -1181,15 +1190,20 @@ where
}

let maybe_key_and_ts = deq.peek_front().map(|node| {
let entry_info = node.element.entry_info();
(
Arc::clone(node.element.key()),
node.element.entry_info().last_modified(),
entry_info.is_dirty(),
entry_info.last_modified(),
)
});

let (key, ts) = match maybe_key_and_ts {
Some((key, Some(ts))) => (key, ts),
Some((key, None)) => {
Some((key, false, Some(ts))) => (key, ts),
// TODO: Remove the second pattern `Some((_key, false, None))` once we change
// `last_modified` and `last_accessed` in `EntryInfo` from `Option<Instant>` to
// `Instant`.
Some((key, true, _)) | Some((key, false, None)) => {
if self.try_skip_updated_entry(&key, DEQ_NAME, deq, write_order_deq) {
continue;
} else {
Expand Down
5 changes: 4 additions & 1 deletion src/dash/cache.rs
Expand Up @@ -818,7 +818,10 @@ mod tests {
assert!(cache.contains_key(&"a"));
assert!(cache.contains_key(&"b"));
assert!(cache.contains_key(&"c"));
cache.sync();

// `cache.sync()` is no longer needed here before invalidating. The last
// modified timestamp of the entries were updated when they were inserted.
// https://github.com/moka-rs/moka/issues/155

cache.invalidate_all();
cache.sync();
Expand Down

0 comments on commit 8f61b35

Please sign in to comment.