From edf1954817ac22780d9f7e4dc756c94d0a6829c4 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Fri, 21 Jan 2022 08:19:55 -0800 Subject: [PATCH] Faster dedup v1.8 (#22619) * Faster dedup --- Cargo.lock | 31 +++++--- core/src/sigverify_stage.rs | 33 +++----- perf/Cargo.toml | 1 + perf/benches/dedup.rs | 54 +++++++++---- perf/src/sigverify.rs | 149 +++++++++++++++++++++++++++--------- programs/bpf/Cargo.lock | 22 ++++-- 6 files changed, 197 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 601f97ba8c7e51..ef7ec7dab7fc87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -39,18 +39,18 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "865f8b0b3fced577b7df82e9b0eb7609595d7209c0b39e78d0646672e244b1b1" dependencies = [ - "getrandom 0.2.0", + "getrandom 0.2.4", "lazy_static", "version_check 0.9.2", ] [[package]] name = "ahash" -version = "0.7.4" +version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43bb833f0bf979d8475d38fbf09ed3b8a55e1885fe93ad3f93239fc6a4f17b98" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" dependencies = [ - "getrandom 0.2.0", + "getrandom 0.2.4", "once_cell", "version_check 0.9.2", ] @@ -197,7 +197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fe17f59a06fe8b87a6fc8bf53bb70b3aba76d7685f432487a68cd5552853625" dependencies = [ "futures-core", - "getrandom 0.2.0", + "getrandom 0.2.4", "instant", "pin-project 1.0.1", "rand 0.8.3", @@ -1580,18 +1580,18 @@ checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" dependencies = [ "cfg-if 0.1.10", "libc", - "wasi", + "wasi 0.9.0+wasi-snapshot-preview1", ] [[package]] name = "getrandom" -version = "0.2.0" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee8025cf36f917e6a52cce185b7c7177689b838b7ec138364e50cc2277a56cf4" +checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "libc", - "wasi", + "wasi 0.10.2+wasi-snapshot-preview1", ] [[package]] @@ -1698,7 +1698,7 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" dependencies = [ - "ahash 0.7.4", + "ahash 0.7.6", ] [[package]] @@ -3389,7 +3389,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ - "getrandom 0.2.0", + "getrandom 0.2.4", ] [[package]] @@ -5254,6 +5254,7 @@ dependencies = [ name = "solana-perf" version = "1.8.14" dependencies = [ + "ahash 0.7.6", "bincode", "caps", "curve25519-dalek 2.1.0", @@ -7005,6 +7006,12 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasm-bindgen" version = "0.2.70" diff --git a/core/src/sigverify_stage.rs b/core/src/sigverify_stage.rs index 38b7eaeef7ccca..d45adefb877c55 100644 --- a/core/src/sigverify_stage.rs +++ b/core/src/sigverify_stage.rs @@ -7,13 +7,11 @@ use { crate::sigverify, - core::time::Duration, crossbeam_channel::{SendError, Sender as CrossbeamSender}, itertools::Itertools, - solana_bloom::bloom::{AtomicBloom, Bloom}, solana_measure::measure::Measure, solana_perf::packet::PacketBatch, - solana_perf::sigverify::dedup_packets, + solana_perf::sigverify::Deduper, solana_sdk::timing, solana_streamer::streamer::{self, PacketBatchReceiver, StreamerError}, std::{ @@ -215,7 +213,7 @@ impl SigVerifyStage { } fn verifier( - bloom: &AtomicBloom<&[u8]>, + deduper: &Deduper, recvr: &PacketBatchReceiver, sendr: &CrossbeamSender>, verifier: &T, @@ -231,15 +229,15 @@ impl SigVerifyStage { ); let mut dedup_time = Measure::start("sigverify_dedup_time"); - let dedup_fail = dedup_packets(bloom, &mut batches) as usize; + let dedup_fail = deduper.dedup_packets(&mut batches) as usize; dedup_time.stop(); - let valid_packets = num_packets.saturating_sub(dedup_fail); + let num_unique = num_packets.saturating_sub(dedup_fail); let mut discard_time = Measure::start("sigverify_discard_time"); - if valid_packets > MAX_SIGVERIFY_BATCH { + if num_unique > MAX_SIGVERIFY_BATCH { Self::discard_excess_packets(&mut batches, MAX_SIGVERIFY_BATCH) }; - let excess_fail = valid_packets.saturating_sub(MAX_SIGVERIFY_BATCH); + let excess_fail = num_unique.saturating_sub(MAX_SIGVERIFY_BATCH); discard_time.stop(); let mut verify_batch_time = Measure::start("sigverify_batch_time"); @@ -298,25 +296,16 @@ impl SigVerifyStage { let verifier = verifier.clone(); let mut stats = SigVerifierStats::default(); let mut last_print = Instant::now(); - const MAX_BLOOM_AGE: Duration = Duration::from_millis(2_000); - const MAX_BLOOM_ITEMS: usize = 1_000_000; - const MAX_BLOOM_FAIL: f64 = 0.0001; - const MAX_BLOOM_BITS: usize = 8 << 22; + const MAX_DEDUPER_AGE_MS: u64 = 2_000; + const MAX_DEDUPER_ITEMS: u32 = 1_000_000; Builder::new() .name("solana-verifier".to_string()) .spawn(move || { - let mut bloom = - Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); - let mut bloom_age = Instant::now(); + let mut deduper = Deduper::new(MAX_DEDUPER_ITEMS, MAX_DEDUPER_AGE_MS); loop { - let now = Instant::now(); - if now.duration_since(bloom_age) > MAX_BLOOM_AGE { - bloom = - Bloom::random(MAX_BLOOM_ITEMS, MAX_BLOOM_FAIL, MAX_BLOOM_BITS).into(); - bloom_age = now; - } + deduper.reset(); if let Err(e) = Self::verifier( - &bloom, + &deduper, &packet_receiver, &verified_sender, &verifier, diff --git a/perf/Cargo.toml b/perf/Cargo.toml index 64c4e4cff26af9..91dd34c56e50cb 100644 --- a/perf/Cargo.toml +++ b/perf/Cargo.toml @@ -12,6 +12,7 @@ edition = "2018" [dependencies] bincode = "1.3.1" curve25519-dalek = { version = "2" } +ahash = "0.7.6" dlopen = "0.1.8" dlopen_derive = "0.1.4" lazy_static = "1.4.0" diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index a802e095bb7f1c..8009bf2804bcfe 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -5,7 +5,6 @@ extern crate test; use { rand::prelude::*, - solana_bloom::bloom::{AtomicBloom, Bloom}, solana_perf::{ packet::{to_packet_batches, PacketBatch}, sigverify, @@ -13,6 +12,8 @@ use { test::Bencher, }; +const NUM: usize = 4096; + fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { // subtract 8 bytes because the length will get serialized as well (0..size.checked_sub(8).unwrap()) @@ -22,19 +23,14 @@ fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) { // verify packets - let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + let mut deduper = sigverify::Deduper::new(1_000_000, 2_000); bencher.iter(|| { - // bench - sigverify::dedup_packets(&bloom, &mut batches); - - // reset - bloom.clear_for_tests(); - batches.iter_mut().for_each(|batch| { - batch.packets.iter_mut().for_each(|p| { - p.meta.discard = false; - }) - }); - }) + let _ans = deduper.dedup_packets(&mut batches); + deduper.reset(); + batches + .iter_mut() + .for_each(|b| b.packets.iter_mut().for_each(|p| p.meta.discard = false)); + }); } #[bench] @@ -45,7 +41,7 @@ fn bench_dedup_same_small_packets(bencher: &mut Bencher) { let batches = to_packet_batches( &std::iter::repeat(small_packet) - .take(4096) + .take(NUM) .collect::>(), 128, ); @@ -60,7 +56,7 @@ fn bench_dedup_same_big_packets(bencher: &mut Bencher) { let big_packet = test_packet_with_size(1024, &mut rng); let batches = to_packet_batches( - &std::iter::repeat(big_packet).take(4096).collect::>(), + &std::iter::repeat(big_packet).take(NUM).collect::>(), 128, ); @@ -73,7 +69,7 @@ fn bench_dedup_diff_small_packets(bencher: &mut Bencher) { let mut rng = rand::thread_rng(); let batches = to_packet_batches( - &(0..4096) + &(0..NUM) .map(|_| test_packet_with_size(128, &mut rng)) .collect::>(), 128, @@ -88,7 +84,7 @@ fn bench_dedup_diff_big_packets(bencher: &mut Bencher) { let mut rng = rand::thread_rng(); let batches = to_packet_batches( - &(0..4096) + &(0..NUM) .map(|_| test_packet_with_size(1024, &mut rng)) .collect::>(), 128, @@ -96,3 +92,27 @@ fn bench_dedup_diff_big_packets(bencher: &mut Bencher) { do_bench_dedup_packets(bencher, batches); } + +#[bench] +#[ignore] +fn bench_dedup_baseline(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + + let batches = to_packet_batches( + &(0..0) + .map(|_| test_packet_with_size(128, &mut rng)) + .collect::>(), + 128, + ); + + do_bench_dedup_packets(bencher, batches); +} + +#[bench] +#[ignore] +fn bench_dedup_reset(bencher: &mut Bencher) { + let mut deduper = sigverify::Deduper::new(1_000_000, 0); + bencher.iter(|| { + deduper.reset(); + }); +} diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index feefa56f58d2f4..192e8cad09b3e2 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -4,7 +4,6 @@ //! to the GPU. //! -use solana_bloom::bloom::AtomicBloom; #[cfg(test)] use solana_sdk::transaction::Transaction; use { @@ -14,6 +13,8 @@ use { perf_libs, recycler::Recycler, }, + ahash::AHasher, + rand::{thread_rng, Rng}, rayon::ThreadPool, solana_metrics::inc_new_counter_debug, solana_rayon_threadlimit::get_thread_count, @@ -21,7 +22,9 @@ use { hash::Hash, message::MESSAGE_HEADER_LENGTH, pubkey::Pubkey, short_vec::decode_shortu16_len, signature::Signature, }, - std::sync::atomic::{AtomicU64, Ordering}, + std::hash::Hasher, + std::sync::atomic::{AtomicBool, AtomicU64, Ordering}, + std::time::{Duration, Instant}, std::{convert::TryFrom, mem::size_of}, }; @@ -390,34 +393,76 @@ pub fn generate_offsets( ) } -fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8]>) { - // If this packet was already marked as discard, drop it - if packet.meta.discard { - return; +pub struct Deduper { + filter: Vec, + seed: (u128, u128), + age: Instant, + max_age: Duration, + pub saturated: AtomicBool, +} + +impl Deduper { + pub fn new(size: u32, max_age_ms: u64) -> Self { + let mut filter: Vec = Vec::with_capacity(size as usize); + filter.resize_with(size as usize, Default::default); + let seed = thread_rng().gen(); + Self { + filter, + seed, + age: Instant::now(), + max_age: Duration::from_millis(max_age_ms), + saturated: AtomicBool::new(false), + } } - // If this packet was not newly added, it's a dup and should be discarded - if !bloom.add(&&packet.data[0..packet.meta.size]) { - packet.meta.discard = true; - count.fetch_add(1, Ordering::Relaxed); + pub fn reset(&mut self) { + let now = Instant::now(); + //this should reset every 500k unique packets per 1m sized deduper + //false positive rate is 1/1000 at that point + let saturated = self.saturated.load(Ordering::Relaxed); + if saturated || now.duration_since(self.age) > self.max_age { + for i in &self.filter { + i.store(0, Ordering::Relaxed); + } + self.seed = thread_rng().gen(); + self.age = now; + self.saturated.store(false, Ordering::Relaxed); + } } -} -pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 { - use rayon::prelude::*; - let packet_count = count_packets_in_batches(batches); - // machine specific random offset to read the u64 from the packet signature - let count = AtomicU64::new(0); - PAR_THREAD_POOL.install(|| { - batches.into_par_iter().for_each(|batch| { + fn dedup_packet(&self, count: &AtomicU64, packet: &mut Packet) { + // If this packet was already marked as discard, drop it + if packet.meta.discard { + return; + } + let mut hasher = AHasher::new_with_keys(self.seed.0, self.seed.1); + hasher.write(&packet.data[0..packet.meta.size]); + let hash = hasher.finish(); + let len = self.filter.len(); + let pos = (usize::try_from(hash).unwrap()).wrapping_rem(len); + // saturate each position with or + let prev = self.filter[pos].fetch_or(hash, Ordering::Relaxed); + if prev == u64::MAX { + self.saturated.store(true, Ordering::Relaxed); + //reset this value + self.filter[pos].store(hash, Ordering::Relaxed); + } + if hash == prev & hash { + packet.meta.discard = true; + count.fetch_add(1, Ordering::Relaxed); + } + } + + pub fn dedup_packets(&self, batches: &mut [PacketBatch]) -> u64 { + let count = AtomicU64::new(0); + batches.iter_mut().for_each(|batch| { batch .packets - .par_iter_mut() - .for_each(|p| dedup_packet(&count, p, bloom)) - }) - }); - inc_new_counter_debug!("dedup_packets_total", packet_count); - count.load(Ordering::Relaxed) + .iter_mut() + .for_each(|p| self.dedup_packet(&count, p)) + }); + count.load(Ordering::Relaxed) + } } pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { @@ -430,7 +475,7 @@ pub fn ed25519_verify_cpu(batches: &mut [PacketBatch], reject_non_vote: bool) { .packets .par_iter_mut() .for_each(|p| verify_packet(p, reject_non_vote)) - }) + }); }); inc_new_counter_debug!("ed25519_verify_cpu", packet_count); } @@ -604,7 +649,6 @@ mod tests { test_tx::{test_multisig_tx, test_tx, vote_tx}, }, bincode::{deserialize, serialize}, - solana_bloom::bloom::{AtomicBloom, Bloom}, solana_sdk::{ instruction::CompiledInstruction, message::{Message, MessageHeader}, @@ -1246,26 +1290,57 @@ mod tests { fn test_dedup_same() { let tx = test_tx(); - // generate packet vector let mut batches = to_packet_batches(&std::iter::repeat(tx).take(1024).collect::>(), 128); let packet_count = sigverify::count_packets_in_batches(&batches); - let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); - let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; - // because dedup uses a threadpool, there maybe up to N threads of txs that go through - let n = get_thread_count(); - assert!(packet_count < discard + n * 2); + let filter = Deduper::new(1_000_000, 0); + let discard = filter.dedup_packets(&mut batches) as usize; + assert_eq!(packet_count, discard + 1); } #[test] fn test_dedup_diff() { - // generate packet vector + let mut filter = Deduper::new(1_000_000, 0); let mut batches = to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); - let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 20).into(); - let discard = sigverify::dedup_packets(&bloom, &mut batches) as usize; + let discard = filter.dedup_packets(&mut batches) as usize; // because dedup uses a threadpool, there maybe up to N threads of txs that go through - let n = get_thread_count(); - assert!(discard < n * 2); + assert_eq!(discard, 0); + filter.reset(); + for i in filter.filter { + assert_eq!(i.load(Ordering::Relaxed), 0); + } + } + + #[test] + #[ignore] + fn test_dedup_saturated() { + let filter = Deduper::new(1_000_000, 0); + let mut discard = 0; + assert!(!filter.saturated.load(Ordering::Relaxed)); + for i in 0..1000 { + let mut batches = + to_packet_batches(&(0..1000).map(|_| test_tx()).collect::>(), 128); + discard += filter.dedup_packets(&mut batches) as usize; + println!("{} {}", i, discard); + if filter.saturated.load(Ordering::Relaxed) { + break; + } + } + assert!(filter.saturated.load(Ordering::Relaxed)); + } + + #[test] + fn test_dedup_false_positive() { + let filter = Deduper::new(1_000_000, 0); + let mut discard = 0; + for i in 0..10 { + let mut batches = + to_packet_batches(&(0..1024).map(|_| test_tx()).collect::>(), 128); + discard += filter.dedup_packets(&mut batches) as usize; + println!("false positive rate: {}/{}", discard, i * 1024); + } + //allow for 1 false positive even if extremely unlikely + assert!(discard < 2); } } diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index ea776258c67edd..248d0f12e88567 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -33,6 +33,17 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "739f4a8db6605981345c5654f3a85b056ce52f37a39d34da03f25bf2151ea16e" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom 0.2.4", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.10" @@ -1136,9 +1147,9 @@ dependencies = [ [[package]] name = "getrandom" -version = "0.2.1" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4060f4657be78b8e766215b02b18a2e862d83745545de804638e2b545e81aee6" +checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c" dependencies = [ "cfg-if 1.0.0", "libc", @@ -1202,7 +1213,7 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" dependencies = [ - "ahash", + "ahash 0.4.7", ] [[package]] @@ -2197,7 +2208,7 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ - "getrandom 0.2.1", + "getrandom 0.2.4", ] [[package]] @@ -2273,7 +2284,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" dependencies = [ - "getrandom 0.2.1", + "getrandom 0.2.4", "redox_syscall 0.2.4", ] @@ -3378,6 +3389,7 @@ dependencies = [ name = "solana-perf" version = "1.8.14" dependencies = [ + "ahash 0.7.6", "bincode", "caps", "curve25519-dalek 2.1.0",