From d738abedd5fd2499b985662ba79bc2ac545e3c94 Mon Sep 17 00:00:00 2001 From: buffalu <85544055+buffalu@users.noreply.github.com> Date: Wed, 19 Jan 2022 00:13:07 -0800 Subject: [PATCH] Add PacketBatch packet_indexes stat (#22564) * collect stats on packet batch indicies * cleanup * cleanup * cleanup * change name (cherry picked from commit 650882217c497f8dcd52fc38007099ea4dff7238) # Conflicts: # core/src/banking_stage.rs --- core/src/banking_stage.rs | 51 +++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 10 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index c37899afc4d962..4bd4f8b7f9a422 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -4,6 +4,7 @@ use { crate::packet_hasher::PacketHasher, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, + histogram::Histogram, itertools::Itertools, lru::LruCache, retain_mut::RetainMut, @@ -100,6 +101,7 @@ pub struct BankingStageStats { consumed_buffered_packets_count: AtomicUsize, cost_tracker_check_count: AtomicUsize, cost_forced_retry_transactions_count: AtomicUsize, + batch_packet_indexes_len: Histogram, // Timing consume_buffered_packets_elapsed: AtomicU64, @@ -119,11 +121,15 @@ impl BankingStageStats { pub fn new(id: u32) -> Self { BankingStageStats { id, + batch_packet_indexes_len: Histogram::configure() + .max_value(PACKETS_PER_BATCH as u64) + .build() + .unwrap(), ..BankingStageStats::default() } } - fn report(&self, report_interval_ms: u64) { + fn report(&mut self, report_interval_ms: u64) { if self.last_report.should_update(report_interval_ms) { datapoint_info!( "banking_stage-loop-stats", @@ -254,7 +260,28 @@ impl BankingStageStats { self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "packet_batch_indices_len_min", + self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64, + i64 + ), + ( + "packet_batch_indices_len_max", + self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64, + i64 + ), + ( + "packet_batch_indices_len_mean", + self.batch_packet_indexes_len.mean().unwrap_or(0) as i64, + i64 + ), + ( + "packet_batch_indices_len_90pct", + self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64, + i64 + ), ); + self.batch_packet_indexes_len.clear(); } } } @@ -723,7 +750,7 @@ impl BankingStage { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); - let banking_stage_stats = BankingStageStats::new(id); + let mut banking_stage_stats = BankingStageStats::new(id); loop { while !buffered_packet_batches.is_empty() { let decision = Self::process_buffered_packets( @@ -770,7 +797,7 @@ impl BankingStage { transaction_status_sender.clone(), &gossip_vote_sender, &mut buffered_packet_batches, - &banking_stage_stats, + &mut banking_stage_stats, duplicates, &recorder, &cost_model, @@ -1428,7 +1455,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, buffered_packet_batches: &mut UnprocessedPacketBatches, - banking_stage_stats: &BankingStageStats, + banking_stage_stats: &mut BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, cost_model: &Arc>, @@ -1589,7 +1616,7 @@ impl BankingStage { newly_buffered_packets_count: &mut usize, batch_limit: usize, duplicates: &Arc, PacketHasher)>>, - banking_stage_stats: &BankingStageStats, + banking_stage_stats: &mut BankingStageStats, ) { { let original_packets_count = packet_indexes.len(); @@ -1624,6 +1651,10 @@ impl BankingStage { *dropped_packets_count += dropped_batch.1.len(); } } + let _ = banking_stage_stats + .batch_packet_indexes_len + .increment(packet_indexes.len() as u64); + *newly_buffered_packets_count += packet_indexes.len(); unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); } @@ -3197,7 +3228,7 @@ mod tests { let mut dropped_packet_batches_count = 0; let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; - let banking_stage_stats = BankingStageStats::default(); + let mut banking_stage_stats = BankingStageStats::default(); // Because the set of unprocessed `packet_indexes` is empty, the // packets are not added to the unprocessed queue BankingStage::push_unprocessed( @@ -3209,7 +3240,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &duplicates, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); assert_eq!(dropped_packet_batches_count, 0); @@ -3228,7 +3259,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &duplicates, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(dropped_packet_batches_count, 0); @@ -3252,7 +3283,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &duplicates, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!( @@ -3273,7 +3304,7 @@ mod tests { &mut newly_buffered_packets_count, 3, &duplicates, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(