parquet bloom filter part III: add sbbf writer, remove bloom default feature, add reader properties #3119

Merged: 12 commits, Nov 22, 2022
9 changes: 4 additions & 5 deletions parquet/Cargo.toml
@@ -57,7 +57,8 @@ seq-macro = { version = "0.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
hashbrown = { version = "0.13", default-features = false }
twox-hash = { version = "1.6", optional = true }
twox-hash = { version = "1.6", default-features = false }
Member Author: it by default relies on rand, which breaks wasm32.

paste = { version = "1.0" }

[dev-dependencies]
base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -77,7 +78,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"
all-features = true

[features]
default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
Contributor: 👍

default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
# Enable CLI tools
@@ -90,8 +91,6 @@ test_common = ["arrow/test_utils"]
experimental = []
# Enable async APIs
async = ["futures", "tokio"]
# Bloomfilter
bloom = ["twox-hash"]

[[test]]
name = "arrow_writer_layout"
@@ -115,7 +114,7 @@ required-features = ["arrow", "cli"]

[[bin]]
name = "parquet-show-bloom-filter"
required-features = ["cli", "bloom"]
required-features = ["cli"]

[[bench]]
name = "arrow_writer"
5 changes: 1 addition & 4 deletions parquet/src/bin/parquet-show-bloom-filter.rs
@@ -78,10 +78,7 @@ fn main() {
let row_group_reader = file_reader
.get_row_group(ri)
.expect("Unable to read row group");
if let Some(sbbf) = row_group_reader
.get_column_bloom_filter(column_index)
.expect("Failed to parse bloom filter")
{
if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
args.values.iter().for_each(|value| {
println!(
"Value {} is {} in bloom filter",
125 changes: 124 additions & 1 deletion parquet/src/bloom_filter/mod.rs
@@ -24,11 +24,15 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
};
use twox_hash::XxHash64;

/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
@@ -80,6 +84,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
}

/// A split block Bloom filter
#[derive(Debug, Clone)]
pub struct Sbbf(Vec<Block>);

const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -113,7 +118,43 @@ fn read_bloom_filter_header_and_length(
))
}

const BITSET_MIN_LENGTH: usize = 32;
const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;

#[inline]
fn optimal_num_of_bytes(num_bytes: usize) -> usize {
let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
num_bytes.next_power_of_two()
}

// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
// given fpp = (1 - e^(-k * n / m)) ^ k
// we have m = - k * n / ln(1 - fpp ^ (1 / k))
// where k = number of hash functions, m = number of bits, n = number of distinct values
#[inline]
fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
num_bits as usize
}
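
As a quick sanity check on the formula above (a standalone sketch, not part of this diff): with k = 8 hash functions, ndv = 100 and fpp = 0.01, m = -8 * 100 / ln(1 - 0.01^(1/8)) ≈ 968 bits, which agrees with the (0.01, 100, 968) entry in the test table added further down.

// Standalone arithmetic check mirroring num_of_bits_from_ndv_fpp (hypothetical snippet).
fn main() {
    let (ndv, fpp) = (100_f64, 0.01_f64);
    let num_bits = -8.0 * ndv / (1.0 - fpp.powf(1.0 / 8.0)).ln();
    assert_eq!(num_bits as usize, 968); // matches the test_num_of_bits_from_ndv_fpp table
}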

impl Sbbf {
/// Create a new [Sbbf] with given number of distinct values and false positive probability.
/// Will panic if `fpp` is greater than 1.0 or less than 0.0.
Contributor:
Suggested change
/// Will panic if `fpp` is greater than 1.0 or less than 0.0.
/// Will panic if `fpp` is greater than or equal to 1.0 or less than 0.0.

pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
Contributor:
Since this is a function meant for use outside the parquet crate I would prefer it return an error rather than panic with bad input.

Member Author:
will update once we decide on using fpp or ndv or both:

assert!((0.0..1.0).contains(&fpp), "invalid fpp: {}", fpp);
let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
Self::new_with_num_of_bytes(num_bits / 8)
}

/// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
/// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
let num_bytes = optimal_num_of_bytes(num_bytes);
let bitset = vec![0_u8; num_bytes];
Self::new(&bitset)
}
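
For orientation, a minimal usage sketch of the two constructors above (not part of the diff; assumes `Sbbf` is reachable as `parquet::bloom_filter::Sbbf` from a downstream crate):

// Hypothetical downstream usage; the import path is an assumption.
use parquet::bloom_filter::Sbbf;

fn build_filters() {
    // 1_000 distinct values at fpp = 0.01 needs roughly 9_681 bits (~1_210 bytes),
    // which new_with_num_of_bytes rounds up to the next power of two: 2_048 bytes.
    let by_ndv_fpp = Sbbf::new_with_ndv_fpp(1_000, 0.01);
    // Asking for 1_500 bytes directly is likewise rounded up to 2_048 bytes.
    let by_size = Sbbf::new_with_num_of_bytes(1_500);
    let _ = (by_ndv_fpp, by_size);
}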

fn new(bitset: &[u8]) -> Self {
let data = bitset
.chunks_exact(4 * 8)
@@ -128,6 +169,45 @@ impl Sbbf {
Self(data)
}

/// Write the bloom filter data (header and then bitset) to the output
pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
ParquetError::General(format!("Could not write bloom filter header: {}", e))
})?;
protocol.flush()?;
self.write_bitset(&mut writer)?;
Ok(())
}

/// Write the bitset in serialized form to the writer.
fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
for block in &self.0 {
for word in block {
writer.write_all(&word.to_le_bytes()).map_err(|e| {
ParquetError::General(format!(
"Could not write bloom filter bit set: {}",
e
))
})?;
}
}
Ok(())
}

/// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
fn header(&self) -> BloomFilterHeader {
BloomFilterHeader {
// 8 i32 per block, 4 bytes per i32
num_bytes: self.0.len() as i32 * 4 * 8,
algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
hash: BloomFilterHash::XXHASH(XxHash {}),
compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
}
}
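
A minimal sketch of exercising the write path above (hypothetical test code, not in this diff): the output is the Thrift compact header followed by the little-endian bitset, so the buffer ends up a little larger than the bitset alone.

#[test]
fn write_produces_header_then_bitset() {
    // Hypothetical test, not part of this PR.
    let sbbf = Sbbf::new_with_num_of_bytes(64);
    let mut buf: Vec<u8> = Vec::new();
    sbbf.write(&mut buf).expect("failed to write bloom filter");
    // The Thrift compact header (about SBBF_HEADER_SIZE_ESTIMATE bytes) precedes the 64-byte bitset.
    assert!(buf.len() > 64);
}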

/// Read a new bloom filter from the given offset in the given reader.
pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
@@ -292,4 +372,47 @@ mod tests {
assert_eq!(num_bytes, 32_i32);
assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
}

#[test]
fn test_optimal_num_of_bytes() {
for (input, expected) in &[
(0, 32),
(9, 32),
(31, 32),
(32, 32),
(33, 64),
(99, 128),
(1024, 1024),
(999_000_000, 128 * 1024 * 1024),
] {
assert_eq!(*expected, optimal_num_of_bytes(*input));
}
}

#[test]
fn test_num_of_bits_from_ndv_fpp() {
for (fpp, ndv, num_bits) in &[
(0.1, 10, 57),
(0.01, 10, 96),
(0.001, 10, 146),
(0.1, 100, 577),
(0.01, 100, 968),
(0.001, 100, 1460),
(0.1, 1000, 5772),
(0.01, 1000, 9681),
(0.001, 1000, 14607),
(0.1, 10000, 57725),
(0.01, 10000, 96815),
(0.001, 10000, 146076),
(0.1, 100000, 577254),
(0.01, 100000, 968152),
(0.001, 100000, 1460769),
(0.1, 1000000, 5772541),
(0.01, 1000000, 9681526),
(0.001, 1000000, 14607697),
Contributor: does this mean a 14 MB bloom filter? It's ok as there is a limit in optimal_num_of_bytes, but when I saw this it was just 🤯

It might also be good to pass in some value larger than 2^32 to test there isn't an overflow problem lurking

(1e-50, 1_000_000_000_000, 14226231280773240832),
] {
assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
}
}
}
22 changes: 22 additions & 0 deletions parquet/src/column/writer/mod.rs
@@ -16,6 +16,8 @@
// under the License.

//! Contains column writer API.

use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};

@@ -154,6 +156,8 @@ pub struct ColumnCloseResult {
pub rows_written: u64,
/// Metadata for this column chunk
pub metadata: ColumnChunkMetaData,
/// Optional bloom filter for this column
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
pub column_index: Option<ColumnIndex>,
/// Optional offset index, identifying page locations
@@ -209,6 +213,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
rep_levels_sink: Vec<i16>,
data_pages: VecDeque<CompressedPage>,

// bloom filter
bloom_filter: Option<Sbbf>,

// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
@@ -231,6 +238,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Used for level information
encodings.insert(Encoding::RLE);

let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
let bloom_filter = if bloom_filter_enabled {
if let Some(ndv) = props.bloom_filter_ndv(descr.path()) {
let fpp = props.bloom_filter_fpp(descr.path());
Some(Sbbf::new_with_ndv_fpp(ndv, fpp))
} else {
Contributor:
I think it is perhaps a little surprising that bloom_filter_max_bytes is ignored if ndv is set

let max_bytes = props.bloom_filter_max_bytes(descr.path());
Some(Sbbf::new_with_num_of_bytes(max_bytes as usize))
}
} else {
None
};
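
For context, a hedged sketch of how a writer could opt in through these per-column properties; the builder setter names below are assumptions for illustration and do not come from this diff:

// Hypothetical builder calls; the set_* names are assumed, not taken from this PR.
let props = WriterProperties::builder()
    .set_bloom_filter_enabled(true) // turn the filter on
    .set_bloom_filter_ndv(1_000)    // expected distinct values per column
    .set_bloom_filter_fpp(0.01)     // target false positive probability
    .build();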

Self {
descr,
props,
@@ -260,6 +280,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_column_nulls: 0,
column_distinct_count: None,
},
bloom_filter,
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -458,6 +479,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(ColumnCloseResult {
bytes_written: self.column_metrics.total_bytes_written,
rows_written: self.column_metrics.total_rows_written,
bloom_filter: self.bloom_filter,
metadata,
column_index,
offset_index,
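
Downstream of `close()`, whichever writer assembles the row group can take the filter out of `ColumnCloseResult` and serialize it with `Sbbf::write`; below is a rough sketch under the assumption that the surrounding code hands it a close result and an output sink (identifiers hypothetical, not from this diff):

// Hypothetical glue code; `close_result` and `sink` are stand-ins, not names from this PR.
fn append_bloom_filter<W: std::io::Write>(
    close_result: ColumnCloseResult,
    sink: &mut W,
) -> Result<(), ParquetError> {
    if let Some(sbbf) = close_result.bloom_filter {
        // The real file writer would also record the filter's byte offset in the column metadata.
        sbbf.write(sink)?;
    }
    Ok(())
}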
2 changes: 1 addition & 1 deletion parquet/src/file/metadata.rs
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
}

impl RowGroupMetaData {
/// Returns builer for row group metadata.
/// Returns builder for row group metadata.
Contributor: 👍

pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
RowGroupMetaDataBuilder::new(schema_descr)
}