From 06cfa35f446a436baf4831cd38aef3f7772e7b78 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 08:54:57 -0400 Subject: [PATCH 1/5] Fix parquet writer statistics --- parquet/src/arrow/schema.rs | 6 +- parquet/src/column/writer.rs | 167 +++++++++++++++++++++++---------- parquet/src/file/properties.rs | 97 ++++++++----------- 3 files changed, 162 insertions(+), 108 deletions(-) diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index a65c7585327..5e574f39d57 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( value: Some(encoded), }; - let mut meta = props.key_value_metadata.clone().unwrap_or_default(); + let meta = props + .key_value_metadata + .get_or_insert_with(|| Default::default()); + // check if ARROW:schema exists, and overwrite it let schema_meta = meta .iter() @@ -167,7 +170,6 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( meta.push(schema_kv); } } - props.key_value_metadata = Some(meta); } /// Convert arrow schema to parquet schema diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index d589aef5a50..b8c21733957 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -31,6 +31,7 @@ use crate::encodings::{ }; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder}; +use crate::file::properties::EnabledStatistics; use crate::file::statistics::Statistics; use crate::file::{ metadata::ColumnChunkMetaData, @@ -177,6 +178,8 @@ pub struct ColumnWriterImpl<'a, T: DataType> { // Column writer properties descr: ColumnDescPtr, props: WriterPropertiesPtr, + statistics_enabled: EnabledStatistics, + page_writer: Box, has_dictionary: bool, dict_encoder: Option>, @@ -243,9 +246,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ) .unwrap(); + let statistics_enabled = props.statistics_enabled(descr.path()); + Self { descr, props, + statistics_enabled, page_writer, has_dictionary, dict_encoder, @@ -302,53 +308,71 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { // Find out the minimal length to prevent index out of bound errors. let mut min_len = values.len(); if let Some(levels) = def_levels { - min_len = cmp::min(min_len, levels.len()); + min_len = min_len.min(levels.len()); } if let Some(levels) = rep_levels { - min_len = cmp::min(min_len, levels.len()); + min_len = min_len.min(levels.len()); } // Find out number of batches to process. let write_batch_size = self.props.write_batch_size(); let num_batches = min_len / write_batch_size; - // Process pre-calculated statistics - match (min, max) { - (Some(min), Some(max)) => { - if self - .min_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(v, min)) - { - self.min_column_value = Some(min.clone()); + if self.statistics_enabled == EnabledStatistics::Chunk { + match (min, max) { + (Some(min), Some(max)) => { + if self + .min_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(v, min)) + { + self.min_column_value = Some(min.clone()); + } + + if self + .max_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(max, v)) + { + self.max_column_value = Some(max.clone()); + } } - if self - .max_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(max, v)) - { - self.max_column_value = Some(max.clone()); + (None, Some(_)) | (Some(_), None) => { + panic!("min/max should be both set or both None") } - } - (None, Some(_)) | (Some(_), None) => { - panic!("min/max should be both set or both None") - } - (None, None) => {} + (None, None) => { + for val in values { + if self + .min_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(v, val)) + { + self.min_column_value = Some(val.clone()); + } + + if self + .max_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(val, v)) + { + self.max_column_value = Some(val.clone()); + } + } + } + }; } - if let Some(distinct) = distinct_count { - self.column_distinct_count = - Some(self.column_distinct_count.unwrap_or(0) + distinct); + // We can only set the distinct count if there are no other writes + if self.num_buffered_values == 0 && self.num_page_nulls == 0 { + self.column_distinct_count = distinct_count; + } else { + self.column_distinct_count = None; } if let Some(nulls) = null_count { self.num_column_nulls += nulls; } - let calculate_page_stats = (min.is_none() || max.is_none()) - && null_count.is_none() - && distinct_count.is_none(); - let mut values_offset = 0; let mut levels_offset = 0; for _ in 0..num_batches { @@ -356,7 +380,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { &values[values_offset..values_offset + write_batch_size], def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), - calculate_page_stats, )?; levels_offset += write_batch_size; } @@ -365,7 +388,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { &values[values_offset..], def_levels.map(|lv| &lv[levels_offset..]), rep_levels.map(|lv| &lv[levels_offset..]), - calculate_page_stats, )?; // Return total number of values processed. @@ -393,9 +415,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None) } - /// Writer may optionally provide pre-calculated statistics for this batch, in which case we do - /// not calculate page level statistics as this will defeat the purpose of speeding up the write - /// process with pre-calculated statistics. + /// Writer may optionally provide pre-calculated statistics for this batch + /// + /// Note: Unless disabled using by using [`WriterProperties`] to set + /// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute + /// per-page statistics ignoring the data passed pub fn write_batch_with_statistics( &mut self, values: &[T::T], @@ -466,7 +490,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { values: &[T::T], def_levels: Option<&[i16]>, rep_levels: Option<&[i16]>, - calculate_page_stats: bool, ) -> Result { let mut values_to_write = 0; @@ -494,7 +517,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { for &level in levels { if level == self.descr.max_def_level() { values_to_write += 1; - } else if calculate_page_stats { + } else if self.statistics_enabled == EnabledStatistics::Page { self.num_page_nulls += 1 } } @@ -537,7 +560,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ) })?; - if calculate_page_stats { + if self.statistics_enabled == EnabledStatistics::Page { for val in values_to_write { self.update_page_min_max(val); } @@ -549,7 +572,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap(); if self.should_add_data_page() { - self.add_data_page(calculate_page_stats)?; + self.add_data_page()?; } if self.should_dict_fallback() { @@ -661,7 +684,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { /// Adds data page. /// Data page is either buffered in case of dictionary encoding or written directly. - fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> { + fn add_data_page(&mut self) -> Result<()> { // Extract encoded values let value_bytes = match self.dict_encoder { Some(ref mut encoder) => encoder.write_indices()?, @@ -678,14 +701,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { let max_def_level = self.descr.max_def_level(); let max_rep_level = self.descr.max_rep_level(); - // always update column NULL count, no matter if page stats are used self.num_column_nulls += self.num_page_nulls; - let page_statistics = if calculate_page_stat { - self.update_column_min_max(); - Some(self.make_page_statistics()) - } else { - None + let page_statistics = match self.statistics_enabled { + EnabledStatistics::Page => { + self.update_column_min_max(); + Some(self.make_page_statistics()) + } + _ => None, }; // update column and offset index @@ -810,10 +833,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { #[inline] fn flush_data_pages(&mut self) -> Result<()> { // Write all outstanding data to a new page. - let calculate_page_stats = - self.min_page_value.is_some() && self.max_page_value.is_some(); if self.num_buffered_values > 0 { - self.add_data_page(calculate_page_stats)?; + self.add_data_page()?; } while let Some(page) = self.data_pages.pop_front() { @@ -1754,6 +1775,56 @@ mod tests { } } + #[test] + fn test_mixed_precomputed_statistics() { + let mut buf = Vec::with_capacity(100); + let mut write = TrackedWrite::new(&mut buf); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new(WriterProperties::builder().build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); + writer + .write_batch_with_statistics( + &[5, 6, 7], + None, + None, + Some(&5), + Some(&7), + Some(0), + Some(3), + ) + .unwrap(); + + let (_, _, metadata, _, _) = writer.close().unwrap(); + + let stats = metadata.statistics().unwrap(); + assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes()); + assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes()); + assert_eq!(stats.null_count(), 0); + assert!(stats.distinct_count().is_none()); + + let reader = SerializedPageReader::new( + std::io::Cursor::new(buf), + 7, + Compression::UNCOMPRESSED, + Type::INT32, + ) + .unwrap(); + + let pages = reader.collect::>>().unwrap(); + assert_eq!(pages.len(), 2); + + assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE); + assert_eq!(pages[1].page_type(), PageType::DATA_PAGE); + + let page_statistics = pages[1].statistics().unwrap(); + assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes()); + assert_eq!(page_statistics.max_bytes(), 7_i32.to_le_bytes()); + assert_eq!(page_statistics.null_count(), 0); + assert!(page_statistics.distinct_count().is_none()); + } + #[test] fn test_column_writer_empty_column_roundtrip() { let props = WriterProperties::builder().build(); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 2baf93933cb..9d195667bee 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -60,7 +60,7 @@ const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0; const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED; const DEFAULT_DICTIONARY_ENABLED: bool = true; const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE; -const DEFAULT_STATISTICS_ENABLED: bool = true; +const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page; const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY"); @@ -198,23 +198,13 @@ impl WriterProperties { } /// Returns `true` if statistics are enabled for a column. - pub fn statistics_enabled(&self, col: &ColumnPath) -> bool { + pub fn statistics_enabled(&self, col: &ColumnPath) -> EnabledStatistics { self.column_properties .get(col) .and_then(|c| c.statistics_enabled()) .or_else(|| self.default_column_properties.statistics_enabled()) .unwrap_or(DEFAULT_STATISTICS_ENABLED) } - - /// Returns max size for statistics. - /// Only applicable if statistics are enabled. - pub fn max_statistics_size(&self, col: &ColumnPath) -> usize { - self.column_properties - .get(col) - .and_then(|c| c.max_statistics_size()) - .or_else(|| self.default_column_properties.max_statistics_size()) - .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE) - } } /// Writer properties builder. @@ -339,19 +329,11 @@ impl WriterPropertiesBuilder { } /// Sets flag to enable/disable statistics for any column. - pub fn set_statistics_enabled(mut self, value: bool) -> Self { + pub fn set_statistics_enabled(mut self, value: EnabledStatistics) -> Self { self.default_column_properties.set_statistics_enabled(value); self } - /// Sets max statistics size for any column. - /// Applicable only if statistics are enabled. - pub fn set_max_statistics_size(mut self, value: usize) -> Self { - self.default_column_properties - .set_max_statistics_size(value); - self - } - // ---------------------------------------------------------------------- // Setters for a specific column @@ -394,23 +376,33 @@ impl WriterPropertiesBuilder { /// Sets flag to enable/disable statistics for a column. /// Takes precedence over globally defined settings. - pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self { - self.get_mut_props(col).set_statistics_enabled(value); - self - } - - /// Sets max size for statistics for a column. - /// Takes precedence over globally defined settings. - pub fn set_column_max_statistics_size( + pub fn set_column_statistics_enabled( mut self, col: ColumnPath, - value: usize, + value: EnabledStatistics, ) -> Self { - self.get_mut_props(col).set_max_statistics_size(value); + self.get_mut_props(col).set_statistics_enabled(value); self } } +/// Controls the level of statistics to be computed by the writer +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum EnabledStatistics { + /// Compute no statistics + None, + /// Compute chunk-level statistics but not page-level + Chunk, + /// Compute page-level statistics + Page, +} + +impl Default for EnabledStatistics { + fn default() -> Self { + DEFAULT_STATISTICS_ENABLED + } +} + /// Container for column properties that can be changed as part of writer. /// /// If a field is `None`, it means that no specific value has been set for this column, @@ -420,8 +412,7 @@ struct ColumnProperties { encoding: Option, codec: Option, dictionary_enabled: Option, - statistics_enabled: Option, - max_statistics_size: Option, + statistics_enabled: Option, } impl ColumnProperties { @@ -432,7 +423,6 @@ impl ColumnProperties { codec: None, dictionary_enabled: None, statistics_enabled: None, - max_statistics_size: None, } } @@ -463,15 +453,10 @@ impl ColumnProperties { } /// Sets whether or not statistics are enabled for this column. - fn set_statistics_enabled(&mut self, enabled: bool) { + fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) { self.statistics_enabled = Some(enabled); } - /// Sets max size for statistics for this column. - fn set_max_statistics_size(&mut self, value: usize) { - self.max_statistics_size = Some(value); - } - /// Returns optional encoding for this column. fn encoding(&self) -> Option { self.encoding @@ -491,14 +476,9 @@ impl ColumnProperties { /// Returns `Some(true)` if statistics are enabled for this column, if disabled then /// returns `Some(false)`. If result is `None`, then no setting has been provided. - fn statistics_enabled(&self) -> Option { + fn statistics_enabled(&self) -> Option { self.statistics_enabled } - - /// Returns optional max size in bytes for statistics. - fn max_statistics_size(&self) -> Option { - self.max_statistics_size - } } #[cfg(test)] @@ -537,10 +517,6 @@ mod tests { props.statistics_enabled(&ColumnPath::from("col")), DEFAULT_STATISTICS_ENABLED ); - assert_eq!( - props.max_statistics_size(&ColumnPath::from("col")), - DEFAULT_MAX_STATISTICS_SIZE - ); } #[test] @@ -613,14 +589,15 @@ mod tests { .set_encoding(Encoding::DELTA_BINARY_PACKED) .set_compression(Compression::GZIP) .set_dictionary_enabled(false) - .set_statistics_enabled(false) - .set_max_statistics_size(50) + .set_statistics_enabled(EnabledStatistics::None) // specific column settings .set_column_encoding(ColumnPath::from("col"), Encoding::RLE) .set_column_compression(ColumnPath::from("col"), Compression::SNAPPY) .set_column_dictionary_enabled(ColumnPath::from("col"), true) - .set_column_statistics_enabled(ColumnPath::from("col"), true) - .set_column_max_statistics_size(ColumnPath::from("col"), 123) + .set_column_statistics_enabled( + ColumnPath::from("col"), + EnabledStatistics::Chunk, + ) .build(); assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0); @@ -642,8 +619,10 @@ mod tests { ); assert_eq!(props.compression(&ColumnPath::from("a")), Compression::GZIP); assert!(!props.dictionary_enabled(&ColumnPath::from("a"))); - assert!(!props.statistics_enabled(&ColumnPath::from("a"))); - assert_eq!(props.max_statistics_size(&ColumnPath::from("a")), 50); + assert_eq!( + props.statistics_enabled(&ColumnPath::from("a")), + EnabledStatistics::None + ); assert_eq!( props.encoding(&ColumnPath::from("col")), @@ -654,8 +633,10 @@ mod tests { Compression::SNAPPY ); assert!(props.dictionary_enabled(&ColumnPath::from("col"))); - assert!(props.statistics_enabled(&ColumnPath::from("col"))); - assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123); + assert_eq!( + props.statistics_enabled(&ColumnPath::from("col")), + EnabledStatistics::Chunk + ); } #[test] From a9f74845afed3f162faa49ada94937cfa4005d7a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 09:21:04 -0400 Subject: [PATCH 2/5] Fix test_column_writer_precalculated_statistics --- parquet/src/column/writer.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index b8c21733957..4e742ad6816 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -1734,7 +1734,11 @@ mod tests { #[test] fn test_column_writer_precalculated_statistics() { let page_writer = get_test_page_writer(); - let props = Arc::new(WriterProperties::builder().build()); + let props = Arc::new( + WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Chunk) + .build(), + ); let mut writer = get_test_column_writer::(page_writer, 0, 0, props); writer .write_batch_with_statistics( From e25136698c17724b8544f0debfb804ba75da3ad0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 12:36:37 -0400 Subject: [PATCH 3/5] Handle NaN floats --- parquet/src/column/writer.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 4e742ad6816..91185412841 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -342,6 +342,13 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { } (None, None) => { for val in values { + if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() { + // Skip NaN values + if val != val { + continue; + } + } + if self .min_column_value .as_ref() @@ -703,8 +710,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.num_column_nulls += self.num_page_nulls; + let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some(); let page_statistics = match self.statistics_enabled { - EnabledStatistics::Page => { + EnabledStatistics::Page if has_min_max => { self.update_column_min_max(); Some(self.make_page_statistics()) } From 30491113d8596bd2ef6e9a6387d7119b881f156b Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 13:23:37 -0400 Subject: [PATCH 4/5] Reduce code duplication --- parquet/src/arrow/schema.rs | 2 +- parquet/src/column/writer.rs | 126 +++++++++++++---------------------- 2 files changed, 47 insertions(+), 81 deletions(-) diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index 5e574f39d57..97611d0ec30 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -154,7 +154,7 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( let meta = props .key_value_metadata - .get_or_insert_with(|| Default::default()); + .get_or_insert_with(Default::default); // check if ARROW:schema exists, and overwrite it let schema_meta = meta diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index 91185412841..fa174b781c7 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -17,7 +17,7 @@ //! Contains column writer API. use parquet_format::{ColumnIndex, OffsetIndex}; -use std::{cmp, collections::VecDeque, convert::TryFrom, marker::PhantomData}; +use std::{collections::VecDeque, convert::TryFrom, marker::PhantomData}; use crate::basic::{Compression, ConvertedType, Encoding, LogicalType, PageType, Type}; use crate::column::page::{CompressedPage, Page, PageWriteSpec, PageWriter}; @@ -37,7 +37,7 @@ use crate::file::{ metadata::ColumnChunkMetaData, properties::{WriterProperties, WriterPropertiesPtr, WriterVersion}, }; -use crate::schema::types::ColumnDescPtr; +use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; use crate::util::bit_util::FromBytes; use crate::util::memory::ByteBufferPtr; @@ -321,49 +321,16 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { if self.statistics_enabled == EnabledStatistics::Chunk { match (min, max) { (Some(min), Some(max)) => { - if self - .min_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(v, min)) - { - self.min_column_value = Some(min.clone()); - } - - if self - .max_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(max, v)) - { - self.max_column_value = Some(max.clone()); - } + Self::update_min(&self.descr, min, &mut self.min_column_value); + Self::update_max(&self.descr, max, &mut self.max_column_value); } (None, Some(_)) | (Some(_), None) => { panic!("min/max should be both set or both None") } (None, None) => { for val in values { - if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() { - // Skip NaN values - if val != val { - continue; - } - } - - if self - .min_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(v, val)) - { - self.min_column_value = Some(val.clone()); - } - - if self - .max_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(val, v)) - { - self.max_column_value = Some(val.clone()); - } + Self::update_min(&self.descr, val, &mut self.min_column_value); + Self::update_max(&self.descr, val, &mut self.max_column_value); } } }; @@ -1058,8 +1025,36 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { } } - #[allow(clippy::eq_op)] fn update_page_min_max(&mut self, val: &T::T) { + Self::update_min(&self.descr, val, &mut self.min_page_value); + Self::update_max(&self.descr, val, &mut self.max_page_value); + } + + fn update_column_min_max(&mut self) { + let min = self.min_page_value.as_ref().unwrap(); + Self::update_min(&self.descr, min, &mut self.min_column_value); + + let max = self.max_page_value.as_ref().unwrap(); + Self::update_max(&self.descr, max, &mut self.max_column_value); + } + + fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option) { + Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val)) + } + + fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option) { + Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur)) + } + + /// Perform a conditional update of `cur`, skipping any NaN values + /// + /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with + /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true` + #[allow(clippy::eq_op)] + fn update_stat(val: &T::T, cur: &mut Option, should_update: F) + where + F: Fn(&T::T) -> bool, + { if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() { // Skip NaN values if val != val { @@ -1067,50 +1062,21 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { } } - if self - .min_page_value - .as_ref() - .map_or(true, |min| self.compare_greater(min, val)) - { - self.min_page_value = Some(val.clone()); - } - if self - .max_page_value - .as_ref() - .map_or(true, |max| self.compare_greater(val, max)) - { - self.max_page_value = Some(val.clone()); - } - } - - fn update_column_min_max(&mut self) { - let update_min = self.min_column_value.as_ref().map_or(true, |min| { - let page_value = self.min_page_value.as_ref().unwrap(); - self.compare_greater(min, page_value) - }); - if update_min { - self.min_column_value = self.min_page_value.clone(); - } - - let update_max = self.max_column_value.as_ref().map_or(true, |max| { - let page_value = self.max_page_value.as_ref().unwrap(); - self.compare_greater(page_value, max) - }); - if update_max { - self.max_column_value = self.max_page_value.clone(); + if cur.as_ref().map_or(true, should_update) { + *cur = Some(val.clone()); } } /// Evaluate `a > b` according to underlying logical type. - fn compare_greater(&self, a: &T::T, b: &T::T) -> bool { - if let Some(LogicalType::Integer { is_signed, .. }) = self.descr.logical_type() { + fn compare_greater(descr: &ColumnDescriptor, a: &T::T, b: &T::T) -> bool { + if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() { if !is_signed { // need to compare unsigned return a.as_u64().unwrap() > b.as_u64().unwrap(); } } - match self.descr.converted_type() { + match descr.converted_type() { ConvertedType::UINT_8 | ConvertedType::UINT_16 | ConvertedType::UINT_32 @@ -1120,8 +1086,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { _ => {} }; - if let Some(LogicalType::Decimal { .. }) = self.descr.logical_type() { - match self.descr.physical_type() { + if let Some(LogicalType::Decimal { .. }) = descr.logical_type() { + match T::get_physical_type() { Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => { return compare_greater_byte_array_decimals( a.as_bytes(), @@ -1132,8 +1098,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { }; } - if self.descr.converted_type() == ConvertedType::DECIMAL { - match self.descr.physical_type() { + if descr.converted_type() == ConvertedType::DECIMAL { + match T::get_physical_type() { Type::FIXED_LEN_BYTE_ARRAY | Type::BYTE_ARRAY => { return compare_greater_byte_array_decimals( a.as_bytes(), @@ -2362,10 +2328,10 @@ mod tests { let mut max_batch_size = values.len(); if let Some(levels) = def_levels { - max_batch_size = cmp::max(max_batch_size, levels.len()); + max_batch_size = max_batch_size.max(levels.len()); } if let Some(levels) = rep_levels { - max_batch_size = cmp::max(max_batch_size, levels.len()); + max_batch_size = max_batch_size.max(levels.len()); } let mut writer = get_test_column_writer::( From d7a4a89c3a66548bb1b8b9e1e9d6901166e33dc5 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 8 Jul 2022 10:30:11 -0400 Subject: [PATCH 5/5] Review feedback --- parquet/src/column/writer.rs | 13 ++++++--- parquet/src/file/properties.rs | 51 +++++++++++++++++++++++++++++++++- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index fa174b781c7..5def721353a 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -318,6 +318,9 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { let write_batch_size = self.props.write_batch_size(); let num_batches = min_len / write_batch_size; + // If only computing chunk-level statistics compute them here, page-level statistics + // are computed in [`Self::write_mini_batch`] and used to update chunk statistics in + // [`Self::add_data_page`] if self.statistics_enabled == EnabledStatistics::Chunk { match (min, max) { (Some(min), Some(max)) => { @@ -389,11 +392,13 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None) } - /// Writer may optionally provide pre-calculated statistics for this batch + /// Writer may optionally provide pre-calculated statistics for use when computing + /// chunk-level statistics /// - /// Note: Unless disabled using by using [`WriterProperties`] to set - /// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute - /// per-page statistics ignoring the data passed + /// NB: [`WriterProperties::statistics_enabled`] must be set to [`EnabledStatistics::Chunk`] + /// for these statistics to take effect. If [`EnabledStatistics::None`] they will be ignored, + /// and if [`EnabledStatistics::Page`] the chunk statistics will instead be computed from the + /// computed page statistics pub fn write_batch_with_statistics( &mut self, values: &[T::T], diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 9d195667bee..9ca7c4daa59 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -205,6 +205,16 @@ impl WriterProperties { .or_else(|| self.default_column_properties.statistics_enabled()) .unwrap_or(DEFAULT_STATISTICS_ENABLED) } + + /// Returns max size for statistics. + /// Only applicable if statistics are enabled. + pub fn max_statistics_size(&self, col: &ColumnPath) -> usize { + self.column_properties + .get(col) + .and_then(|c| c.max_statistics_size()) + .or_else(|| self.default_column_properties.max_statistics_size()) + .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE) + } } /// Writer properties builder. @@ -334,6 +344,14 @@ impl WriterPropertiesBuilder { self } + /// Sets max statistics size for any column. + /// Applicable only if statistics are enabled. + pub fn set_max_statistics_size(mut self, value: usize) -> Self { + self.default_column_properties + .set_max_statistics_size(value); + self + } + // ---------------------------------------------------------------------- // Setters for a specific column @@ -384,6 +402,17 @@ impl WriterPropertiesBuilder { self.get_mut_props(col).set_statistics_enabled(value); self } + + /// Sets max size for statistics for a column. + /// Takes precedence over globally defined settings. + pub fn set_column_max_statistics_size( + mut self, + col: ColumnPath, + value: usize, + ) -> Self { + self.get_mut_props(col).set_max_statistics_size(value); + self + } } /// Controls the level of statistics to be computed by the writer @@ -393,7 +422,7 @@ pub enum EnabledStatistics { None, /// Compute chunk-level statistics but not page-level Chunk, - /// Compute page-level statistics + /// Compute page-level and chunk-level statistics Page, } @@ -413,6 +442,7 @@ struct ColumnProperties { codec: Option, dictionary_enabled: Option, statistics_enabled: Option, + max_statistics_size: Option, } impl ColumnProperties { @@ -423,6 +453,7 @@ impl ColumnProperties { codec: None, dictionary_enabled: None, statistics_enabled: None, + max_statistics_size: None, } } @@ -457,6 +488,11 @@ impl ColumnProperties { self.statistics_enabled = Some(enabled); } + /// Sets max size for statistics for this column. + fn set_max_statistics_size(&mut self, value: usize) { + self.max_statistics_size = Some(value); + } + /// Returns optional encoding for this column. fn encoding(&self) -> Option { self.encoding @@ -479,6 +515,11 @@ impl ColumnProperties { fn statistics_enabled(&self) -> Option { self.statistics_enabled } + + /// Returns optional max size in bytes for statistics. + fn max_statistics_size(&self) -> Option { + self.max_statistics_size + } } #[cfg(test)] @@ -517,6 +558,10 @@ mod tests { props.statistics_enabled(&ColumnPath::from("col")), DEFAULT_STATISTICS_ENABLED ); + assert_eq!( + props.max_statistics_size(&ColumnPath::from("col")), + DEFAULT_MAX_STATISTICS_SIZE + ); } #[test] @@ -590,6 +635,7 @@ mod tests { .set_compression(Compression::GZIP) .set_dictionary_enabled(false) .set_statistics_enabled(EnabledStatistics::None) + .set_max_statistics_size(50) // specific column settings .set_column_encoding(ColumnPath::from("col"), Encoding::RLE) .set_column_compression(ColumnPath::from("col"), Compression::SNAPPY) @@ -598,6 +644,7 @@ mod tests { ColumnPath::from("col"), EnabledStatistics::Chunk, ) + .set_column_max_statistics_size(ColumnPath::from("col"), 123) .build(); assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0); @@ -623,6 +670,7 @@ mod tests { props.statistics_enabled(&ColumnPath::from("a")), EnabledStatistics::None ); + assert_eq!(props.max_statistics_size(&ColumnPath::from("a")), 50); assert_eq!( props.encoding(&ColumnPath::from("col")), @@ -637,6 +685,7 @@ mod tests { props.statistics_enabled(&ColumnPath::from("col")), EnabledStatistics::Chunk ); + assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123); } #[test]