From 65a1106d17847b2ee78095dcfa3cec3c83fb5005 Mon Sep 17 00:00:00 2001 From: Lucas B Date: Tue, 18 Jan 2022 12:02:34 -0800 Subject: [PATCH 1/5] collect stats on packet batch indicies --- core/src/banking_stage.rs | 37 ++++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 23fd7336ffe1f7..72a24357ce6089 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,6 +1,7 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. +use histogram::Histogram; use { crate::{packet_deduper::PacketDeduper, qos_service::QosService}, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, @@ -95,6 +96,7 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, + pub(crate) packet_batch_indices_len: Histogram, // Timing consume_buffered_packets_elapsed: AtomicU64, @@ -111,6 +113,7 @@ impl BankingStageStats { pub fn new(id: u32) -> Self { BankingStageStats { id, + packet_batch_indices_len: Histogram::configure().max_value(PACKETS_PER_BATCH as u64).build().unwrap(), ..BankingStageStats::default() } } @@ -147,9 +150,10 @@ impl BankingStageStats { .unprocessed_packet_conversion_elapsed .load(Ordering::Relaxed) + self.transaction_processing_elapsed.load(Ordering::Relaxed) + + self.packet_batch_indices_len.entries() } - fn report(&self, report_interval_ms: u64) { + fn report(&mut self, report_interval_ms: u64) { // skip repoting metrics if stats is empty if self.is_empty() { return; @@ -255,7 +259,28 @@ impl BankingStageStats { .swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "packet_batch_indices_len_min", + self.packet_batch_indices_len.minimum().unwrap_or(0) as i64, + i64 + ), + ( + "packet_batch_indices_len_max", + self.packet_batch_indices_len.maximum().unwrap_or(0) as i64, + i64 + ), + ( + "packet_batch_indices_len_mean", + self.packet_batch_indices_len.mean().unwrap_or(0) as i64, + i64 + ), + ( + "packet_batch_indices_len_90pct", + self.packet_batch_indices_len.percentile(90.0).unwrap_or(0) as i64, + i64 + ) ); + self.packet_batch_indices_len.clear(); } } } @@ -734,7 +759,7 @@ impl BankingStage { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packet_batches = VecDeque::with_capacity(batch_limit); - let banking_stage_stats = BankingStageStats::new(id); + let mut banking_stage_stats = BankingStageStats::new(id); let qos_service = QosService::new(cost_model, id); loop { let my_pubkey = cluster_info.id(); @@ -779,7 +804,7 @@ impl BankingStage { id, batch_limit, &mut buffered_packet_batches, - &banking_stage_stats, + &mut banking_stage_stats, packet_deduper, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), @@ -1384,7 +1409,7 @@ impl BankingStage { id: u32, batch_limit: usize, buffered_packet_batches: &mut UnprocessedPacketBatches, - banking_stage_stats: &BankingStageStats, + banking_stage_stats: &mut BankingStageStats, packet_deduper: &PacketDeduper, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("receive_and_buffer_packets_recv"); @@ -1468,7 +1493,7 @@ impl BankingStage { newly_buffered_packets_count: &mut usize, batch_limit: usize, packet_deduper: &PacketDeduper, - banking_stage_stats: &BankingStageStats, + banking_stage_stats: &mut BankingStageStats, ) { packet_deduper.dedupe_packets(&packet_batch, &mut packet_indexes, banking_stage_stats); if Self::packet_has_more_unprocessed_transactions(&packet_indexes) { @@ -1478,6 +1503,8 @@ impl BankingStage { *dropped_packets_count += dropped_batch.1.len(); } } + let _ = banking_stage_stats.packet_batch_indices_len.increment(packet_indexes.len() as u64); + *newly_buffered_packets_count += packet_indexes.len(); unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); } From 50e5268631e7504422edee278870a76b3ef9c14f Mon Sep 17 00:00:00 2001 From: Lucas B Date: Tue, 18 Jan 2022 12:04:12 -0800 Subject: [PATCH 2/5] cleanup --- core/src/banking_stage.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 72a24357ce6089..a91303fdfbdb4d 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,10 +1,10 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use histogram::Histogram; use { crate::{packet_deduper::PacketDeduper, qos_service::QosService}, crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}, + histogram::Histogram, itertools::Itertools, retain_mut::RetainMut, solana_entry::entry::hash_transactions, @@ -96,7 +96,7 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, - pub(crate) packet_batch_indices_len: Histogram, + packet_batch_indices_len: Histogram, // Timing consume_buffered_packets_elapsed: AtomicU64, From 4b827d2bc3b54c2ebeeb11bc933ff059912642f4 Mon Sep 17 00:00:00 2001 From: Lucas B Date: Tue, 18 Jan 2022 14:02:25 -0800 Subject: [PATCH 3/5] cleanup --- core/src/banking_stage.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a91303fdfbdb4d..a463c8348518dd 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3236,7 +3236,7 @@ mod tests { let mut dropped_packet_batches_count = 0; let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; - let banking_stage_stats = BankingStageStats::default(); + let mut banking_stage_stats = BankingStageStats::default(); // Because the set of unprocessed `packet_indexes` is empty, the // packets are not added to the unprocessed queue BankingStage::push_unprocessed( @@ -3248,7 +3248,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &packet_deduper, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 1); assert_eq!(dropped_packet_batches_count, 0); @@ -3267,7 +3267,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &packet_deduper, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!(dropped_packet_batches_count, 0); @@ -3291,7 +3291,7 @@ mod tests { &mut newly_buffered_packets_count, batch_limit, &packet_deduper, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!( @@ -3312,7 +3312,7 @@ mod tests { &mut newly_buffered_packets_count, 3, &packet_deduper, - &banking_stage_stats, + &mut banking_stage_stats, ); assert_eq!(unprocessed_packets.len(), 2); assert_eq!( From 3369d6d9cd3f23e1348edb16ed61f753462e7ea7 Mon Sep 17 00:00:00 2001 From: Lucas B Date: Tue, 18 Jan 2022 15:08:23 -0800 Subject: [PATCH 4/5] cleanup --- core/src/banking_stage.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a463c8348518dd..cc33e0bc99e350 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -113,7 +113,10 @@ impl BankingStageStats { pub fn new(id: u32) -> Self { BankingStageStats { id, - packet_batch_indices_len: Histogram::configure().max_value(PACKETS_PER_BATCH as u64).build().unwrap(), + packet_batch_indices_len: Histogram::configure() + .max_value(PACKETS_PER_BATCH as u64) + .build() + .unwrap(), ..BankingStageStats::default() } } @@ -1503,7 +1506,9 @@ impl BankingStage { *dropped_packets_count += dropped_batch.1.len(); } } - let _ = banking_stage_stats.packet_batch_indices_len.increment(packet_indexes.len() as u64); + let _ = banking_stage_stats + .packet_batch_indices_len + .increment(packet_indexes.len() as u64); *newly_buffered_packets_count += packet_indexes.len(); unprocessed_packet_batches.push_back((packet_batch, packet_indexes, false)); From 7d20f8934dcbe4ce71d17584163fdd32306b7839 Mon Sep 17 00:00:00 2001 From: Lucas B Date: Tue, 18 Jan 2022 22:15:37 -0800 Subject: [PATCH 5/5] change name --- core/src/banking_stage.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index cc33e0bc99e350..c2e439c17ce1e4 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -96,7 +96,7 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, - packet_batch_indices_len: Histogram, + batch_packet_indexes_len: Histogram, // Timing consume_buffered_packets_elapsed: AtomicU64, @@ -113,7 +113,7 @@ impl BankingStageStats { pub fn new(id: u32) -> Self { BankingStageStats { id, - packet_batch_indices_len: Histogram::configure() + batch_packet_indexes_len: Histogram::configure() .max_value(PACKETS_PER_BATCH as u64) .build() .unwrap(), @@ -153,7 +153,7 @@ impl BankingStageStats { .unprocessed_packet_conversion_elapsed .load(Ordering::Relaxed) + self.transaction_processing_elapsed.load(Ordering::Relaxed) - + self.packet_batch_indices_len.entries() + + self.batch_packet_indexes_len.entries() } fn report(&mut self, report_interval_ms: u64) { @@ -264,26 +264,26 @@ impl BankingStageStats { ), ( "packet_batch_indices_len_min", - self.packet_batch_indices_len.minimum().unwrap_or(0) as i64, + self.batch_packet_indexes_len.minimum().unwrap_or(0) as i64, i64 ), ( "packet_batch_indices_len_max", - self.packet_batch_indices_len.maximum().unwrap_or(0) as i64, + self.batch_packet_indexes_len.maximum().unwrap_or(0) as i64, i64 ), ( "packet_batch_indices_len_mean", - self.packet_batch_indices_len.mean().unwrap_or(0) as i64, + self.batch_packet_indexes_len.mean().unwrap_or(0) as i64, i64 ), ( "packet_batch_indices_len_90pct", - self.packet_batch_indices_len.percentile(90.0).unwrap_or(0) as i64, + self.batch_packet_indexes_len.percentile(90.0).unwrap_or(0) as i64, i64 ) ); - self.packet_batch_indices_len.clear(); + self.batch_packet_indexes_len.clear(); } } } @@ -1507,7 +1507,7 @@ impl BankingStage { } } let _ = banking_stage_stats - .packet_batch_indices_len + .batch_packet_indexes_len .increment(packet_indexes.len() as u64); *newly_buffered_packets_count += packet_indexes.len();