Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executor cache count primer (backport #22333) #22374

Merged
merged 1 commit into from Jan 8, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
157 changes: 129 additions & 28 deletions runtime/src/bank.rs
Expand Up @@ -69,6 +69,7 @@ use {
dashmap::DashMap,
itertools::Itertools,
log::*,
rand::Rng,
rayon::{
iter::{IntoParallelIterator, IntoParallelRefIterator, ParallelIterator},
ThreadPool, ThreadPoolBuilder,
Expand Down Expand Up @@ -132,7 +133,7 @@ use {
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
fmt, mem,
ops::RangeInclusive,
ops::{Div, RangeInclusive},
path::PathBuf,
ptr,
rc::Rc,
Expand Down Expand Up @@ -360,7 +361,7 @@ struct CachedExecutorsEntry {
struct CachedExecutors {
max: usize,
current_epoch: Epoch,
executors: HashMap<Pubkey, CachedExecutorsEntry>,
pub(self) executors: HashMap<Pubkey, CachedExecutorsEntry>,
stats: executor_cache::Stats,
}
impl Default for CachedExecutors {
Expand Down Expand Up @@ -449,29 +450,32 @@ impl CachedExecutors {
entry
} else {
saturating_add_assign!(self.stats.misses, 1);
if self.executors.len() >= self.max {
let mut least = u64::MAX;
let default_key = Pubkey::default();
let mut least_key = &default_key;

for (key, entry) in self.executors.iter() {
let mut counts = self
.executors
.iter()
.map(|(key, entry)| {
let count = entry.prev_epoch_count + entry.epoch_count.load(Relaxed);
if count < least {
least = count;
least_key = key;
}
(key, count)
})
.collect::<Vec<_>>();
counts.sort_unstable_by_key(|(_, count)| *count);

let primer_count = Self::get_primer_count(counts.as_slice());

if self.executors.len() >= self.max {
if let Some(least_key) = counts.first().map(|least| *least.0) {
let _ = self.executors.remove(&least_key);
self.stats
.evictions
.entry(least_key)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
}
let least_key = *least_key;
let _ = self.executors.remove(&least_key);
self.stats
.evictions
.entry(least_key)
.and_modify(|c| saturating_add_assign!(*c, 1))
.or_insert(1);
}

CachedExecutorsEntry {
prev_epoch_count: 0,
epoch_count: AtomicU64::new(0),
epoch_count: AtomicU64::new(primer_count),
executor,
}
};
Expand All @@ -481,6 +485,38 @@ impl CachedExecutors {
fn remove(&mut self, pubkey: &Pubkey) {
let _ = self.executors.remove(pubkey);
}

fn get_primer_count_upper_bound_inclusive(counts: &[(&Pubkey, u64)]) -> u64 {
const PRIMER_COUNT_TARGET_PERCENTILE: u64 = 85;
#[allow(clippy::assertions_on_constants)]
{
assert!(PRIMER_COUNT_TARGET_PERCENTILE <= 100);
}
// Executor use-frequencies are assumed to fit a Pareto distribution. Choose an
// upper-bound for our primer count as the actual count at the target rank to avoid
// an upward bias

let target_index = u64::try_from(counts.len().saturating_sub(1))
.ok()
.and_then(|counts| {
let index = counts
.saturating_mul(PRIMER_COUNT_TARGET_PERCENTILE)
.div(100); // switch to u64::saturating_div once stable
usize::try_from(index).ok()
})
.unwrap_or(0);

counts
.get(target_index)
.map(|(_, count)| *count)
.unwrap_or(0)
}

fn get_primer_count(counts: &[(&Pubkey, u64)]) -> u64 {
let max_primer_count = Self::get_primer_count_upper_bound_inclusive(counts);
let mut rng = rand::thread_rng();
rng.gen_range(0, max_primer_count.saturating_add(1))
}
}

pub struct TxComputeMeter {
Expand Down Expand Up @@ -11899,19 +11935,25 @@ pub(crate) mod tests {
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
cache.put(&key4, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_some());
assert!(cache.get(&key3).is_none());
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key3]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 2);

assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
assert!(cache.get(&key4).is_some());
cache.put(&key3, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key2).is_none());
assert!(cache.get(&key3).is_some());
assert!(cache.get(&key4).is_some());
let num_retained = [&key1, &key2, &key4]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 2);
}

#[test]
Expand Down Expand Up @@ -11940,12 +11982,23 @@ pub(crate) mod tests {
Arc::make_mut(&mut cache).put(&key4, executor.clone());

assert!(cache.get(&key4).is_some());
assert!(cache.get(&key3).is_none());
let num_retained = [&key1, &key2, &key3]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 2);

Arc::make_mut(&mut cache).put(&key1, executor.clone());
Arc::make_mut(&mut cache).put(&key3, executor.clone());
assert!(cache.get(&key1).is_some());
assert!(cache.get(&key4).is_none());
assert!(cache.get(&key3).is_some());
let num_retained = [&key2, &key4]
.iter()
.map(|key| cache.get(key))
.flatten()
.count();
assert_eq!(num_retained, 1);

cache = cache.clone_with_epoch(2);
assert!(cache.current_epoch == 2);
Expand All @@ -11954,6 +12007,35 @@ pub(crate) mod tests {
assert!(cache.get(&key3).is_some());
}

#[test]
fn test_cached_executors_evicts_smallest() {
let key1 = solana_sdk::pubkey::new_rand();
let key2 = solana_sdk::pubkey::new_rand();
let key3 = solana_sdk::pubkey::new_rand();
let executor: Arc<dyn Executor> = Arc::new(TestExecutor {});
let mut cache = CachedExecutors::new(2, 0);

cache.put(&key1, executor.clone());
for _ in 0..5 {
let _ = cache.get(&key1);
}
cache.put(&key2, executor.clone());
// make key1's use-count for sure greater than key2's
let _ = cache.get(&key1);

let mut entries = cache
.executors
.iter()
.map(|(k, v)| (*k, v.epoch_count.load(Relaxed)))
.collect::<Vec<_>>();
entries.sort_by_key(|(_, v)| *v);
assert!(entries[0].1 < entries[1].1);

cache.put(&key3, executor.clone());
assert!(cache.get(&entries[0].0).is_none());
assert!(cache.get(&entries[1].0).is_some());
}

#[test]
fn test_bank_executor_cache() {
solana_logger::setup();
Expand Down Expand Up @@ -14173,4 +14255,23 @@ pub(crate) mod tests {
Some(Vec::<TransactionLogInfo>::new()),
);
}
#[test]
fn test_executor_cache_get_primer_count_upper_bound_inclusive() {
let pubkey = Pubkey::default();
let v = [];
assert_eq!(
CachedExecutors::get_primer_count_upper_bound_inclusive(&v),
0
);
let v = [(&pubkey, 1)];
assert_eq!(
CachedExecutors::get_primer_count_upper_bound_inclusive(&v),
1
);
let v = (0u64..10).map(|i| (&pubkey, i)).collect::<Vec<_>>();
assert_eq!(
CachedExecutors::get_primer_count_upper_bound_inclusive(v.as_slice()),
7
);
}
}