diff --git a/bloom/src/bloom.rs b/bloom/src/bloom.rs index 152c387e12b5c9..15642069ba172e 100644 --- a/bloom/src/bloom.rs +++ b/bloom/src/bloom.rs @@ -174,11 +174,16 @@ impl AtomicBloom { (index as usize, mask) } - pub fn add(&self, key: &T) { + /// Adds an item to the bloom filter and returns true if the item + /// was not in the filter before. + pub fn add(&self, key: &T) -> bool { + let mut added = false; for k in &self.keys { let (index, mask) = self.pos(key, *k); - self.bits[index].fetch_or(mask, Ordering::Relaxed); + let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed); + added = added || prev_val & mask == 0u64; } + added } pub fn contains(&self, key: &T) -> bool { @@ -189,6 +194,12 @@ impl AtomicBloom { }) } + pub fn clear_for_tests(&mut self) { + self.bits.iter().for_each(|bit| { + bit.store(0u64, Ordering::Relaxed); + }); + } + // Only for tests and simulations. pub fn mock_clone(&self) -> Self { Self { @@ -320,7 +331,9 @@ mod test { assert_eq!(bloom.keys.len(), 3); assert_eq!(bloom.num_bits, 6168); assert_eq!(bloom.bits.len(), 97); - hash_values.par_iter().for_each(|v| bloom.add(v)); + hash_values.par_iter().for_each(|v| { + bloom.add(v); + }); let bloom: Bloom = bloom.into(); assert_eq!(bloom.keys.len(), 3); assert_eq!(bloom.bits.len(), 6168); @@ -362,7 +375,9 @@ mod test { } // Round trip, re-inserting the same hash values. let bloom: AtomicBloom<_> = bloom.into(); - hash_values.par_iter().for_each(|v| bloom.add(v)); + hash_values.par_iter().for_each(|v| { + bloom.add(v); + }); for hash_value in &hash_values { assert!(bloom.contains(hash_value)); } @@ -380,7 +395,9 @@ mod test { let bloom: AtomicBloom<_> = bloom.into(); assert_eq!(bloom.num_bits, 9731); assert_eq!(bloom.bits.len(), (9731 + 63) / 64); - more_hash_values.par_iter().for_each(|v| bloom.add(v)); + more_hash_values.par_iter().for_each(|v| { + bloom.add(v); + }); for hash_value in &hash_values { assert!(bloom.contains(hash_value)); } diff --git a/perf/benches/dedup.rs b/perf/benches/dedup.rs index e8590a67ecbfee..8542da9b3f7a12 100644 --- a/perf/benches/dedup.rs +++ b/perf/benches/dedup.rs @@ -1,45 +1,99 @@ +#![allow(clippy::integer_arithmetic)] #![feature(test)] extern crate test; use { + rand::prelude::*, solana_bloom::bloom::{AtomicBloom, Bloom}, - solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx}, + solana_perf::{ + packet::{to_packet_batches, PacketBatch}, + sigverify, + }, test::Bencher, }; +fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec { + // subtract 8 bytes because the length will get serialized as well + (0..size.checked_sub(8).unwrap()) + .map(|_| rng.gen()) + .collect() +} + +fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec) { + // verify packets + let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); + bencher.iter(|| { + // bench + sigverify::dedup_packets(&bloom, &mut batches); + + // reset + bloom.clear_for_tests(); + batches.iter_mut().for_each(|batch| { + batch + .packets + .iter_mut() + .for_each(|p| p.meta.set_discard(false)) + }); + }) +} + #[bench] -fn bench_dedup_same(bencher: &mut Bencher) { - let tx = test_tx(); +#[ignore] +fn bench_dedup_same_small_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + let small_packet = test_packet_with_size(128, &mut rng); - // generate packet vector - let mut batches = to_packet_batches( - &std::iter::repeat(tx).take(64 * 1024).collect::>(), + let batches = to_packet_batches( + &std::iter::repeat(small_packet) + .take(4096) + .collect::>(), 128, ); - let packet_count = sigverify::count_packets_in_batches(&batches); - let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); - println!("packet_count {} {}", packet_count, batches.len()); + do_bench_dedup_packets(bencher, batches); +} - // verify packets - bencher.iter(|| { - let _ans = sigverify::dedup_packets(&bloom, &mut batches); - }) +#[bench] +#[ignore] +fn bench_dedup_same_big_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + let big_packet = test_packet_with_size(1024, &mut rng); + + let batches = to_packet_batches( + &std::iter::repeat(big_packet).take(4096).collect::>(), + 128, + ); + + do_bench_dedup_packets(bencher, batches); } #[bench] -fn bench_dedup_diff(bencher: &mut Bencher) { - // generate packet vector - let mut batches = - to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::>(), 128); - let packet_count = sigverify::count_packets_in_batches(&batches); - let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into(); +#[ignore] +fn bench_dedup_diff_small_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); - println!("packet_count {} {}", packet_count, batches.len()); + let batches = to_packet_batches( + &(0..4096) + .map(|_| test_packet_with_size(128, &mut rng)) + .collect::>(), + 128, + ); - // verify packets - bencher.iter(|| { - let _ans = sigverify::dedup_packets(&bloom, &mut batches); - }) + do_bench_dedup_packets(bencher, batches); +} + +#[bench] +#[ignore] +fn bench_dedup_diff_big_packets(bencher: &mut Bencher) { + let mut rng = rand::thread_rng(); + + let batches = to_packet_batches( + &(0..4096) + .map(|_| test_packet_with_size(1024, &mut rng)) + .collect::>(), + 128, + ); + + do_bench_dedup_packets(bencher, batches); } diff --git a/perf/src/sigverify.rs b/perf/src/sigverify.rs index e6977430d0cd03..fa9f831d252fea 100644 --- a/perf/src/sigverify.rs +++ b/perf/src/sigverify.rs @@ -426,12 +426,11 @@ fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8 return; } - if bloom.contains(&packet.data.as_slice()) { + // If this packet was not newly added, it's a dup and should be discarded + if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) { packet.meta.set_discard(true); count.fetch_add(1, Ordering::Relaxed); - return; } - bloom.add(&packet.data.as_slice()); } pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {