diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index e0d5cf7f8ccb59..647a3eeac2b68f 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -56,6 +56,7 @@ use {
         genesis_config::ClusterType,
         hash::Hash,
         pubkey::Pubkey,
+        saturating_add_assign,
         signature::{Keypair, Signature, Signer},
         timing::timestamp,
         transaction::Transaction,
@@ -162,6 +163,10 @@ pub struct ReplayTiming {
     process_duplicate_slots_elapsed: u64,
     process_unfrozen_gossip_verified_vote_hashes_elapsed: u64,
     repair_correct_slots_elapsed: u64,
+    generate_new_bank_forks_read_lock_us: u64,
+    generate_new_bank_forks_get_slots_since_us: u64,
+    generate_new_bank_forks_loop_us: u64,
+    generate_new_bank_forks_write_lock_us: u64,
 }
 impl ReplayTiming {
     #[allow(clippy::too_many_arguments)]
@@ -293,7 +298,27 @@ impl ReplayTiming {
                     "repair_correct_slots_elapsed",
                     self.repair_correct_slots_elapsed as i64,
                     i64
-                )
+                ),
+                (
+                    "generate_new_bank_forks_read_lock_us",
+                    self.generate_new_bank_forks_read_lock_us as i64,
+                    i64
+                ),
+                (
+                    "generate_new_bank_forks_get_slots_since_us",
+                    self.generate_new_bank_forks_get_slots_since_us as i64,
+                    i64
+                ),
+                (
+                    "generate_new_bank_forks_loop_us",
+                    self.generate_new_bank_forks_loop_us as i64,
+                    i64
+                ),
+                (
+                    "generate_new_bank_forks_write_lock_us",
+                    self.generate_new_bank_forks_write_lock_us as i64,
+                    i64
+                ),
             );
 
             *self = ReplayTiming::default();
@@ -404,6 +429,7 @@ impl ReplayStage {
                         &leader_schedule_cache,
                         &rpc_subscriptions,
                         &mut progress,
+                        &mut replay_timing,
                     );
                     generate_new_bank_forks_time.stop();
 
@@ -2786,24 +2812,34 @@ impl ReplayStage {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         rpc_subscriptions: &Arc<RpcSubscriptions>,
         progress: &mut ProgressMap,
+        replay_timing: &mut ReplayTiming,
     ) {
         // Find the next slot that chains to the old slot
+        let mut generate_new_bank_forks_read_lock =
+            Measure::start("generate_new_bank_forks_read_lock");
         let forks = bank_forks.read().unwrap();
+        generate_new_bank_forks_read_lock.stop();
+
         let frozen_banks = forks.frozen_banks();
         let frozen_bank_slots: Vec<u64> = frozen_banks
             .keys()
             .cloned()
             .filter(|s| *s >= forks.root())
             .collect();
+        let mut generate_new_bank_forks_get_slots_since =
+            Measure::start("generate_new_bank_forks_get_slots_since");
         let next_slots = blockstore
             .get_slots_since(&frozen_bank_slots)
             .expect("Db error");
+        generate_new_bank_forks_get_slots_since.stop();
+
         // Filter out what we've already seen
         trace!("generate new forks {:?}", {
             let mut next_slots = next_slots.iter().collect::<Vec<_>>();
             next_slots.sort();
             next_slots
         });
+        let mut generate_new_bank_forks_loop = Measure::start("generate_new_bank_forks_loop");
         let mut new_banks = HashMap::new();
         for (parent_slot, children) in next_slots {
             let parent_bank = frozen_banks
@@ -2844,11 +2880,31 @@ impl ReplayStage {
             }
         }
         drop(forks);
+        generate_new_bank_forks_loop.stop();
 
+        let mut generate_new_bank_forks_write_lock =
+            Measure::start("generate_new_bank_forks_write_lock");
         let mut forks = bank_forks.write().unwrap();
         for (_, bank) in new_banks {
             forks.insert(bank);
         }
+        generate_new_bank_forks_write_lock.stop();
+        saturating_add_assign!(
+            replay_timing.generate_new_bank_forks_read_lock_us,
+            generate_new_bank_forks_read_lock.as_us()
+        );
+        saturating_add_assign!(
+            replay_timing.generate_new_bank_forks_get_slots_since_us,
+            generate_new_bank_forks_get_slots_since.as_us()
+        );
+        saturating_add_assign!(
+            replay_timing.generate_new_bank_forks_loop_us,
+            generate_new_bank_forks_loop.as_us()
+        );
+        saturating_add_assign!(
+            replay_timing.generate_new_bank_forks_write_lock_us,
+            generate_new_bank_forks_write_lock.as_us()
+        );
     }
 
     fn new_bank_from_parent_with_notify(
@@ -3122,12 +3178,14 @@ pub mod tests {
             .unwrap()
             .get(NUM_CONSECUTIVE_LEADER_SLOTS)
             .is_none());
+        let mut replay_timing = ReplayTiming::default();
         ReplayStage::generate_new_bank_forks(
             &blockstore,
             &bank_forks,
             &leader_schedule_cache,
             &rpc_subscriptions,
             &mut progress,
+            &mut replay_timing,
         );
         assert!(bank_forks
             .read()
@@ -3150,6 +3208,7 @@ pub mod tests {
             &leader_schedule_cache,
             &rpc_subscriptions,
             &mut progress,
+            &mut replay_timing,
         );
         assert!(bank_forks
             .read()
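
Reviewer note (not part of the patch): the change follows the Measure / saturating_add_assign! timing pattern already used in replay_stage.rs, splitting generate_new_bank_forks into separately timed read-lock, get_slots_since, loop, and write-lock phases so lock contention shows up as its own metric. Below is a minimal standalone sketch of that accumulate-then-report idea, using std::time::Instant and u64::saturating_add as stand-ins for solana_measure::Measure and saturating_add_assign!; the Timing struct, timed helper, and field names are illustrative only.

// Illustrative sketch, not the validator code.
use std::time::{Duration, Instant};

#[derive(Default)]
struct Timing {
    read_lock_us: u64,
    loop_us: u64,
}

// Run `f`, returning its result and the elapsed wall-clock time in microseconds.
fn timed<T>(f: impl FnOnce() -> T) -> (T, u64) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed().as_micros() as u64)
}

fn main() {
    let mut timing = Timing::default();

    // Each phase is measured on its own and accumulated with saturating addition,
    // mirroring the per-phase counters added to ReplayTiming.
    let (_, us) = timed(|| std::thread::sleep(Duration::from_millis(1)));
    timing.read_lock_us = timing.read_lock_us.saturating_add(us);

    let (_, us) = timed(|| (0u64..10_000).sum::<u64>());
    timing.loop_us = timing.loop_us.saturating_add(us);

    // Counters are reported and reset once per metrics interval, as ReplayTiming does.
    println!("read_lock_us={} loop_us={}", timing.read_lock_us, timing.loop_us);
}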