Skip to content

Commit

Permalink
Speed up packet dedup and fix benches (#22592)
Browse files Browse the repository at this point in the history
* Speed up packet dedup and fix benches

* fix tests

* allow int arithmetic in bench

(cherry picked from commit a2d251c)
  • Loading branch information
jstarry authored and mergify-bot committed Jan 20, 2022
1 parent e9e35fd commit b0edbef
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 32 deletions.
27 changes: 22 additions & 5 deletions bloom/src/bloom.rs
Expand Up @@ -174,11 +174,16 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
(index as usize, mask)
}

pub fn add(&self, key: &T) {
/// Adds an item to the bloom filter and returns true if the item
/// was not in the filter before.
pub fn add(&self, key: &T) -> bool {
let mut added = false;
for k in &self.keys {
let (index, mask) = self.pos(key, *k);
self.bits[index].fetch_or(mask, Ordering::Relaxed);
let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed);
added = added || prev_val & mask == 0u64;
}
added
}

pub fn contains(&self, key: &T) -> bool {
Expand All @@ -189,6 +194,12 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
})
}

pub fn clear_for_tests(&mut self) {
self.bits.iter().for_each(|bit| {
bit.store(0u64, Ordering::Relaxed);
});
}

// Only for tests and simulations.
pub fn mock_clone(&self) -> Self {
Self {
Expand Down Expand Up @@ -320,7 +331,9 @@ mod test {
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.num_bits, 6168);
assert_eq!(bloom.bits.len(), 97);
hash_values.par_iter().for_each(|v| bloom.add(v));
hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
let bloom: Bloom<Hash> = bloom.into();
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.bits.len(), 6168);
Expand Down Expand Up @@ -362,7 +375,9 @@ mod test {
}
// Round trip, re-inserting the same hash values.
let bloom: AtomicBloom<_> = bloom.into();
hash_values.par_iter().for_each(|v| bloom.add(v));
hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
Expand All @@ -380,7 +395,9 @@ mod test {
let bloom: AtomicBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
more_hash_values.par_iter().for_each(|v| bloom.add(v));
more_hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
Expand Down
102 changes: 78 additions & 24 deletions perf/benches/dedup.rs
@@ -1,45 +1,99 @@
#![allow(clippy::integer_arithmetic)]
#![feature(test)]

extern crate test;

use {
rand::prelude::*,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx},
solana_perf::{
packet::{to_packet_batches, PacketBatch},
sigverify,
},
test::Bencher,
};

fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
// subtract 8 bytes because the length will get serialized as well
(0..size.checked_sub(8).unwrap())
.map(|_| rng.gen())
.collect()
}

fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
// verify packets
let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
bencher.iter(|| {
// bench
sigverify::dedup_packets(&bloom, &mut batches);

// reset
bloom.clear_for_tests();
batches.iter_mut().for_each(|batch| {
batch
.packets
.iter_mut()
.for_each(|p| p.meta.set_discard(false))
});
})
}

#[bench]
fn bench_dedup_same(bencher: &mut Bencher) {
let tx = test_tx();
#[ignore]
fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();
let small_packet = test_packet_with_size(128, &mut rng);

// generate packet vector
let mut batches = to_packet_batches(
&std::iter::repeat(tx).take(64 * 1024).collect::<Vec<_>>(),
let batches = to_packet_batches(
&std::iter::repeat(small_packet)
.take(4096)
.collect::<Vec<_>>(),
128,
);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();

println!("packet_count {} {}", packet_count, batches.len());
do_bench_dedup_packets(bencher, batches);
}

// verify packets
bencher.iter(|| {
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
})
#[bench]
#[ignore]
fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();
let big_packet = test_packet_with_size(1024, &mut rng);

let batches = to_packet_batches(
&std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
128,
);

do_bench_dedup_packets(bencher, batches);
}

#[bench]
fn bench_dedup_diff(bencher: &mut Bencher) {
// generate packet vector
let mut batches =
to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
#[ignore]
fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

println!("packet_count {} {}", packet_count, batches.len());
let batches = to_packet_batches(
&(0..4096)
.map(|_| test_packet_with_size(128, &mut rng))
.collect::<Vec<_>>(),
128,
);

// verify packets
bencher.iter(|| {
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
})
do_bench_dedup_packets(bencher, batches);
}

#[bench]
#[ignore]
fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..4096)
.map(|_| test_packet_with_size(1024, &mut rng))
.collect::<Vec<_>>(),
128,
);

do_bench_dedup_packets(bencher, batches);
}
5 changes: 2 additions & 3 deletions perf/src/sigverify.rs
Expand Up @@ -426,12 +426,11 @@ fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8
return;
}

if bloom.contains(&packet.data.as_slice()) {
// If this packet was not newly added, it's a dup and should be discarded
if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) {
packet.meta.set_discard(true);
count.fetch_add(1, Ordering::Relaxed);
return;
}
bloom.add(&packet.data.as_slice());
}

pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
Expand Down

0 comments on commit b0edbef

Please sign in to comment.