Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Speed up packet dedup and fix benches (backport #22592) #22613

Merged
merged 1 commit into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 22 additions & 5 deletions bloom/src/bloom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,16 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
(index as usize, mask)
}

pub fn add(&self, key: &T) {
/// Adds an item to the bloom filter and returns true if the item
/// was not in the filter before.
pub fn add(&self, key: &T) -> bool {
let mut added = false;
for k in &self.keys {
let (index, mask) = self.pos(key, *k);
self.bits[index].fetch_or(mask, Ordering::Relaxed);
let prev_val = self.bits[index].fetch_or(mask, Ordering::Relaxed);
added = added || prev_val & mask == 0u64;
}
added
}

pub fn contains(&self, key: &T) -> bool {
Expand All @@ -189,6 +194,12 @@ impl<T: BloomHashIndex> AtomicBloom<T> {
})
}

pub fn clear_for_tests(&mut self) {
self.bits.iter().for_each(|bit| {
bit.store(0u64, Ordering::Relaxed);
});
}

// Only for tests and simulations.
pub fn mock_clone(&self) -> Self {
Self {
Expand Down Expand Up @@ -320,7 +331,9 @@ mod test {
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.num_bits, 6168);
assert_eq!(bloom.bits.len(), 97);
hash_values.par_iter().for_each(|v| bloom.add(v));
hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
let bloom: Bloom<Hash> = bloom.into();
assert_eq!(bloom.keys.len(), 3);
assert_eq!(bloom.bits.len(), 6168);
Expand Down Expand Up @@ -362,7 +375,9 @@ mod test {
}
// Round trip, re-inserting the same hash values.
let bloom: AtomicBloom<_> = bloom.into();
hash_values.par_iter().for_each(|v| bloom.add(v));
hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
Expand All @@ -380,7 +395,9 @@ mod test {
let bloom: AtomicBloom<_> = bloom.into();
assert_eq!(bloom.num_bits, 9731);
assert_eq!(bloom.bits.len(), (9731 + 63) / 64);
more_hash_values.par_iter().for_each(|v| bloom.add(v));
more_hash_values.par_iter().for_each(|v| {
bloom.add(v);
});
for hash_value in &hash_values {
assert!(bloom.contains(hash_value));
}
Expand Down
102 changes: 78 additions & 24 deletions perf/benches/dedup.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,99 @@
#![allow(clippy::integer_arithmetic)]
#![feature(test)]

extern crate test;

use {
rand::prelude::*,
solana_bloom::bloom::{AtomicBloom, Bloom},
solana_perf::{packet::to_packet_batches, sigverify, test_tx::test_tx},
solana_perf::{
packet::{to_packet_batches, PacketBatch},
sigverify,
},
test::Bencher,
};

fn test_packet_with_size(size: usize, rng: &mut ThreadRng) -> Vec<u8> {
// subtract 8 bytes because the length will get serialized as well
(0..size.checked_sub(8).unwrap())
.map(|_| rng.gen())
.collect()
}

fn do_bench_dedup_packets(bencher: &mut Bencher, mut batches: Vec<PacketBatch>) {
// verify packets
let mut bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
bencher.iter(|| {
// bench
sigverify::dedup_packets(&bloom, &mut batches);

// reset
bloom.clear_for_tests();
batches.iter_mut().for_each(|batch| {
batch
.packets
.iter_mut()
.for_each(|p| p.meta.set_discard(false))
});
})
}

#[bench]
fn bench_dedup_same(bencher: &mut Bencher) {
let tx = test_tx();
#[ignore]
fn bench_dedup_same_small_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();
let small_packet = test_packet_with_size(128, &mut rng);

// generate packet vector
let mut batches = to_packet_batches(
&std::iter::repeat(tx).take(64 * 1024).collect::<Vec<_>>(),
let batches = to_packet_batches(
&std::iter::repeat(small_packet)
.take(4096)
.collect::<Vec<_>>(),
128,
);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();

println!("packet_count {} {}", packet_count, batches.len());
do_bench_dedup_packets(bencher, batches);
}

// verify packets
bencher.iter(|| {
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
})
#[bench]
#[ignore]
fn bench_dedup_same_big_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();
let big_packet = test_packet_with_size(1024, &mut rng);

let batches = to_packet_batches(
&std::iter::repeat(big_packet).take(4096).collect::<Vec<_>>(),
128,
);

do_bench_dedup_packets(bencher, batches);
}

#[bench]
fn bench_dedup_diff(bencher: &mut Bencher) {
// generate packet vector
let mut batches =
to_packet_batches(&(0..64 * 1024).map(|_| test_tx()).collect::<Vec<_>>(), 128);
let packet_count = sigverify::count_packets_in_batches(&batches);
let bloom: AtomicBloom<&[u8]> = Bloom::random(1_000_000, 0.0001, 8 << 22).into();
#[ignore]
fn bench_dedup_diff_small_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

println!("packet_count {} {}", packet_count, batches.len());
let batches = to_packet_batches(
&(0..4096)
.map(|_| test_packet_with_size(128, &mut rng))
.collect::<Vec<_>>(),
128,
);

// verify packets
bencher.iter(|| {
let _ans = sigverify::dedup_packets(&bloom, &mut batches);
})
do_bench_dedup_packets(bencher, batches);
}

#[bench]
#[ignore]
fn bench_dedup_diff_big_packets(bencher: &mut Bencher) {
let mut rng = rand::thread_rng();

let batches = to_packet_batches(
&(0..4096)
.map(|_| test_packet_with_size(1024, &mut rng))
.collect::<Vec<_>>(),
128,
);

do_bench_dedup_packets(bencher, batches);
}
5 changes: 2 additions & 3 deletions perf/src/sigverify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,12 +426,11 @@ fn dedup_packet(count: &AtomicU64, packet: &mut Packet, bloom: &AtomicBloom<&[u8
return;
}

if bloom.contains(&packet.data.as_slice()) {
// If this packet was not newly added, it's a dup and should be discarded
if !bloom.add(&&packet.data.as_slice()[0..packet.meta.size]) {
packet.meta.set_discard(true);
count.fetch_add(1, Ordering::Relaxed);
return;
}
bloom.add(&packet.data.as_slice());
}

pub fn dedup_packets(bloom: &AtomicBloom<&[u8]>, batches: &mut [PacketBatch]) -> u64 {
Expand Down