From f4558aeb294a9798c4fd474192615a023ef02851 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sun, 13 Nov 2022 13:13:05 +0000
Subject: [PATCH 01/12] bloom filter part III

- add reader properties
- add writer properties
- remove `bloom` feature

---
 parquet/Cargo.toml                    |   9 +-
 parquet/src/bloom_filter/mod.rs       |  30 ++++++
 parquet/src/column/writer/mod.rs      |  10 ++
 parquet/src/file/metadata.rs          |   2 +-
 parquet/src/file/properties.rs        | 139 +++++++++++++++++++++++---
 parquet/src/file/reader.rs            |   4 +-
 parquet/src/file/serialized_reader.rs |  27 +++--
 parquet/src/file/writer.rs            |  42 +++++++-
 parquet/src/lib.rs                    |   1 -
 9 files changed, 229 insertions(+), 35 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index fc7c8218ad0..0af5369ced1 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -57,7 +57,8 @@ seq-macro = { version = "0.3", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.13", default-features = false }
-twox-hash = { version = "1.6", optional = true }
+twox-hash = { version = "1.6" }
+paste = { version = "1.0" }
 
 [dev-dependencies]
 base64 = { version = "0.13", default-features = false, features = ["std"] }
@@ -77,7 +78,7 @@ rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
 all-features = true
 
 [features]
-default = ["arrow", "bloom", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
+default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
 # Enable arrow reader/writer APIs
 arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
 # Enable CLI tools
@@ -90,8 +91,6 @@ test_common = ["arrow/test_utils"]
 experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
-# Bloomfilter
-bloom = ["twox-hash"]
 
 [[test]]
 name = "arrow_writer_layout"
@@ -115,7 +114,7 @@ required-features = ["arrow", "cli"]
 
 [[bin]]
 name = "parquet-show-bloom-filter"
-required-features = ["cli", "bloom"]
+required-features = ["cli"]
 
 [[bench]]
 name = "arrow_writer"
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f848..90f00d0d7c1 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    SplitBlockAlgorithm, Uncompressed, XxHash,
 };
 use bytes::{Buf, Bytes};
 use std::hash::Hasher;
+use std::io::Write;
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -80,6 +82,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
 }
 
 /// A split block Bloom filter
+#[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
 const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -128,6 +131,33 @@ impl Sbbf {
         Self(data)
     }
 
+    /// Write the bitset in serialized form to the writer.
+    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        for block in &self.0 {
+            for word in block {
+                writer.write_all(&word.to_le_bytes()).map_err(|e| {
+                    ParquetError::General(format!(
+                        "Could not write bloom filter bit set: {}",
+                        e
+                    ))
+                })?;
+            }
+        }
+        Ok(())
+    }
+
+    /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
+    pub fn header(&self) -> BloomFilterHeader {
+        BloomFilterHeader {
+            // 8 i32 per block, 4 bytes per i32
+            num_bytes: self.0.len() as i32 * 4 * 8,
+            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+            hash: BloomFilterHash::XXHASH(XxHash {}),
+            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+        }
+    }
+
+    /// Read a new bloom filter from the given offset in the given reader.
     pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f5494..dd6915aa4e4 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,8 @@
 // under the License.
 
 //! Contains column writer API.
+
+use crate::bloom_filter::Sbbf;
 use crate::format::{ColumnIndex, OffsetIndex};
 
 use std::collections::{BTreeSet, VecDeque};
@@ -154,6 +156,8 @@ pub struct ColumnCloseResult {
     pub rows_written: u64,
     /// Metadata for this column chunk
     pub metadata: ColumnChunkMetaData,
+    /// Optional bloom filter for this column
+    pub bloom_filter: Option<Sbbf>,
     /// Optional column index, for filtering
     pub column_index: Option<ColumnIndex>,
     /// Optional offset index, identifying page locations
@@ -209,6 +213,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
 
+    // bloom filter
+    bloom_filter: Option<Sbbf>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
@@ -260,6 +267,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
+            // TODO!
+            bloom_filter: None,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -458,6 +467,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(ColumnCloseResult {
             bytes_written: self.column_metrics.total_bytes_written,
             rows_written: self.column_metrics.total_rows_written,
+            bloom_filter: self.bloom_filter,
             metadata,
             column_index,
             offset_index,
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 895776a8a42..2ba50fa31a1 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -236,7 +236,7 @@ pub struct RowGroupMetaData {
 }
 
 impl RowGroupMetaData {
-    /// Returns builer for row group metadata.
+    /// Returns builder for row group metadata.
     pub fn builder(schema_descr: SchemaDescPtr) -> RowGroupMetaDataBuilder {
         RowGroupMetaDataBuilder::new(schema_descr)
     }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c65ba8035ee..31b380adbf2 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -64,6 +64,7 @@
 //!     .build();
 //!
 //! ```
+use paste::paste;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -82,6 +83,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
+const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
+const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
+const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;
 
 /// Parquet writer version.
 ///
@@ -125,6 +129,26 @@ pub struct WriterProperties {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
+macro_rules! def_col_property_getter {
+    ($field:ident, $field_type:ty) => {
+        pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+        }
+    };
+    ($field:ident, $field_type:ty, $default_val:expr) => {
+        pub fn $field(&self, col: &ColumnPath) -> $field_type {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+                .unwrap_or($default_val)
+        }
+    };
+}
+
 impl WriterProperties {
     /// Returns builder for writer properties with default values.
     pub fn builder() -> WriterPropertiesBuilder {
@@ -255,6 +279,11 @@ impl WriterProperties {
             .or_else(|| self.default_column_properties.max_statistics_size())
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
+
+    def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
+    def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
+    def_col_property_getter!(bloom_filter_ndv, u64);
+    def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
 }
 
 /// Writer properties builder.
@@ -272,6 +301,40 @@ pub struct WriterPropertiesBuilder {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
+macro_rules! def_opt_field_setter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+                self.$field = Some(value);
+                self
+            }
+        }
+    };
+}
+
+macro_rules! def_opt_field_getter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            #[doc = "Returns " $field " if set."]
+            pub fn $field(&self) -> Option<$type> {
+                self.$field
+            }
+        }
+    };
+}
+
+macro_rules! def_per_col_setter {
+    ($field:ident, $field_type:ty) => {
+        paste! {
+            #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
+            pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
+                self.get_mut_props(col).[<set_ $field>](value);
+                self
+            }
+        }
+    }
+}
+
 impl WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn with_defaults() -> Self {
@@ -284,7 +347,7 @@ impl WriterPropertiesBuilder {
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
             key_value_metadata: None,
-            default_column_properties: ColumnProperties::new(),
+            default_column_properties: Default::default(),
             column_properties: HashMap::new(),
             sorting_columns: None,
         }
@@ -439,7 +502,7 @@ impl WriterPropertiesBuilder {
     fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
         self.column_properties
             .entry(col)
            .or_insert_with(Default::default)
     }
 
     /// Sets encoding for a column.
@@ -492,6 +555,11 @@ impl WriterPropertiesBuilder {
         self.get_mut_props(col).set_max_statistics_size(value);
         self
     }
+
+    def_per_col_setter!(bloom_filter_enabled, bool);
+    def_per_col_setter!(bloom_filter_fpp, f64);
+    def_per_col_setter!(bloom_filter_max_bytes, u32);
+    def_per_col_setter!(bloom_filter_ndv, u64);
 }
 
 /// Controls the level of statistics to be computed by the writer
@@ -515,27 +583,24 @@ impl Default for EnabledStatistics {
 ///
 /// If a field is `None`, it means that no specific value has been set for this column,
 /// so some subsequent or default value must be used.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, Default, PartialEq)]
 struct ColumnProperties {
     encoding: Option<Encoding>,
     codec: Option<Compression>,
     dictionary_enabled: Option<bool>,
     statistics_enabled: Option<EnabledStatistics>,
     max_statistics_size: Option<usize>,
+    /// bloom filter enabled
+    bloom_filter_enabled: Option<bool>,
+    /// bloom filter expected number of distinct values
+    bloom_filter_ndv: Option<u64>,
+    /// bloom filter false positive probability
+    bloom_filter_fpp: Option<f64>,
+    /// bloom filter max number of bytes
+    bloom_filter_max_bytes: Option<u32>,
 }
 
 impl ColumnProperties {
-    /// Initialise column properties with default values.
-    fn new() -> Self {
-        Self {
-            encoding: None,
-            codec: None,
-            dictionary_enabled: None,
-            statistics_enabled: None,
-            max_statistics_size: None,
-        }
-    }
-
     /// Sets encoding for this column.
     ///
     /// If dictionary is not enabled, this is treated as a primary encoding for a column.
@@ -572,6 +637,11 @@ impl ColumnProperties {
         self.max_statistics_size = Some(value);
     }
 
+    def_opt_field_setter!(bloom_filter_enabled, bool);
+    def_opt_field_setter!(bloom_filter_fpp, f64);
+    def_opt_field_setter!(bloom_filter_max_bytes, u32);
+    def_opt_field_setter!(bloom_filter_ndv, u64);
+
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
         self.encoding
@@ -599,17 +669,25 @@ impl ColumnProperties {
     fn max_statistics_size(&self) -> Option<usize> {
         self.max_statistics_size
     }
+
+    def_opt_field_getter!(bloom_filter_enabled, bool);
+    def_opt_field_getter!(bloom_filter_fpp, f64);
+    def_opt_field_getter!(bloom_filter_max_bytes, u32);
+    def_opt_field_getter!(bloom_filter_ndv, u64);
 }
 
 /// Reference counted reader properties.
 pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
 
+const DEFAULT_READ_BLOOM_FILTER: bool = true;
+
 /// Reader properties.
 ///
 /// All properties are immutable and `Send` + `Sync`.
 /// Use [`ReaderPropertiesBuilder`] to assemble these properties.
 pub struct ReaderProperties {
     codec_options: CodecOptions,
+    read_bloom_filter: bool,
 }
 
 impl ReaderProperties {
@@ -622,11 +700,17 @@ impl ReaderProperties {
     pub(crate) fn codec_options(&self) -> &CodecOptions {
         &self.codec_options
     }
+
+    /// Returns whether to read bloom filter
+    pub(crate) fn read_bloom_filter(&self) -> bool {
+        self.read_bloom_filter
+    }
 }
 
 /// Reader properties builder.
 pub struct ReaderPropertiesBuilder {
     codec_options_builder: CodecOptionsBuilder,
+    read_bloom_filter: Option<bool>,
 }
 
 /// Reader properties builder.
@@ -635,6 +719,7 @@ impl ReaderPropertiesBuilder {
     fn with_defaults() -> Self {
         Self {
             codec_options_builder: CodecOptionsBuilder::default(),
+            read_bloom_filter: None,
         }
     }
 
@@ -642,6 +727,9 @@ impl ReaderPropertiesBuilder {
     pub fn build(self) -> ReaderProperties {
         ReaderProperties {
             codec_options: self.codec_options_builder.build(),
+            read_bloom_filter: self
+                .read_bloom_filter
+                .unwrap_or(DEFAULT_READ_BLOOM_FILTER),
         }
     }
 
@@ -659,6 +747,17 @@ impl ReaderPropertiesBuilder {
             .set_backward_compatible_lz4(value);
         self
     }
+
+    /// Enable/disable reading bloom filter
+    ///
+    /// If reading bloom filter is enabled, bloom filter will be read from the file.
+    /// If reading bloom filter is disabled, bloom filter will not be read from the file.
+    ///
+    /// By default bloom filter is set to be read.
+    pub fn set_read_bloom_filter(mut self, value: bool) -> Self {
+        self.read_bloom_filter = Some(value);
+        self
+    }
 }
 
 #[cfg(test)]
@@ -701,6 +800,13 @@ mod tests {
             props.max_statistics_size(&ColumnPath::from("col")),
             DEFAULT_MAX_STATISTICS_SIZE
         );
+        assert_eq!(props.bloom_filter_enabled(&ColumnPath::from("col")), false);
+        assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
+        assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
+        assert_eq!(
+            props.bloom_filter_max_bytes(&ColumnPath::from("col")),
+            1024 * 1024
+        );
     }
 
     #[test]
@@ -784,6 +890,10 @@ mod tests {
                 EnabledStatistics::Chunk,
             )
             .set_column_max_statistics_size(ColumnPath::from("col"), 123)
+            .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
+            .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
+            .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
+            .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
             .build();
 
         assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
@@ -858,6 +968,7 @@ mod tests {
             .build();
 
         assert_eq!(props.codec_options(), &codec_options);
+        assert_eq!(props.read_bloom_filter(), true);
     }
 
     #[test]
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 325944c2168..4610ccc54fb 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -21,7 +21,6 @@
 use bytes::Bytes;
 use std::{boxed::Box, io::Read, sync::Arc};
 
-#[cfg(feature = "bloom")]
 use crate::bloom_filter::Sbbf;
 use crate::column::page::PageIterator;
 use crate::column::{page::PageReader, reader::ColumnReader};
@@ -145,9 +144,8 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }
 
-    #[cfg(feature = "bloom")]
     /// Get bloom filter for the `i`th column chunk, if present.
-    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>>;
+    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
 
     /// Get iterator of `Row`s from this row group.
     ///
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index cb39dd19487..84768aa23c8 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -20,10 +20,10 @@
 
 use std::collections::VecDeque;
 use std::io::Cursor;
+use std::iter;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
 use crate::basic::{Encoding, Type};
-#[cfg(feature = "bloom")]
 use crate::bloom_filter::Sbbf;
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
@@ -329,7 +329,7 @@ impl<R: 'static + ChunkReader> FileReader for SerializedFileReader<R> {
             f,
             row_group_metadata,
             props,
-        )))
+        )?))
     }
 
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
@@ -342,6 +342,7 @@ pub struct SerializedRowGroupReader<'a, R: ChunkReader> {
     chunk_reader: Arc<R>,
     metadata: &'a RowGroupMetaData,
     props: ReaderPropertiesPtr,
+    bloom_filters: Vec<Option<Sbbf>>,
 }
 
 impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
@@ -350,12 +351,22 @@ impl<'a, R: ChunkReader> SerializedRowGroupReader<'a, R> {
         chunk_reader: Arc<R>,
         metadata: &'a RowGroupMetaData,
         props: ReaderPropertiesPtr,
-    ) -> Self {
-        Self {
+    ) -> Result<Self> {
+        let bloom_filters = if props.read_bloom_filter() {
+            metadata
+                .columns()
+                .iter()
+                .map(|col| Sbbf::read_from_column_chunk(col, chunk_reader.clone()))
+                .collect::<Result<Vec<_>>>()?
+        } else {
+            iter::repeat(None).take(metadata.columns().len()).collect()
+        };
+        Ok(Self {
             chunk_reader,
             metadata,
             props,
-        }
+            bloom_filters,
+        })
     }
 }
 
@@ -388,11 +399,9 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
         )?))
     }
 
-    #[cfg(feature = "bloom")]
     /// get bloom filter for the `i`th column
-    fn get_column_bloom_filter(&self, i: usize) -> Result<Option<Sbbf>> {
-        let col = self.metadata.column(i);
-        Sbbf::read_from_column_chunk(col, self.chunk_reader.clone())
+    fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf> {
+        self.bloom_filters[i].as_ref()
     }
 
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b67bdccfe39..c96cc957930 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,10 @@
 //! Contains file writer API, and provides methods to write row groups and columns by
 //! using row group writers and column writers respectively.
-use std::{io::Write, sync::Arc};
-
+use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
 use crate::basic::PageType;
@@ -116,6 +116,7 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    bloom_filters: Vec<Vec<Option<Sbbf>>>,
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
@@ -132,6 +133,7 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            bloom_filters: vec![],
             column_indexes: Vec::new(),
             offset_indexes: Vec::new(),
             row_group_index: 0,
@@ -212,6 +214,36 @@ impl<W: Write> SerializedFileWriter<W> {
         Ok(())
     }
 
+    /// Serialize all the bloom filters to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // iter row group
+        // iter each column
+        // write bloom filter to the file
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+            {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        let header = bloom_filter.header();
+                        header.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        bloom_filter.write_bitset(&mut self.buf)?;
+                        // set offset and index for bloom filter
+                        column_metadata
+                            .meta_data
+                            .as_mut()
+                            .expect("can't have bloom filter without column metadata")
+                            .bloom_filter_offset = Some(start_offset as i64);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -250,6 +282,7 @@ impl<W: Write> SerializedFileWriter<W> {
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
 
+        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +353,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    bloom_filters: Vec<Option<Sbbf>>,
     column_indexes: Vec<Option<ColumnIndex>>,
     offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +382,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            bloom_filters: Vec::with_capacity(num_columns),
             column_indexes: Vec::with_capacity(num_columns),
             offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
@@ -380,11 +415,13 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let column_chunks = &mut self.column_chunks;
         let column_indexes = &mut self.column_indexes;
         let offset_indexes = &mut self.offset_indexes;
+        let bloom_filters = &mut self.bloom_filters;
 
         let on_close = |r: ColumnCloseResult| {
             // Update row group writer metrics
             *total_bytes_written += r.bytes_written;
             column_chunks.push(r.metadata);
+            bloom_filters.push(r.bloom_filter);
             column_indexes.push(r.column_index);
             offset_indexes.push(r.offset_index);
 
@@ -623,6 +660,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
         Ok(spec)
     }
 
+
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index cd29d02f808..4cdba1dc55e 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -84,7 +84,6 @@ pub mod arrow;
 pub mod column;
 experimental!(mod compression);
 experimental!(mod encodings);
-#[cfg(feature = "bloom")]
 pub mod bloom_filter;
 pub mod file;
 pub mod record;

From ea13d0aca048b695b5aaf0fd198617c9b836736b Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Wed, 16 Nov 2022 16:16:19 +0800
Subject: [PATCH 02/12] update row group vec

---
 parquet/src/file/writer.rs | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c96cc957930..f149747c7d4 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -92,6 +92,7 @@ pub type OnCloseColumnChunk<'a> = Box<dyn FnOnce(ColumnCloseResult) -> Result<()> + 'a>;
 pub type OnCloseRowGroup<'a> = Box<
     dyn FnOnce(
             RowGroupMetaDataPtr,
+            Vec<Option<Sbbf>>,
             Vec<Option<ColumnIndex>>,
             Vec<Option<OffsetIndex>>,
         ) -> Result<()>
@@ -151,10 +152,15 @@ impl<W: Write> SerializedFileWriter<W> {
         self.row_group_index += 1;
 
         let row_groups = &mut self.row_groups;
+        let row_bloom_filters = &mut self.bloom_filters;
         let row_column_indexes = &mut self.column_indexes;
         let row_offset_indexes = &mut self.offset_indexes;
-        let on_close = |metadata, row_group_column_index, row_group_offset_index| {
+        let on_close = |metadata,
+                        row_group_bloom_filter,
+                        row_group_column_index,
+                        row_group_offset_index| {
             row_groups.push(metadata);
+            row_bloom_filters.push(row_group_bloom_filter);
             row_column_indexes.push(row_group_column_index);
             row_offset_indexes.push(row_group_offset_index);
             Ok(())
@@ -220,8 +226,7 @@ impl<W: Write> SerializedFileWriter<W> {
         // iter each column
         // write bloom filter to the file
         for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
-            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
-            {
+            for (column_idx, column_chunk) in row_group.columns.iter_mut().enumerate() {
                 match &self.bloom_filters[row_group_idx][column_idx] {
                     Some(bloom_filter) => {
                         let start_offset = self.buf.bytes_written();
@@ -231,7 +236,7 @@ impl<W: Write> SerializedFileWriter<W> {
                         protocol.flush()?;
                         bloom_filter.write_bitset(&mut self.buf)?;
                         // set offset and index for bloom filter
-                        column_metadata
+                        column_chunk
                             .meta_data
                             .as_mut()
                             .expect("can't have bloom filter without column metadata")
@@ -480,6 +485,7 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         if let Some(on_close) = self.on_close.take() {
             on_close(
                 metadata,
+                self.bloom_filters.clone(),
                 self.column_indexes.clone(),
                 self.offset_indexes.clone(),
             )?
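Taken together, patches 01 and 02 wire bloom filters through both halves of the API: WriterProperties gains per-column knobs (enabled, fpp, ndv, max bytes), the column writer carries the filter into ColumnCloseResult, and the row-group reader exposes the decoded filter per column chunk. A minimal round-trip sketch of the API as it stands at this point in the series follows; the file path and one-column schema are illustrative only, and note that patch 10 below later flips the read-side default to off, after which a reader must opt in via ReaderProperties::builder().set_read_bloom_filter(true).

use std::{fs::File, sync::Arc};

use parquet::file::properties::WriterProperties;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::SerializedFileReader;
use parquet::file::writer::SerializedFileWriter;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::ColumnPath;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative one-column schema; any column works the same way.
    let schema = Arc::new(parse_message_type(
        "message schema { REQUIRED BINARY id; }",
    )?);

    // Per-column bloom filter settings from patch 01; ndv + fpp take
    // precedence over max_bytes when both are given (see patch 06).
    let props = Arc::new(
        WriterProperties::builder()
            .set_column_bloom_filter_enabled(ColumnPath::from("id"), true)
            .set_column_bloom_filter_ndv(ColumnPath::from("id"), 1_000)
            .set_column_bloom_filter_fpp(ColumnPath::from("id"), 0.01)
            .build(),
    );

    let mut writer =
        SerializedFileWriter::new(File::create("/tmp/bloom.parquet")?, schema, props)?;
    // ... append row groups and column data as usual; per patch 02 the close
    // path serializes each column's filter and records bloom_filter_offset.
    writer.close()?;

    // Read side: filters are loaded when the row group reader is built
    // (default is on at this point in the series; patch 10 turns it off).
    let reader = SerializedFileReader::new(File::open("/tmp/bloom.parquet")?)?;
    let row_group = reader.get_row_group(0)?;
    if let Some(_sbbf) = row_group.get_column_bloom_filter(0) {
        // Membership probes go through Sbbf::check (added in part II).
        println!("column 0 carries a bloom filter");
    }
    Ok(())
}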
From 03edb7df705c94281ef62110f0c4be37f822e574 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Wed, 16 Nov 2022 18:31:56 +0800
Subject: [PATCH 03/12] fix clippy

---
 parquet/src/file/properties.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 31b380adbf2..1ddbc0a272c 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -800,7 +800,7 @@ mod tests {
             props.max_statistics_size(&ColumnPath::from("col")),
             DEFAULT_MAX_STATISTICS_SIZE
         );
-        assert_eq!(props.bloom_filter_enabled(&ColumnPath::from("col")), false);
+        assert!(!props.bloom_filter_enabled(&ColumnPath::from("col")));
         assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
         assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
         assert_eq!(
@@ -968,7 +968,7 @@ mod tests {
             .build();
 
         assert_eq!(props.codec_options(), &codec_options);
-        assert_eq!(props.read_bloom_filter(), true);
+        assert!(props.read_bloom_filter());
     }
 
     #[test]

From 5fa74765f6fe562581c804aee21ddb25c6fe2f03 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Wed, 16 Nov 2022 18:39:02 +0800
Subject: [PATCH 04/12] fix clippy

---
 parquet/src/bin/parquet-show-bloom-filter.rs | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/parquet/src/bin/parquet-show-bloom-filter.rs b/parquet/src/bin/parquet-show-bloom-filter.rs
index a4dbdbe67de..28493a94c49 100644
--- a/parquet/src/bin/parquet-show-bloom-filter.rs
+++ b/parquet/src/bin/parquet-show-bloom-filter.rs
@@ -78,10 +78,7 @@ fn main() {
         let row_group_reader = file_reader
             .get_row_group(ri)
             .expect("Unable to read row group");
-        if let Some(sbbf) = row_group_reader
-            .get_column_bloom_filter(column_index)
-            .expect("Failed to parse bloom filter")
-        {
+        if let Some(sbbf) = row_group_reader.get_column_bloom_filter(column_index) {
             args.values.iter().for_each(|value| {
                 println!(
                     "Value {} is {} in bloom filter",

From 3732e436cc841ee44202456d3e98b88e066a9a76 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Wed, 16 Nov 2022 20:51:42 +0800
Subject: [PATCH 05/12] remove default feature for twox

---
 parquet/Cargo.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 0af5369ced1..9d957fba859 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -57,7 +57,7 @@ seq-macro = { version = "0.3", default-features = false }
 futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.13", default-features = false }
-twox-hash = { version = "1.6" }
+twox-hash = { version = "1.6", default-features = false }
 paste = { version = "1.0" }
 
 [dev-dependencies]

From 7f46a4b6ebd13f82d6f92fdac52b69c847c9a600 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Thu, 17 Nov 2022 15:12:08 +0800
Subject: [PATCH 06/12] incorporate ndv and fpp

---
 parquet/src/bloom_filter/mod.rs  | 48 ++++++++++++++++++++++++++++++++
 parquet/src/column/writer/mod.rs | 16 +++++++++--
 parquet/src/file/properties.rs   | 14 +++++++++-
 3 files changed, 75 insertions(+), 3 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 90f00d0d7c1..1b6d9b1c1fc 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -116,7 +116,39 @@ fn read_bloom_filter_header_and_length(
     ))
 }
 
+const BITSET_MIN_LENGTH: usize = 32;
+const BITSET_MAX_LENGTH: usize = 128 * 1024 * 1024;
+
+#[inline]
+fn optimal_num_of_bytes(num_bytes: usize) -> usize {
+    let num_bytes = num_bytes.min(BITSET_MAX_LENGTH);
+    let num_bytes = num_bytes.max(BITSET_MIN_LENGTH);
+    num_bytes.next_power_of_two()
+}
+
 impl Sbbf {
+    /// Create a new Sbbf with given number of distinct values and false positive probability.
+    /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
+    pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
+        assert!(0.0 <= fpp && fpp <= 1.0, "invalid fpp: {}", fpp);
+        let num_bits: f64 = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
+        let num_bits = if num_bits < 0.0 {
+            // overflow here
+            BITSET_MAX_LENGTH * 8
+        } else {
+            num_bits as usize
+        };
+        Self::new_with_num_of_bytes(num_bits / 8)
+    }
+
+    /// Create a new Sbbf with given number of bytes, the exact number of bytes will be adjusted
+    /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
+    pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {
+        let num_bytes = optimal_num_of_bytes(num_bytes);
+        let bitset = vec![0_u8; num_bytes];
+        Self::new(&bitset)
+    }
+
     fn new(bitset: &[u8]) -> Self {
         let data = bitset
             .chunks_exact(4 * 8)
@@ -322,4 +354,20 @@ mod tests {
         assert_eq!(num_bytes, 32_i32);
         assert_eq!(20, SBBF_HEADER_SIZE_ESTIMATE);
     }
+
+    #[test]
+    fn test_optimal_num_of_bytes() {
+        for (input, expected) in &[
+            (0, 32),
+            (9, 32),
+            (31, 32),
+            (32, 32),
+            (33, 64),
+            (99, 128),
+            (1024, 1024),
+            (999_000_000, 128 * 1024 * 1024),
+        ] {
+            assert_eq!(*expected, optimal_num_of_bytes(*input));
+        }
+    }
 }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index dd6915aa4e4..ae7920e2283 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -238,6 +238,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // Used for level information
         encodings.insert(Encoding::RLE);
 
+        let bloom_filter_enabled = props.bloom_filter_enabled(descr.path());
+        let bloom_filter = if bloom_filter_enabled {
+            if let Some(ndv) = props.bloom_filter_ndv(descr.path()) {
+                let fpp = props.bloom_filter_fpp(descr.path());
+                Some(Sbbf::new_with_ndv_fpp(ndv, fpp))
+            } else {
+                let max_bytes = props.bloom_filter_max_bytes(descr.path());
+                Some(Sbbf::new_with_num_of_bytes(max_bytes as usize))
+            }
+        } else {
+            None
+        };
+
         Self {
             descr,
             props,
@@ -267,8 +280,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
-            // TODO!
-            bloom_filter: None,
+            bloom_filter,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 1ddbc0a272c..91d0e35165e 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -310,6 +310,18 @@ macro_rules! def_opt_field_setter {
             }
         }
     };
+    ($field: ident, $type: ty, $min_value:expr, $max_value:expr) => {
+        paste! {
+            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+                if value >= $min_value && value <= $max_value {
+                    self.$field = Some(value);
+                } else {
+                    self.$field = None
+                }
+                self
+            }
+        }
+    };
 }
 
 macro_rules! def_opt_field_getter {
@@ -638,7 +650,7 @@ impl ColumnProperties {
     }
 
     def_opt_field_setter!(bloom_filter_enabled, bool);
-    def_opt_field_setter!(bloom_filter_fpp, f64);
+    def_opt_field_setter!(bloom_filter_fpp, f64, 0.0, 1.0);
     def_opt_field_setter!(bloom_filter_max_bytes, u32);
     def_opt_field_setter!(bloom_filter_ndv, u64);

From 27a404d3cceb618aa0c165fdd8989a0ead7b6f16 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Thu, 17 Nov 2022 15:40:12 +0800
Subject: [PATCH 07/12] fix doc

---
 parquet/src/bloom_filter/mod.rs | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 1b6d9b1c1fc..0f08853ee16 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -127,10 +127,14 @@ fn optimal_num_of_bytes(num_bytes: usize) -> usize {
 }
 
 impl Sbbf {
-    /// Create a new Sbbf with given number of distinct values and false positive probability.
+    /// Create a new [Sbbf] with given number of distinct values and false positive probability.
     /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
     pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
         assert!(0.0 <= fpp && fpp <= 1.0, "invalid fpp: {}", fpp);
+        // see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
+        // given fpp = (1 - e^(-k * n / m)) ^ k
+        // we have m = - k * n / ln(1 - fpp ^ (1 / k))
+        // where k = number of hash functions, m = number of bits, n = number of distinct values
         let num_bits: f64 = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
@@ -141,8 +145,8 @@ impl Sbbf {
         Self::new_with_num_of_bytes(num_bits / 8)
     }
 
-    /// Create a new Sbbf with given number of bytes, the exact number of bytes will be adjusted
-    /// to the next power of two bounded by [BITSET_MIN_LENGTH] and [BITSET_MAX_LENGTH].
+    /// Create a new [Sbbf] with given number of bytes, the exact number of bytes will be adjusted
+    /// to the next power of two bounded by `BITSET_MIN_LENGTH` and `BITSET_MAX_LENGTH`.
     pub fn new_with_num_of_bytes(num_bytes: usize) -> Self {

From 35e56c135cef1f4c3d119faee212fc296501184d Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Thu, 17 Nov 2022 15:55:02 +0800
Subject: [PATCH 08/12] add unit test

---
 parquet/src/bloom_filter/mod.rs | 41 +++++++++++++++++++++++++++++----
 1 file changed, 36 insertions(+), 5 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 0f08853ee16..1fca440cf6e 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -126,16 +126,21 @@ fn optimal_num_of_bytes(num_bytes: usize) -> usize {
     num_bytes.next_power_of_two()
 }
 
+// see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
+// given fpp = (1 - e^(-k * n / m)) ^ k
+// we have m = - k * n / ln(1 - fpp ^ (1 / k))
+// where k = number of hash functions, m = number of bits, n = number of distinct values
+#[inline]
+fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> f64 {
+    -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln()
+}
+
 impl Sbbf {
     /// Create a new [Sbbf] with given number of distinct values and false positive probability.
     /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
     pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
         assert!(0.0 <= fpp && fpp <= 1.0, "invalid fpp: {}", fpp);
-        // see http://algo2.iti.kit.edu/documents/cacheefficientbloomfilters-jea.pdf
-        // given fpp = (1 - e^(-k * n / m)) ^ k
-        // we have m = - k * n / ln(1 - fpp ^ (1 / k))
-        // where k = number of hash functions, m = number of bits, n = number of distinct values
-        let num_bits: f64 = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
+        let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
         let num_bits = if num_bits < 0.0 {
             // overflow here
             BITSET_MAX_LENGTH * 8
@@ -374,4 +379,30 @@ mod tests {
             assert_eq!(*expected, optimal_num_of_bytes(*input));
         }
     }
+
+    #[test]
+    fn test_num_of_bits_from_ndv_fpp() {
+        for (fpp, ndv, num_bits) in &[
+            (0.1, 10, 57),
+            (0.01, 10, 96),
+            (0.001, 10, 146),
+            (0.1, 100, 577),
+            (0.01, 100, 968),
+            (0.001, 100, 1460),
+            (0.1, 1000, 5772),
+            (0.01, 1000, 9681),
+            (0.001, 1000, 14607),
+            (0.1, 10000, 57725),
+            (0.01, 10000, 96815),
+            (0.001, 10000, 146076),
+            (0.1, 100000, 577254),
+            (0.01, 100000, 968152),
+            (0.001, 100000, 1460769),
+            (0.1, 1000000, 5772541),
+            (0.01, 1000000, 9681526),
+            (0.001, 1000000, 14607697),
+        ] {
+            assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
+        }
+    }
 }

From ec68e695e5e0347783f875e8601eeca6a275cbdd Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Thu, 17 Nov 2022 15:57:07 +0800
Subject: [PATCH 09/12] fix clippy

---
 parquet/src/bloom_filter/mod.rs | 2 +-
 parquet/src/file/properties.rs  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 1fca440cf6e..265a3768f62 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -139,7 +139,7 @@ impl Sbbf {
     /// Create a new [Sbbf] with given number of distinct values and false positive probability.
     /// Will panic if `fpp` is greater than 1.0 or less than 0.0.
     pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
-        assert!(0.0 <= fpp && fpp <= 1.0, "invalid fpp: {}", fpp);
+        assert!((0.0..1.0).contains(&fpp), "invalid fpp: {}", fpp);
         let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
         let num_bits = if num_bits < 0.0 {
             // overflow here
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 91d0e35165e..f68e5b3cac1 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -313,7 +313,7 @@ macro_rules! def_opt_field_setter {
     ($field: ident, $type: ty, $min_value:expr, $max_value:expr) => {
         paste! {
             pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
-                if value >= $min_value && value <= $max_value {
+                if ($min_value..=$max_value).contains(&value) {
                     self.$field = Some(value);
                 } else {
                     self.$field = None

From 85014cea8ad57dbc9a381f79db755e5fc5b63ac3 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sat, 19 Nov 2022 13:28:22 +0800
Subject: [PATCH 10/12] Apply suggestions from code review

Co-authored-by: Andrew Lamb
---
 parquet/src/file/properties.rs | 2 +-
 parquet/src/file/reader.rs     | 3 ++-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index f68e5b3cac1..3401687dbd1 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -691,7 +691,7 @@ impl ColumnProperties {
 /// Reference counted reader properties.
 pub type ReaderPropertiesPtr = Arc<ReaderProperties>;
 
-const DEFAULT_READ_BLOOM_FILTER: bool = true;
+const DEFAULT_READ_BLOOM_FILTER: bool = false;
 
 /// Reader properties.
 ///
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index 4610ccc54fb..bb82f229927 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -144,7 +144,8 @@ pub trait RowGroupReader: Send + Sync {
         Ok(col_reader)
     }
 
-    /// Get bloom filter for the `i`th column chunk, if present.
+    /// Get bloom filter for the `i`th column chunk, if present and the reader was configured
+    /// to read bloom filters.
     fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
 
     /// Get iterator of `Row`s from this row group.

From 0d6540dd045dfbeab32a6b496e94f2ea982fad0d Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sat, 19 Nov 2022 13:58:06 +0800
Subject: [PATCH 11/12] remove underflow logic

---
 parquet/src/bloom_filter/mod.rs | 12 ++++--------
 parquet/src/file/properties.rs  |  2 +-
 2 files changed, 5 insertions(+), 9 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 265a3768f62..e2fc8c9d64a 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -131,8 +131,9 @@ fn optimal_num_of_bytes(num_bytes: usize) -> usize {
 // we have m = - k * n / ln(1 - fpp ^ (1 / k))
 // where k = number of hash functions, m = number of bits, n = number of distinct values
 #[inline]
-fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> f64 {
-    -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln()
+fn num_of_bits_from_ndv_fpp(ndv: u64, fpp: f64) -> usize {
+    let num_bits = -8.0 * ndv as f64 / (1.0 - fpp.powf(1.0 / 8.0)).ln();
+    num_bits as usize
 }
 
 impl Sbbf {
@@ -139,12 +142,6 @@ impl Sbbf {
     pub fn new_with_ndv_fpp(ndv: u64, fpp: f64) -> Self {
         assert!((0.0..1.0).contains(&fpp), "invalid fpp: {}", fpp);
         let num_bits = num_of_bits_from_ndv_fpp(ndv, fpp);
-        let num_bits = if num_bits < 0.0 {
-            // overflow here
-            BITSET_MAX_LENGTH * 8
-        } else {
-            num_bits as usize
-        };
         Self::new_with_num_of_bytes(num_bits / 8)
     }
 
             (0.1, 1000000, 5772541),
             (0.01, 1000000, 9681526),
             (0.001, 1000000, 14607697),
+            (1e-50, 1_000_000_000_000, 14226231280773240832),
         ] {
             assert_eq!(*num_bits, num_of_bits_from_ndv_fpp(*ndv, *fpp) as u64);
         }
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 3401687dbd1..03117d4cb07 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -980,7 +980,7 @@ mod tests {
             .build();
 
         assert_eq!(props.codec_options(), &codec_options);
-        assert!(props.read_bloom_filter());
+        assert!(!props.read_bloom_filter());
     }
 
     #[test]

From 37e145d38e705b4230895afa8c3ad4f469919367 Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sat, 19 Nov 2022 14:04:24 +0800
Subject: [PATCH 12/12] refactor write

---
 parquet/src/bloom_filter/mod.rs | 20 +++++++++++++++++---
 parquet/src/file/writer.rs      |  6 +-----
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index e2fc8c9d64a..4efba3834de 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -30,7 +30,9 @@ use bytes::{Buf, Bytes};
 use std::hash::Hasher;
 use std::io::Write;
 use std::sync::Arc;
-use thrift::protocol::{TCompactInputProtocol, TSerializable};
+use thrift::protocol::{
+    TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
+};
 use twox_hash::XxHash64;
 
 /// Salt as defined in the [spec](https://github.com/apache/parquet-format/blob/master/BloomFilter.md#technical-approach)
@@ -167,8 +169,20 @@ impl Sbbf {
         Self(data)
     }
 
+    /// Write the bloom filter data (header and then bitset) to the output
+    pub fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        let mut protocol = TCompactOutputProtocol::new(&mut writer);
+        let header = self.header();
+        header.write_to_out_protocol(&mut protocol).map_err(|e| {
+            ParquetError::General(format!("Could not write bloom filter header: {}", e))
+        })?;
+        protocol.flush()?;
+        self.write_bitset(&mut writer)?;
+        Ok(())
+    }
+
     /// Write the bitset in serialized form to the writer.
-    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+    fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
         for block in &self.0 {
             for word in block {
                 writer.write_all(&word.to_le_bytes()).map_err(|e| {
@@ -183,7 +197,7 @@ impl Sbbf {
     }
 
     /// Create and populate [`BloomFilterHeader`] from this bitset for writing to serialized form
-    pub fn header(&self) -> BloomFilterHeader {
+    fn header(&self) -> BloomFilterHeader {
         BloomFilterHeader {
             // 8 i32 per block, 4 bytes per i32
             num_bytes: self.0.len() as i32 * 4 * 8,
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index f149747c7d4..ff1d643978b 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -230,11 +230,7 @@ impl<W: Write> SerializedFileWriter<W> {
                 match &self.bloom_filters[row_group_idx][column_idx] {
                     Some(bloom_filter) => {
                         let start_offset = self.buf.bytes_written();
-                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
-                        let header = bloom_filter.header();
-                        header.write_to_out_protocol(&mut protocol)?;
-                        protocol.flush()?;
-                        bloom_filter.write_bitset(&mut self.buf)?;
+                        bloom_filter.write(&mut self.buf)?;
                         // set offset and index for bloom filter
                         column_chunk
                             .meta_data