Commit
parquet bloom filter part III: add sbbf writer, remove bloom default feature, add reader properties (#3119)

* bloom filter part III

- add reader properties
- add writer properties
- remove `bloom` feature

* update row group vec

* fix clippy

* fix clippy

* remove default feature for twox

* incorporate ndv and fpp

* fix doc

* add unit test

* fix clippy

* Apply suggestions from code review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* remove underflow logic

* refactor write

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
jimexist and alamb committed Nov 22, 2022
1 parent 004a151 commit e214ccc
Showing 10 changed files with 353 additions and 42 deletions.
9 changes: 4 additions & 5 deletions parquet/Cargo.toml
@@ -57,7 +57,8 @@ seq-macro = { version = "0.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
hashbrown = { version = "0.13", default-features = false }
twox-hash = { version = "1.6", optional = true }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }

[dev-dependencies]
base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -77,7 +78,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
all-features = true

[features]
default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
# Enable CLI tools
@@ -90,8 +91,6 @@ test_common = ["arrow/test_utils"]
experimental = []
# Enable async APIs
async = ["futures", "tokio"]
# Bloomfilter
bloom = ["twox-hash"]

[[test]]
name = "arrow_writer_layout"
@@ -115,7 +114,7 @@ required-features = ["arrow", "cli"]

[[bin]]
name = "parquet-show-bloom-filter"
required-features = ["cli", "bloom"]
required-features = ["cli"]

[[bench]]
name = "arrow_writer"
5 changes: 1 addition & 4 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -78,10 +78,7 @@ fn main() {
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) = row_group_reader
.get_column_bloom_filter(column_index)
.expect("Failed to parse bloom filter")
{
if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
args.values.iter().for_each(|value| {
println!(
"Value {} is {} in bloom filter",
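The change above lets the CLI receive the cached `Option` from `get_column_bloom_filter` directly, with no `Result` to unwrap. A minimal sketch of the same reader call outside the CLI, using only the APIs visible in this diff (`file_reader` is assumed to be an already-opened file reader with at least one row group):

// Sketch, not part of the patch: probe the first row group for a bloom filter on column 0.
let row_group_reader = file_reader
    .get_row_group(0)
    .expect("Unable to read row group");
match row_group_reader.get_column_bloom_filter(0) {
    Some(_sbbf) => println!("column 0 has a bloom filter"),
    None => println!("column 0 has no bloom filter"),
}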
125 changes: 124 additions & 1 deletion parquet/src/bloom_filter/mod.rs
@@ -24,11 +24,15 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
@@ -80,6 +84,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
}

/// A split block Bloom filter
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -113,7 +118,43 @@ fn read_bloom_filter_header_and_length(
))
}

const BITSET_MIN_LENGTH: usize = 32;
const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
num_bytes.next_power_of_two()
}
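A quick illustration of the clamping and rounding behavior, using inputs that also appear in the unit tests further down:

// Requested sizes are clamped to [32 bytes, 128 MiB] and rounded up to a power of two.
assert_eq!(optimal_num_of_bytes(9), 32); // below the minimum, clamped up to 32
assert_eq!(optimal_num_of_bytes(99), 128); // rounded up to the next power of two
assert_eq!(optimal_num_of_bytes(999_000_000), 128 * 1024 * 1024); // capped at the maximum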

// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = - k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
num_bits as usize
}
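As a worked example of this formula with k = 8 (one bit set per 32-bit word of a block), the (ndv = 1000, fpp = 0.01) row from the tests below gives m = -8 * 1000 / ln(1 - 0.01^(1/8)) ≈ 9681 bits, about 1210 bytes:

// Sizing check, consistent with the unit tests in this module.
assert_eq!(num_of_bits_from_ndv_fpp(1000, 0.01), 9681);
// new_with_ndv_fpp then rounds 9681 / 8 = 1210 bytes up to a 2048-byte bitset.
assert_eq!(optimal_num_of_bytes(9681 / 8), 2048);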

impl Sbbf {
/// Create a new [Sbbf] with the given number of distinct values and false positive probability.
/// Will panic if `fpp` is greater than 1.0 or less than 0.0.
pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
assert!((0.0..1.0).contains(&fpp), "invalid fpp: {}", fpp);
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Self::new_with_num_of_bytes(num_bits / 8)
}

/// Create a new [Sbbf] with the given number of bytes; the exact number of bytes will be adjusted
/// to the next power of two, bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
let bitset = vec![0_u8; num_bytes];
Self::new(&bitset)
}

fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
@@ -128,6 +169,45 @@ impl Sbbf {
Self(data)
}

/// Write the bloom filter data (header and then bitset) to the output
pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
ParquetError::General(format!("Could not write bloom filter header: {}", e))
})?;
protocol.flush()?;
self.write_bitset(&mut writer)?;
Ok(())
}

/// Write the bitset in serialized form to the writer.
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
for block in &self.0 {
for word in block {
writer.write_all(&word.to_le_bytes()).map_err(|e| {
ParquetError::General(format!(
"Could not write bloom filter bit set: {}",
e
))
})?;
}
}
Ok(())
}

/// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
fn header(&self) -> BloomFilterHeader {
BloomFilterHeader {
// 8 i32 per block, 4 bytes per i32
num_bytes: self.0.len() as i32 * 4 * 8,
algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
hash: BloomFilterHash::XXHASH(XxHash {}),
compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
}
}
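Taken together, `write`, `write_bitset` and `header` form the serialization path. A minimal sketch of exercising it against an in-memory buffer (a `Vec<u8>` standing in for the real file sink):

// Build a filter sized for roughly 100 distinct values at 1% false positive probability,
// then serialize it: a Thrift compact BloomFilterHeader followed by the raw bitset.
let sbbf = Sbbf::new_with_ndv_fpp(100, 0.01);
let mut buf: Vec<u8> = Vec::new();
sbbf.write(&mut buf).expect("writing bloom filter");
assert!(buf.len() > 128); // a 128-byte bitset plus the header bytes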

/// Read a new bloom filter from the given offset in the given reader.
pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
@@ -292,4 +372,47 @@ mod tests {
assert_eq!(num_bytes, 32_i32);
assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
}

#[test]
fn test_optimal_num_of_bytes() {
for (input, expected) in &[
(0, 32),
(9, 32),
(31, 32),
(32, 32),
(33, 64),
(99, 128),
(1024, 1024),
(999_000_000, 128 * 1024 * 1024),
] {
assert_eq!(*expected, optimal_num_of_bytes(*input));
}
}

#[test]
fn test_num_of_bits_from_ndv_fpp() {
for (fpp, ndv, num_bits) in &[
(0.1, 10, 57),
(0.01, 10, 96),
(0.001, 10, 146),
(0.1, 100, 577),
(0.01, 100, 968),
(0.001, 100, 1460),
(0.1, 1000, 5772),
(0.01, 1000, 9681),
(0.001, 1000, 14607),
(0.1, 10000, 57725),
(0.01, 10000, 96815),
(0.001, 10000, 146076),
(0.1, 100000, 577254),
(0.01, 100000, 968152),
(0.001, 100000, 1460769),
(0.1, 1000000, 5772541),
(0.01, 1000000, 9681526),
(0.001, 1000000, 14607697),
(1e-50, 1_000_000_000_000, 14226231280773240832),
] {
assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
}
}
}
22 changes: 22 additions & 0 deletions parquet/src/column/writer/mod.rs
@@ -16,6 +16,8 @@
// under the License.

//! Contains column writer API.

use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};

@@ -154,6 +156,8 @@ pub struct ColumnCloseResult {
pub rows_written: u64,
/// Metadata for this column chunk
pub metadata: ColumnChunkMetaData,
/// Optional bloom filter for this column
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
/// Optional offset index, identifying page locations
@@ -209,6 +213,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,

// bloom filter
bloom_filter: Option<Sbbf>,

// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
@@ -231,6 +238,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Used for level information
encodings.insert(Encoding::RLE);

let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
let bloom_filter = if bloom_filter_enabled {
if let Some(ndv) = props.bloom_filter_ndv(descr.path()) {
let fpp = props.bloom_filter_fpp(descr.path());
Some(Sbbf::new_with_ndv_fpp(ndv, fpp))
} else {
let max_bytes = props.bloom_filter_max_bytes(descr.path());
Some(Sbbf::new_with_num_of_bytes(max_bytes as usize))
}
} else {
None
};

Self {
descr,
props,
@@ -260,6 +280,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_column_nulls: 0,
column_distinct_count: None,
},
bloom_filter,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -458,6 +479,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
rows_written: self.column_metrics.total_rows_written,
bloom_filter: self.bloom_filter,
metadata,
column_index,
offset_index,
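The bloom filter construction in this file is driven entirely by per-column `WriterProperties`. Only the getters (`bloom_filter_enabled`, `bloom_filter_ndv`, `bloom_filter_fpp`, `bloom_filter_max_bytes`) appear in this diff, so the builder setter names in the sketch below are assumptions for illustration, not part of the change:

// Hypothetical configuration sketch; setter names are assumed, not shown in this diff.
let props = WriterProperties::builder()
    .set_bloom_filter_enabled(true)
    .set_bloom_filter_ndv(10_000)
    .set_bloom_filter_fpp(0.01)
    .build();
// A column written with these properties would get an Sbbf::new_with_ndv_fpp(10_000, 0.01).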
2 changes: 1 addition & 1 deletion parquet/src/file/metadata.rs
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
}

impl RowGroupMetaData {
/// Returns builer for row group metadata.
/// Returns builder for row group metadata.
pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder::new(schema_descr)
}
