From 52bf18a9425de5effe94c52cfa98b415ee729bdf Mon Sep 17 00:00:00 2001
From: Jiayu Liu
Date: Sun, 13 Nov 2022 13:13:05 +0000
Subject: [PATCH] add `bloom` feature flag for writing bloom filters

Gate bloom filter writing behind a new `bloom` cargo feature: `Sbbf`
gains `header()` and `write_bitset()`, writer properties grow per-column
bloom filter settings (enabled, fpp, ndv, max bytes), and
`SerializedFileWriter` serializes each column chunk's filter and records
its `bloom_filter_offset` in the column chunk metadata.
---
 parquet/Cargo.toml               |   1 +
 parquet/src/bloom_filter/mod.rs  |  27 +++++
 parquet/src/column/writer/mod.rs |  15 +++
 parquet/src/file/properties.rs   | 114 +++++++++++++++---
 parquet/src/file/writer.rs       |  51 +++++++-
 5 files changed, 192 insertions(+), 16 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index fc7c8218ad0..72baaf33834 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -58,6 +58,7 @@ futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
 tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
 hashbrown = { version = "0.13", default-features = false }
 twox-hash = { version = "1.6", optional = true }
+paste = "1.0"
 
 [dev-dependencies]
 base64 = { version = "0.13", default-features = false, features = ["std"] }
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 4944a93f848..0122a3a7679 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{
     BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
+    SplitBlockAlgorithm, Uncompressed, XxHash,
 };
 use bytes::{Buf, Bytes};
 use std::hash::Hasher;
+use std::io::Write;
 use std::sync::Arc;
 use thrift::protocol::{TCompactInputProtocol, TSerializable};
 use twox_hash::XxHash64;
@@ -80,6 +82,7 @@ fn block_check(block: &Block, hash: u32) -> bool {
 }
 
 /// A split block Bloom filter
+#[derive(Debug, Clone)]
 pub struct Sbbf(Vec<Block>);
 
 const SBBF_HEADER_SIZE_ESTIMATE: usize = 20;
@@ -128,6 +131,30 @@ impl Sbbf {
         Self(data)
     }
 
+    pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
+        for block in &self.0 {
+            for word in block {
+                writer.write_all(&word.to_le_bytes()).map_err(|e| {
+                    ParquetError::General(format!(
+                        "Could not write bloom filter bit set: {}",
+                        e
+                    ))
+                })?;
+            }
+        }
+        Ok(())
+    }
+
+    pub fn header(&self) -> BloomFilterHeader {
+        BloomFilterHeader {
+            // 8 u32 words per block, 4 bytes per word
+            num_bytes: self.0.len() as i32 * 4 * 8,
+            algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
+            hash: BloomFilterHash::XXHASH(XxHash {}),
+            compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
+        }
+    }
+
     pub fn read_from_column_chunk<R: ChunkReader>(
         column_metadata: &ColumnChunkMetaData,
         reader: Arc<R>,
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 3cdf04f5494..f8e79d7928c 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -16,6 +16,9 @@
 // under the License.
 
 //! Contains column writer API.
+
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::format::{ColumnIndex, OffsetIndex};
 
 use std::collections::{BTreeSet, VecDeque};
@@ -154,6 +157,9 @@ pub struct ColumnCloseResult {
     pub rows_written: u64,
     /// Metadata for this column chunk
     pub metadata: ColumnChunkMetaData,
+    /// Optional bloom filter for this column
+    #[cfg(feature = "bloom")]
+    pub bloom_filter: Option<Sbbf>,
     /// Optional column index, for filtering
     pub column_index: Option<ColumnIndex>,
     /// Optional offset index, identifying page locations
@@ -209,6 +215,10 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     rep_levels_sink: Vec<i16>,
     data_pages: VecDeque<CompressedPage>,
 
+    // bloom filter
+    #[cfg(feature = "bloom")]
+    bloom_filter: Option<Sbbf>,
+
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
@@ -260,6 +270,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_column_nulls: 0,
                 column_distinct_count: None,
             },
+            // TODO!
+            #[cfg(feature = "bloom")]
+            bloom_filter: None,
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -458,6 +471,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         Ok(ColumnCloseResult {
             bytes_written: self.column_metrics.total_bytes_written,
             rows_written: self.column_metrics.total_rows_written,
+            #[cfg(feature = "bloom")]
+            bloom_filter: self.bloom_filter,
             metadata,
             column_index,
             offset_index,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index c65ba8035ee..95ca6bbb1ec 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -64,6 +64,7 @@
 //!     .build();
 //! ```
 
+use paste::paste;
 use std::{collections::HashMap, sync::Arc};
 
 use crate::basic::{Compression, Encoding};
@@ -82,6 +83,9 @@ const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page;
 const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096;
 const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
 const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY");
+const DEFAULT_BLOOM_FILTER_ENABLED: bool = false;
+const DEFAULT_BLOOM_FILTER_MAX_BYTES: u32 = 1024 * 1024;
+const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.01;
 
 /// Parquet writer version.
 ///
 /// Basic constant, which is not part of the Thrift definition.
@@ -125,6 +129,26 @@ pub struct WriterProperties {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
+macro_rules! def_col_property_getter {
+    ($field:ident, $field_type:ty) => {
+        pub fn $field(&self, col: &ColumnPath) -> Option<$field_type> {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+        }
+    };
+    ($field:ident, $field_type:ty, $default_val:expr) => {
+        pub fn $field(&self, col: &ColumnPath) -> $field_type {
+            self.column_properties
+                .get(col)
+                .and_then(|c| c.$field())
+                .or_else(|| self.default_column_properties.$field())
+                .unwrap_or($default_val)
+        }
+    };
+}
+
 impl WriterProperties {
     /// Returns builder for writer properties with default values.
     pub fn builder() -> WriterPropertiesBuilder {
@@ -255,6 +279,11 @@ impl WriterProperties {
             .or_else(|| self.default_column_properties.max_statistics_size())
             .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE)
     }
+
+    def_col_property_getter!(bloom_filter_enabled, bool, DEFAULT_BLOOM_FILTER_ENABLED);
+    def_col_property_getter!(bloom_filter_fpp, f64, DEFAULT_BLOOM_FILTER_FPP);
+    def_col_property_getter!(bloom_filter_ndv, u64);
+    def_col_property_getter!(bloom_filter_max_bytes, u32, DEFAULT_BLOOM_FILTER_MAX_BYTES);
 }
 
 /// Writer properties builder.
@@ -272,6 +301,40 @@ pub struct WriterPropertiesBuilder {
     sorting_columns: Option<Vec<SortingColumn>>,
 }
 
+macro_rules! def_opt_field_setter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            pub fn [<set_ $field>](&mut self, value: $type) -> &mut Self {
+                self.$field = Some(value);
+                self
+            }
+        }
+    };
+}
+
+macro_rules! def_opt_field_getter {
+    ($field: ident, $type: ty) => {
+        paste! {
+            #[doc = "Returns " $field " if set."]
+            pub fn $field(&self) -> Option<$type> {
+                self.$field
+            }
+        }
+    };
+}
+
+macro_rules! def_per_col_setter {
+    ($field:ident, $field_type:ty) => {
+        paste! {
+            #[doc = "Sets " $field " for a column. Takes precedence over globally defined settings."]
+            pub fn [<set_column_ $field>](mut self, col: ColumnPath, value: $field_type) -> Self {
+                self.get_mut_props(col).[<set_ $field>](value);
+                self
+            }
+        }
+    }
+}
+
 impl WriterPropertiesBuilder {
     /// Returns default state of the builder.
     fn with_defaults() -> Self {
@@ -284,7 +347,7 @@ impl WriterPropertiesBuilder {
             writer_version: DEFAULT_WRITER_VERSION,
             created_by: DEFAULT_CREATED_BY.to_string(),
             key_value_metadata: None,
-            default_column_properties: ColumnProperties::new(),
+            default_column_properties: Default::default(),
             column_properties: HashMap::new(),
             sorting_columns: None,
         }
@@ -439,7 +502,7 @@ impl WriterPropertiesBuilder {
     fn get_mut_props(&mut self, col: ColumnPath) -> &mut ColumnProperties {
         self.column_properties
             .entry(col)
-            .or_insert_with(ColumnProperties::new)
+            .or_insert_with(Default::default)
     }
 
     /// Sets encoding for a column.
@@ -492,6 +555,11 @@ impl WriterPropertiesBuilder {
         self.get_mut_props(col).set_max_statistics_size(value);
         self
     }
+
+    def_per_col_setter!(bloom_filter_enabled, bool);
+    def_per_col_setter!(bloom_filter_fpp, f64);
+    def_per_col_setter!(bloom_filter_max_bytes, u32);
+    def_per_col_setter!(bloom_filter_ndv, u64);
 }
 
 /// Controls the level of statistics to be computed by the writer
@@ -515,27 +583,24 @@ impl Default for EnabledStatistics {
 ///
 /// If a field is `None`, it means that no specific value has been set for this column,
 /// so some subsequent or default value must be used.
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone, Default, PartialEq)]
 struct ColumnProperties {
     encoding: Option<Encoding>,
     codec: Option<Compression>,
     dictionary_enabled: Option<bool>,
     statistics_enabled: Option<EnabledStatistics>,
     max_statistics_size: Option<usize>,
+    /// bloom filter enabled
+    bloom_filter_enabled: Option<bool>,
+    /// bloom filter expected number of distinct values
+    bloom_filter_ndv: Option<u64>,
+    /// bloom filter false positive probability
+    bloom_filter_fpp: Option<f64>,
+    /// bloom filter max number of bytes
+    bloom_filter_max_bytes: Option<u32>,
 }
 
 impl ColumnProperties {
-    /// Initialise column properties with default values.
-    fn new() -> Self {
-        Self {
-            encoding: None,
-            codec: None,
-            dictionary_enabled: None,
-            statistics_enabled: None,
-            max_statistics_size: None,
-        }
-    }
-
     /// Sets encoding for this column.
     ///
     /// If dictionary is not enabled, this is treated as a primary encoding for a column.
@@ -572,6 +637,11 @@ impl ColumnProperties {
         self.max_statistics_size = Some(value);
     }
 
+    def_opt_field_setter!(bloom_filter_enabled, bool);
+    def_opt_field_setter!(bloom_filter_fpp, f64);
+    def_opt_field_setter!(bloom_filter_max_bytes, u32);
+    def_opt_field_setter!(bloom_filter_ndv, u64);
+
     /// Returns optional encoding for this column.
     fn encoding(&self) -> Option<Encoding> {
         self.encoding
@@ -599,6 +669,11 @@ impl ColumnProperties {
     fn max_statistics_size(&self) -> Option<usize> {
         self.max_statistics_size
     }
+
+    def_opt_field_getter!(bloom_filter_enabled, bool);
+    def_opt_field_getter!(bloom_filter_fpp, f64);
+    def_opt_field_getter!(bloom_filter_max_bytes, u32);
+    def_opt_field_getter!(bloom_filter_ndv, u64);
 }
 
 /// Reference counted reader properties.
@@ -701,6 +776,13 @@ mod tests {
             props.max_statistics_size(&ColumnPath::from("col")),
             DEFAULT_MAX_STATISTICS_SIZE
         );
+        assert_eq!(props.bloom_filter_enabled(&ColumnPath::from("col")), false);
+        assert_eq!(props.bloom_filter_fpp(&ColumnPath::from("col")), 0.01);
+        assert_eq!(props.bloom_filter_ndv(&ColumnPath::from("col")), None);
+        assert_eq!(
+            props.bloom_filter_max_bytes(&ColumnPath::from("col")),
+            1024 * 1024
+        );
     }
 
     #[test]
@@ -784,6 +866,10 @@ mod tests {
                 EnabledStatistics::Chunk,
             )
             .set_column_max_statistics_size(ColumnPath::from("col"), 123)
+            .set_column_bloom_filter_enabled(ColumnPath::from("col"), true)
+            .set_column_bloom_filter_ndv(ColumnPath::from("col"), 100)
+            .set_column_bloom_filter_fpp(ColumnPath::from("col"), 0.1)
+            .set_column_bloom_filter_max_bytes(ColumnPath::from("col"), 1000)
             .build();
 
         assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0);
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index b67bdccfe39..324a8349ccf 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -18,10 +18,11 @@
 //! Contains file writer API, and provides methods to write row groups and columns by
 //! using row group writers and column writers respectively.
 
-use std::{io::Write, sync::Arc};
-
+#[cfg(feature = "bloom")]
+use crate::bloom_filter::Sbbf;
 use crate::format as parquet;
 use crate::format::{ColumnIndex, OffsetIndex, RowGroup};
+use std::{io::Write, sync::Arc};
 use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol, TSerializable};
 
 use crate::basic::PageType;
@@ -116,6 +117,8 @@ pub struct SerializedFileWriter<W: Write> {
     descr: SchemaDescPtr,
     props: WriterPropertiesPtr,
     row_groups: Vec<RowGroupMetaDataPtr>,
+    #[cfg(feature = "bloom")]
+    bloom_filters: Vec<Vec<Option<Sbbf>>>,
     column_indexes: Vec<Vec<Option<ColumnIndex>>>,
     offset_indexes: Vec<Vec<Option<OffsetIndex>>>,
     row_group_index: usize,
@@ -132,6 +135,8 @@ impl<W: Write> SerializedFileWriter<W> {
             descr: Arc::new(SchemaDescriptor::new(schema)),
             props: properties,
             row_groups: vec![],
+            #[cfg(feature = "bloom")]
+            bloom_filters: vec![],
             column_indexes: Vec::new(),
             offset_indexes: Vec::new(),
             row_group_index: 0,
@@ -212,6 +217,37 @@
         Ok(())
     }
 
+    #[cfg(feature = "bloom")]
+    /// Serialize all the bloom filters to the file
+    fn write_bloom_filters(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
+        // for each column chunk in each row group, append the bloom
+        // filter header and bitset to the file (if one was built),
+        // then record where it starts in the column chunk metadata
+        for (row_group_idx, row_group) in row_groups.iter_mut().enumerate() {
+            for (column_idx, column_metadata) in row_group.columns.iter_mut().enumerate()
+            {
+                match &self.bloom_filters[row_group_idx][column_idx] {
+                    Some(bloom_filter) => {
+                        let start_offset = self.buf.bytes_written();
+                        let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
+                        let header = bloom_filter.header();
+                        header.write_to_out_protocol(&mut protocol)?;
+                        protocol.flush()?;
+                        bloom_filter.write_bitset(&mut self.buf)?;
+                        // record the bloom filter's start offset in the column chunk metadata
+                        column_metadata
+                            .meta_data
+                            .as_mut()
+                            .expect("can't have bloom filter without column metadata")
+                            .bloom_filter_offset = Some(start_offset as i64);
+                    }
+                    None => {}
+                }
+            }
+        }
+        Ok(())
+    }
+
     /// Serialize all the column index to the file
     fn write_column_indexes(&mut self, row_groups: &mut [RowGroup]) -> Result<()> {
         // iter row group
@@ -250,6 +286,8 @@
             .map(|v| v.to_thrift())
             .collect::<Vec<_>>();
 
+        #[cfg(feature = "bloom")]
+        self.write_bloom_filters(&mut row_groups)?;
         // Write column indexes and offset indexes
         self.write_column_indexes(&mut row_groups)?;
         self.write_offset_indexes(&mut row_groups)?;
@@ -320,6 +358,8 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
     column_index: usize,
     row_group_metadata: Option<RowGroupMetaDataPtr>,
     column_chunks: Vec<ColumnChunkMetaData>,
+    #[cfg(feature = "bloom")]
+    bloom_filters: Vec<Option<Sbbf>>,
     column_indexes: Vec<Option<ColumnIndex>>,
     offset_indexes: Vec<Option<OffsetIndex>>,
     on_close: Option<OnCloseRowGroup<'a>>,
@@ -348,6 +388,8 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
             column_index: 0,
             row_group_metadata: None,
             column_chunks: Vec::with_capacity(num_columns),
+            #[cfg(feature = "bloom")]
+            bloom_filters: Vec::with_capacity(num_columns),
            column_indexes: Vec::with_capacity(num_columns),
             offset_indexes: Vec::with_capacity(num_columns),
             total_bytes_written: 0,
@@ -380,11 +422,15 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
         let column_chunks = &mut self.column_chunks;
         let column_indexes = &mut self.column_indexes;
         let offset_indexes = &mut self.offset_indexes;
+        #[cfg(feature = "bloom")]
+        let bloom_filters = &mut self.bloom_filters;
 
         let on_close = |r: ColumnCloseResult| {
             // Update row group writer metrics
             *total_bytes_written += r.bytes_written;
             column_chunks.push(r.metadata);
+            #[cfg(feature = "bloom")]
+            bloom_filters.push(r.bloom_filter);
             column_indexes.push(r.column_index);
             offset_indexes.push(r.offset_index);
 
@@ -623,6 +669,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {
         Ok(spec)
     }
 
+
     fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
         let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
         metadata
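
Reviewer note (not part of the patch): the cfg gates above reference a
`bloom` cargo feature, but this diff only adds the `paste` dependency and
does not declare the feature under `[features]`, so presumably that lands
separately. As a minimal sketch of the per-column properties API added in
properties.rs, the builder can be driven as below. This assumes the
`bloom` feature is declared and enabled; the column name "id" and the
test harness are illustrative, not code from this change.

    use parquet::file::properties::WriterProperties;
    use parquet::schema::types::ColumnPath;

    #[test]
    fn bloom_filter_properties_sketch() {
        let col = ColumnPath::from("id");
        let props = WriterProperties::builder()
            // per-column opt-in; DEFAULT_BLOOM_FILTER_ENABLED is false
            .set_column_bloom_filter_enabled(col.clone(), true)
            // target false positive probability (default DEFAULT_BLOOM_FILTER_FPP = 0.01)
            .set_column_bloom_filter_fpp(col.clone(), 0.05)
            // expected number of distinct values for the column
            .set_column_bloom_filter_ndv(col.clone(), 10_000)
            .build();

        // per-column getters fall back to the crate-wide defaults when unset
        assert!(props.bloom_filter_enabled(&col));
        assert_eq!(props.bloom_filter_fpp(&col), 0.05);
        assert_eq!(props.bloom_filter_ndv(&col), Some(10_000));
        assert_eq!(props.bloom_filter_max_bytes(&col), 1024 * 1024);
    }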