From 5640a5b0c7d456a68c7c0cc562425ccf5494ecec Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Thu, 24 Nov 2022 18:16:18 +0800 Subject: [PATCH] bloom filter part IV: adjust writer properties, bloom filter properties, and incorporate into column encoder (#3165) * rework bloom filter 1. update number of properties 2. push down hashing to encoder level 3. add more docs * move bloom filter * update prompt * remove unused updates --- parquet/src/arrow/array_reader/mod.rs | 2 +- parquet/src/arrow/arrow_writer/byte_array.rs | 6 + parquet/src/bin/parquet-fromcsv-help.txt | 9 +- parquet/src/bin/parquet-fromcsv.rs | 22 +- parquet/src/bin/parquet-show-bloom-filter.rs | 2 +- parquet/src/bloom_filter/mod.rs | 43 +-- parquet/src/column/writer/encoder.rs | 32 ++ parquet/src/column/writer/mod.rs | 22 +- parquet/src/file/properties.rs | 297 ++++++++++++------- 9 files changed, 284 insertions(+), 151 deletions(-) diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index aede5e86c69..f46f6073a71 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -102,7 +102,7 @@ pub trait RowGroupCollection { /// Get schema of parquet file. fn schema(&self) -> SchemaDescPtr; - /// Get the numer of rows in this collection + /// Get the number of rows in this collection fn num_rows(&self) -> usize; /// Returns an iterator over the column chunks for particular column diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index d5231785280..d870ac54fe4 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -17,6 +17,7 @@ use crate::arrow::arrow_writer::levels::LevelInfo; use crate::basic::Encoding; +use crate::bloom_filter::Sbbf; use crate::column::page::PageWriter; use crate::column::writer::encoder::{ ColumnValueEncoder, DataPageValues, DictionaryPage, @@ -451,6 +452,11 @@ impl ColumnValueEncoder for ByteArrayEncoder { } } + fn flush_bloom_filter(&mut self) -> Option { + // TODO FIX ME need to handle bloom filter in arrow writer + None + } + fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result where Self: Sized, diff --git a/parquet/src/bin/parquet-fromcsv-help.txt b/parquet/src/bin/parquet-fromcsv-help.txt index f599a13f0f1..ec7eb0cc13f 100644 --- a/parquet/src/bin/parquet-fromcsv-help.txt +++ b/parquet/src/bin/parquet-fromcsv-help.txt @@ -37,10 +37,10 @@ Options: [possible values: lf, crlf, cr] -e, --escape-char - escape charactor + escape character -q, --quote-char - quate charactor + quote character -D, --double-quote double quote @@ -58,6 +58,11 @@ Options: -m, --max-row-group-size max row group size + --enable-bloom-filter + whether to enable bloom filter writing + + [possible values: true, false] + --help display usage help diff --git a/parquet/src/bin/parquet-fromcsv.rs b/parquet/src/bin/parquet-fromcsv.rs index 5fdece7cc8a..b11f3406cb3 100644 --- a/parquet/src/bin/parquet-fromcsv.rs +++ b/parquet/src/bin/parquet-fromcsv.rs @@ -57,11 +57,11 @@ //! //! - `-i`, `--input-file` : Path to input CSV file //! - `-f`, `--input-format` : Dialect for input file, `csv` or `tsv`. -//! - `-d`, `--delimiter : Field delimitor for CSV file, default depends `--input-format` -//! - `-e`, `--escape` : Escape charactor for input file +//! - `-d`, `--delimiter : Field delimiter for CSV file, default depends `--input-format` +//! - `-e`, `--escape` : Escape character for input file //! - `-h`, `--has-header` : Input has header -//! - `-r`, `--record-terminator` : Record terminator charactor for input. default is CRLF -//! - `-q`, `--quote-char` : Input quoting charactor +//! - `-r`, `--record-terminator` : Record terminator character for input. default is CRLF +//! - `-q`, `--quote-char` : Input quoting character //! use std::{ @@ -182,9 +182,9 @@ struct Args { delimiter: Option, #[clap(value_enum, short, long, help("record terminator"))] record_terminator: Option, - #[clap(short, long, help("escape charactor"))] + #[clap(short, long, help("escape character"))] escape_char: Option, - #[clap(short, long, help("quate charactor"))] + #[clap(short, long, help("quote character"))] quote_char: Option, #[clap(short('D'), long, help("double quote"))] double_quote: Option, @@ -197,6 +197,8 @@ struct Args { writer_version: Option, #[clap(short, long, help("max row group size"))] max_row_group_size: Option, + #[clap(long, help("whether to enable bloom filter writing"))] + enable_bloom_filter: Option, #[clap(long, action=clap::ArgAction::Help, help("display usage help"))] help: Option, @@ -290,6 +292,10 @@ fn configure_writer_properties(args: &Args) -> WriterProperties { properties_builder = properties_builder.set_max_row_group_size(max_row_group_size); } + if let Some(enable_bloom_filter) = args.enable_bloom_filter { + properties_builder = + properties_builder.set_bloom_filter_enabled(enable_bloom_filter); + } properties_builder.build() } @@ -548,6 +554,7 @@ mod tests { parquet_compression: Compression::SNAPPY, writer_version: None, max_row_group_size: None, + enable_bloom_filter: None, help: None, }; let arrow_schema = Arc::new(Schema::new(vec![ @@ -582,6 +589,7 @@ mod tests { parquet_compression: Compression::SNAPPY, writer_version: None, max_row_group_size: None, + enable_bloom_filter: None, help: None, }; let arrow_schema = Arc::new(Schema::new(vec![ @@ -636,6 +644,8 @@ mod tests { parquet_compression: Compression::SNAPPY, writer_version: None, max_row_group_size: None, + // by default we shall test bloom filter writing + enable_bloom_filter: Some(true), help: None, }; convert_csv_to_parquet(&args).unwrap(); diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs index 28493a94c49..55ecb2abf13 100644 --- a/parquet/src/bin/parquet-show-bloom-filter.rs +++ b/parquet/src/bin/parquet-show-bloom-filter.rs @@ -83,7 +83,7 @@ fn main() { println!( "Value {} is {} in bloom filter", value, - if sbbf.check(value.as_str()) { + if sbbf.check(&value.as_str()) { "present" } else { "absent" diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index 4efba3834de..15c38cf5915 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -16,7 +16,7 @@ // under the License. //! Bloom filter implementation specific to Parquet, as described -//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md) +//! in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md). use crate::data_type::AsBytes; use crate::errors::ParquetError; @@ -35,7 +35,7 @@ use thrift::protocol::{ }; use twox_hash::XxHash64; -/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach) +/// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach). const SALT: [u32; 8] = [ 0x47b6137b_u32, 0x44974d91_u32, @@ -83,7 +83,9 @@ fn block_check(block: &Block, hash: u32) -> bool { true } -/// A split block Bloom filter +/// A split block Bloom filter. The creation of this structure is based on the +/// [`crate::file::properties::BloomFilterProperties`] struct set via [`crate::file::properties::WriterProperties`] and +/// is thus hidden by default. #[derive(Debug, Clone)] pub struct Sbbf(Vec); @@ -118,8 +120,8 @@ fn read_bloom_filter_header_and_length( )) } -const BITSET_MIN_LENGTH: usize = 32; -const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024; +pub(crate) const BITSET_MIN_LENGTH: usize = 32; +pub(crate) const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024; #[inline] fn optimal_num_of_bytes(num_bytes: usize) -> usize { @@ -141,15 +143,20 @@ fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize { impl Sbbf { /// Create a new [Sbbf] with given number of distinct values and false positive probability. /// Will panic if `fpp` is greater than 1.0 or less than 0.0. - pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self { - assert!((0.0..-1.0).contains(&fpp), "invalid fpp: {}", fpp); + pub(crate) fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Result { + if !(0.0..1.0).contains(&fpp) { + return Err(ParquetError::General(format!( + "False positive probability must be between 0.0 and 1.0, got {}", + fpp + ))); + } let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp); - Self::new_with_num_of_bytes(num_bits / 8) + Ok(Self::new_with_num_of_bytes(num_bits / 8)) } /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted - /// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`. - pub fn new_with_num_of_bytes(num_bytes: usize) -> Self { + /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH]. + pub(crate) fn new_with_num_of_bytes(num_bytes: usize) -> Self { let num_bytes = optimal_num_of_bytes(num_bytes); let bitset = vec![0_u8; num_bytes]; Self::new(&bitset) @@ -170,7 +177,7 @@ impl Sbbf { } /// Write the bloom filter data (header and then bitset) to the output - pub fn write(&self, mut writer: W) -> Result<(), ParquetError> { + pub(crate) fn write(&self, mut writer: W) -> Result<(), ParquetError> { let mut protocol = TCompactOutputProtocol::new(&mut writer); let header = self.header(); header.write_to_out_protocol(&mut protocol).map_err(|e| { @@ -208,7 +215,7 @@ impl Sbbf { } /// Read a new bloom filter from the given offset in the given reader. - pub fn read_from_column_chunk( + pub(crate) fn read_from_column_chunk( column_metadata: &ColumnChunkMetaData, reader: Arc, ) -> Result, ParquetError> { @@ -254,7 +261,7 @@ impl Sbbf { } /// Insert an [AsBytes] value into the filter - pub fn insert(&mut self, value: T) { + pub fn insert(&mut self, value: &T) { self.insert_hash(hash_as_bytes(value)); } @@ -266,7 +273,7 @@ impl Sbbf { } /// Check if an [AsBytes] value is probably present or definitely absent in the filter - pub fn check(&self, value: T) -> bool { + pub fn check(&self, value: &T) -> bool { self.check_hash(hash_as_bytes(value)) } @@ -284,7 +291,7 @@ impl Sbbf { const SEED: u64 = 0; #[inline] -fn hash_as_bytes(value: A) -> u64 { +fn hash_as_bytes(value: &A) -> u64 { let mut hasher = XxHash64::with_seed(SEED); hasher.write(value.as_bytes()); hasher.finish() @@ -324,8 +331,8 @@ mod tests { fn test_sbbf_insert_and_check() { let mut sbbf = Sbbf(vec![[0_u32; 8]; 1_000]); for i in 0..1_000_000 { - sbbf.insert(i); - assert!(sbbf.check(i)); + sbbf.insert(&i); + assert!(sbbf.check(&i)); } } @@ -339,7 +346,7 @@ mod tests { let sbbf = Sbbf::new(bitset); for a in 0..10i64 { let value = format!("a{}", a); - assert!(sbbf.check(value.as_str())); + assert!(sbbf.check(&value.as_str())); } } diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 22cc71f6cd5..0d0716d7a7d 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -16,6 +16,7 @@ // under the License. use crate::basic::Encoding; +use crate::bloom_filter::Sbbf; use crate::column::writer::{ compare_greater, fallback_encoding, has_dictionary_support, is_nan, update_max, update_min, @@ -24,6 +25,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::DataType; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; use crate::errors::{ParquetError, Result}; +use crate::file::properties::BloomFilterProperties; use crate::file::properties::{EnabledStatistics, WriterProperties}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; use crate::util::memory::ByteBufferPtr; @@ -115,6 +117,11 @@ pub trait ColumnValueEncoder { /// Flush the next data page for this column chunk fn flush_data_page(&mut self) -> Result>; + + /// Flushes bloom filter if enabled and returns it, otherwise returns `None`. Subsequent writes + /// will *not* be tracked by the bloom filter as it is empty since. This should be called once + /// near the end of encoding. + fn flush_bloom_filter(&mut self) -> Option; } pub struct ColumnValueEncoderImpl { @@ -125,6 +132,7 @@ pub struct ColumnValueEncoderImpl { statistics_enabled: EnabledStatistics, min_value: Option, max_value: Option, + bloom_filter: Option, } impl ColumnValueEncoderImpl { @@ -136,6 +144,13 @@ impl ColumnValueEncoderImpl { } } + // encode the values into bloom filter if enabled + if let Some(bloom_filter) = &mut self.bloom_filter { + for value in slice { + bloom_filter.insert(value); + } + } + match &mut self.dict_encoder { Some(encoder) => encoder.put(slice), _ => self.encoder.put(slice), @@ -161,6 +176,10 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } } + fn flush_bloom_filter(&mut self) -> Option { + self.bloom_filter.take() + } + fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result { let dict_supported = props.dictionary_enabled(descr.path()) && has_dictionary_support(T::get_physical_type(), props); @@ -175,12 +194,25 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { let statistics_enabled = props.statistics_enabled(descr.path()); + let bloom_filter_enabled = props.bloom_filter_enabled(descr.path()); + let bloom_filter = + if let Some(BloomFilterProperties { ndv, fpp }) = bloom_filter_enabled { + Sbbf::new_with_ndv_fpp(ndv, fpp) + .map_err(|e| { + eprintln!("invalid bloom filter properties: {}", e); + }) + .ok() + } else { + None + }; + Ok(Self { encoder, dict_encoder, descr: descr.clone(), num_values: 0, statistics_enabled, + bloom_filter, min_value: None, max_value: None, }) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index ae7920e2283..40f8c99403f 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -212,10 +212,6 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> { def_levels_sink: Vec, rep_levels_sink: Vec, data_pages: VecDeque, - - // bloom filter - bloom_filter: Option, - // column index and offset index column_index_builder: ColumnIndexBuilder, offset_index_builder: OffsetIndexBuilder, @@ -238,19 +234,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // Used for level information encodings.insert(Encoding::RLE); - let bloom_filter_enabled = props.bloom_filter_enabled(descr.path()); - let bloom_filter = if bloom_filter_enabled { - if let Some(ndv) = props.bloom_filter_ndv(descr.path()) { - let fpp = props.bloom_filter_fpp(descr.path()); - Some(Sbbf::new_with_ndv_fpp(ndv, fpp)) - } else { - let max_bytes = props.bloom_filter_max_bytes(descr.path()); - Some(Sbbf::new_with_num_of_bytes(max_bytes as usize)) - } - } else { - None - }; - Self { descr, props, @@ -280,7 +263,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { num_column_nulls: 0, column_distinct_count: None, }, - bloom_filter, column_index_builder: ColumnIndexBuilder::new(), offset_index_builder: OffsetIndexBuilder::new(), encodings, @@ -454,7 +436,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { &self.descr } - /// Finalises writes and closes the column writer. + /// Finalizes writes and closes the column writer. /// Returns total bytes written, total rows written and column chunk metadata. pub fn close(mut self) -> Result { if self.page_metrics.num_buffered_values > 0 { @@ -479,7 +461,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Ok(ColumnCloseResult { bytes_written: self.column_metrics.total_bytes_written, rows_written: self.column_metrics.total_rows_written, - bloom_filter: self.bloom_filter, + bloom_filter: self.encoder.flush_bloom_filter(), metadata, column_index, offset_index, diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 03117d4cb07..6d30be2e4ba 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -64,7 +64,6 @@ //! .build(); //! ``` -use paste::paste; use std::{collections::HashMap, sync::Arc}; use crate::basic::{Compression, Encoding}; @@ -83,9 +82,10 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page; const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY"); -const DEFAULT_BLOOM_FILTER_ENABLED: bool = false; -const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024; -const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01; +/// default value for the false positive probability used in a bloom filter. +pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05; +/// default value for the expected number of distinct values used in a bloom filter. +pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64; /// Parquet writer version. /// @@ -129,26 +129,6 @@ pub struct WriterProperties { sorting_columns: Option>, } -macro_rules! def_col_property_getter { - ($field:ident, $field_type:ty) => { - pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> { - self.column_properties - .get(col) - .and_then(|c| c.$field()) - .or_else(|| self.default_column_properties.$field()) - } - }; - ($field:ident, $field_type:ty, $default_val:expr) => { - pub fn $field(&self, col: &ColumnPath) -> $field_type { - self.column_properties - .get(col) - .and_then(|c| c.$field()) - .or_else(|| self.default_column_properties.$field()) - .unwrap_or($default_val) - } - }; -} - impl WriterProperties { /// Returns builder for writer properties with default values. pub fn builder() -> WriterPropertiesBuilder { @@ -280,10 +260,17 @@ impl WriterProperties { .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE) } - def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED); - def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP); - def_col_property_getter!(bloom_filter_ndv, u64); - def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES); + /// Returns whether bloom filter is enabled for a given column. Bloom filter can be enabled over + /// all or for a specific column, and is by default set to be disabled. + pub fn bloom_filter_enabled( + &self, + col: &ColumnPath, + ) -> Option { + self.column_properties + .get(col) + .and_then(|c| c.bloom_filter_enabled()) + .or_else(|| self.default_column_properties.bloom_filter_enabled()) + } } /// Writer properties builder. @@ -301,52 +288,6 @@ pub struct WriterPropertiesBuilder { sorting_columns: Option>, } -macro_rules! def_opt_field_setter { - ($field: ident, $type: ty) => { - paste! { - pub fn [](&mut self, value: $type) -> &mut Self { - self.$field = Some(value); - self - } - } - }; - ($field: ident, $type: ty, $min_value:expr, $max_value:expr) => { - paste! { - pub fn [](&mut self, value: $type) -> &mut Self { - if ($min_value..=$max_value).contains(&value) { - self.$field = Some(value); - } else { - self.$field = None - } - self - } - } - }; -} - -macro_rules! def_opt_field_getter { - ($field: ident, $type: ty) => { - paste! { - #[doc = "Returns " $field " if set."] - pub fn $field(&self) -> Option<$type> { - self.$field - } - } - }; -} - -macro_rules! def_per_col_setter { - ($field:ident, $field_type:ty) => { - paste! { - #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."] - pub fn [](mut self, col: ColumnPath, value: $field_type) -> Self { - self.get_mut_props(col).[](value); - self - } - } - } -} - impl WriterPropertiesBuilder { /// Returns default state of the builder. fn with_defaults() -> Self { @@ -506,6 +447,30 @@ impl WriterPropertiesBuilder { self } + /// Sets whether bloom filter is enabled for any column. + /// If the bloom filter is enabled previously then it is a no-op. + /// If the bloom filter is not yet enabled, a default set of ndv and fpp value will be used. + /// You can use [`set_bloom_filter_ndv`](Self::set_bloom_filter_ndv) and [`set_bloom_filter_fpp`](Self::set_bloom_filter_fpp) to further adjust the ndv and fpp. + pub fn set_bloom_filter_enabled(mut self, value: bool) -> Self { + self.default_column_properties + .set_bloom_filter_enabled(value); + self + } + + /// Sets bloom filter false positive probability (fpp) for any column. + /// Implicitly [`set_bloom_filter_enabled`](Self::set_bloom_filter_enabled). + pub fn set_bloom_filter_fpp(mut self, value: f64) -> Self { + self.default_column_properties.set_bloom_filter_fpp(value); + self + } + + /// Sets number of distinct values (ndv) for bloom filter for any column. + /// Implicitly [`set_bloom_filter_enabled`](Self::set_bloom_filter_enabled). + pub fn set_bloom_filter_ndv(mut self, value: u64) -> Self { + self.default_column_properties.set_bloom_filter_ndv(value); + self + } + // ---------------------------------------------------------------------- // Setters for a specific column @@ -568,10 +533,33 @@ impl WriterPropertiesBuilder { self } - def_per_col_setter!(bloom_filter_enabled, bool); - def_per_col_setter!(bloom_filter_fpp, f64); - def_per_col_setter!(bloom_filter_max_bytes, u32); - def_per_col_setter!(bloom_filter_ndv, u64); + /// Sets whether a bloom filter should be created for a specific column. + /// The behavior is similar to [`set_bloom_filter_enabled`](Self::set_bloom_filter_enabled). + /// Takes precedence over globally defined settings. + pub fn set_column_bloom_filter_enabled( + mut self, + col: ColumnPath, + value: bool, + ) -> Self { + self.get_mut_props(col).set_bloom_filter_enabled(value); + self + } + + /// Sets the false positive probability for bloom filter for a specific column. + /// The behavior is similar to [`set_bloom_filter_fpp`](Self::set_bloom_filter_fpp) but will + /// override the default. + pub fn set_column_bloom_filter_fpp(mut self, col: ColumnPath, value: f64) -> Self { + self.get_mut_props(col).set_bloom_filter_fpp(value); + self + } + + /// Sets the number of distinct values for bloom filter for a specific column. + /// The behavior is similar to [`set_bloom_filter_ndv`](Self::set_bloom_filter_ndv) but will + /// override the default. + pub fn set_column_bloom_filter_ndv(mut self, col: ColumnPath, value: u64) -> Self { + self.get_mut_props(col).set_bloom_filter_ndv(value); + self + } } /// Controls the level of statistics to be computed by the writer @@ -591,6 +579,43 @@ impl Default for EnabledStatistics { } } +/// Controls the bloom filter to be computed by the writer. +#[derive(Debug, Clone, PartialEq)] +pub struct BloomFilterProperties { + /// False positive probability, should be always between 0 and 1 exclusive. Defaults to [`DEFAULT_BLOOM_FILTER_FPP`]. + /// + /// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_fpp`]. + /// + /// The bloom filter data structure is a trade of between disk and memory space versus fpp, the + /// smaller the fpp, the more memory and disk space is required, thus setting it to a reasonable value + /// e.g. 0.1, 0.05, or 0.001 is recommended. + /// + /// Setting to very small number diminishes the value of the filter itself, as the bitset size is + /// even larger than just storing the whole value. You are also expected to set `ndv` if it can + /// be known in advance in order to largely reduce space usage. + pub fpp: f64, + /// Number of distinct values, should be non-negative to be meaningful. Defaults to [`DEFAULT_BLOOM_FILTER_NDV`]. + /// + /// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_ndv`]. + /// + /// Usage of bloom filter is most beneficial for columns with large cardinality, so a good heuristic + /// is to set ndv to number of rows. However it can reduce disk size if you know in advance a smaller + /// number of distinct values. For very small ndv value it is probably not worth it to use bloom filter + /// anyway. + /// + /// Increasing this value (without increasing fpp) will result in an increase in disk or memory size. + pub ndv: u64, +} + +impl Default for BloomFilterProperties { + fn default() -> Self { + BloomFilterProperties { + fpp: DEFAULT_BLOOM_FILTER_FPP, + ndv: DEFAULT_BLOOM_FILTER_NDV, + } + } +} + /// Container for column properties that can be changed as part of writer. /// /// If a field is `None`, it means that no specific value has been set for this column, @@ -602,14 +627,8 @@ struct ColumnProperties { dictionary_enabled: Option, statistics_enabled: Option, max_statistics_size: Option, - /// bloom filter enabled - bloom_filter_enabled: Option, - /// bloom filter expected number of distinct values - bloom_filter_ndv: Option, - /// bloom filter false positive probability - bloom_filter_fpp: Option, - /// bloom filter max number of bytes - bloom_filter_max_bytes: Option, + /// bloom filter related properties + bloom_filter_enabled: Option, } impl ColumnProperties { @@ -649,10 +668,45 @@ impl ColumnProperties { self.max_statistics_size = Some(value); } - def_opt_field_setter!(bloom_filter_enabled, bool); - def_opt_field_setter!(bloom_filter_fpp, f64, 0.0, 1.0); - def_opt_field_setter!(bloom_filter_max_bytes, u32); - def_opt_field_setter!(bloom_filter_ndv, u64); + /// If `value` is `true`, sets bloom filter properties to default values if not previously set, + /// otherwise it is a no-op. + /// If `value` is `false`, resets bloom filter properties to `None`. + fn set_bloom_filter_enabled(&mut self, value: bool) { + if value { + self.bloom_filter_enabled = self + .bloom_filter_enabled() + .or_else(|| Some(Default::default())); + } else { + self.bloom_filter_enabled = None; + } + } + + /// Sets the false positive probability for bloom filter for this column, and implicitly enables + /// bloom filter if not previously enabled. If the `value` is not between 0 and 1 exclusive, it is + /// discarded as no-op. + fn set_bloom_filter_fpp(&mut self, value: f64) { + if (0.0..1.0).contains(&value) { + self.bloom_filter_enabled = self + .bloom_filter_enabled() + .or_else(|| Some(Default::default())) + .map(|BloomFilterProperties { ndv, .. }| BloomFilterProperties { + ndv, + fpp: value, + }); + } + } + + /// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly + /// enables bloom filter if not previously enabled. + fn set_bloom_filter_ndv(&mut self, value: u64) { + self.bloom_filter_enabled = self + .bloom_filter_enabled() + .or_else(|| Some(Default::default())) + .map(|BloomFilterProperties { fpp, .. }| BloomFilterProperties { + ndv: value, + fpp, + }); + } /// Returns optional encoding for this column. fn encoding(&self) -> Option { @@ -682,10 +736,10 @@ impl ColumnProperties { self.max_statistics_size } - def_opt_field_getter!(bloom_filter_enabled, bool); - def_opt_field_getter!(bloom_filter_fpp, f64); - def_opt_field_getter!(bloom_filter_max_bytes, u32); - def_opt_field_getter!(bloom_filter_ndv, u64); + /// Returns bloom filter properties if set. + fn bloom_filter_enabled(&self) -> Option { + self.bloom_filter_enabled.clone() + } } /// Reference counted reader properties. @@ -812,13 +866,9 @@ mod tests { props.max_statistics_size(&ColumnPath::from("col")), DEFAULT_MAX_STATISTICS_SIZE ); - assert!(!props.bloom_filter_enabled(&ColumnPath::from("col"))); - assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01); - assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None); - assert_eq!( - props.bloom_filter_max_bytes(&ColumnPath::from("col")), - 1024 * 1024 - ); + assert!(props + .bloom_filter_enabled(&ColumnPath::from("col")) + .is_none()); } #[test] @@ -903,9 +953,8 @@ mod tests { ) .set_column_max_statistics_size(ColumnPath::from("col"), 123) .set_column_bloom_filter_enabled(ColumnPath::from("col"), true) - .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100) + .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100_u64) .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1) - .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000) .build(); assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0); @@ -947,6 +996,10 @@ mod tests { EnabledStatistics::Chunk ); assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123); + assert_eq!( + props.bloom_filter_enabled(&ColumnPath::from("col")), + Some(BloomFilterProperties { fpp: 0.1, ndv: 100 }) + ); } #[test] @@ -954,6 +1007,7 @@ mod tests { let props = WriterProperties::builder() .set_encoding(Encoding::DELTA_BINARY_PACKED) .set_compression(Compression::GZIP) + .set_bloom_filter_enabled(true) .set_column_encoding(ColumnPath::from("col"), Encoding::RLE) .build(); @@ -969,6 +1023,43 @@ mod tests { props.dictionary_enabled(&ColumnPath::from("col")), DEFAULT_DICTIONARY_ENABLED ); + assert_eq!( + props.bloom_filter_enabled(&ColumnPath::from("col")), + Some(BloomFilterProperties { + fpp: 0.05, + ndv: 1_000_000_u64 + }) + ); + } + + #[test] + fn test_writer_properties_bloom_filter_ndv_fpp_set() { + assert_eq!( + WriterProperties::builder() + .build() + .bloom_filter_enabled(&ColumnPath::from("col")), + None + ); + assert_eq!( + WriterProperties::builder() + .set_bloom_filter_ndv(100) + .build() + .bloom_filter_enabled(&ColumnPath::from("col")), + Some(BloomFilterProperties { + fpp: 0.05, + ndv: 100 + }) + ); + assert_eq!( + WriterProperties::builder() + .set_bloom_filter_fpp(0.1) + .build() + .bloom_filter_enabled(&ColumnPath::from("col")), + Some(BloomFilterProperties { + fpp: 0.1, + ndv: 1_000_000_u64 + }) + ); } #[test]