From 06cfa35f446a436baf4831cd38aef3f7772e7b78 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 7 Jul 2022 08:54:57 -0400 Subject: [PATCH] Fix parquet writer statistics --- parquet/src/arrow/schema.rs | 6 +- parquet/src/column/writer.rs | 167 +++++++++++++++++++++++---------- parquet/src/file/properties.rs | 97 ++++++++----------- 3 files changed, 162 insertions(+), 108 deletions(-) diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index a65c7585327..5e574f39d57 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( value: Some(encoded), }; - let mut meta = props.key_value_metadata.clone().unwrap_or_default(); + let meta = props + .key_value_metadata + .get_or_insert_with(|| Default::default()); + // check if ARROW:schema exists, and overwrite it let schema_meta = meta .iter() @@ -167,7 +170,6 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata( meta.push(schema_kv); } } - props.key_value_metadata = Some(meta); } /// Convert arrow schema to parquet schema diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs index d589aef5a50..b8c21733957 100644 --- a/parquet/src/column/writer.rs +++ b/parquet/src/column/writer.rs @@ -31,6 +31,7 @@ use crate::encodings::{ }; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder}; +use crate::file::properties::EnabledStatistics; use crate::file::statistics::Statistics; use crate::file::{ metadata::ColumnChunkMetaData, @@ -177,6 +178,8 @@ pub struct ColumnWriterImpl<'a, T: DataType> { // Column writer properties descr: ColumnDescPtr, props: WriterPropertiesPtr, + statistics_enabled: EnabledStatistics, + page_writer: Box, has_dictionary: bool, dict_encoder: Option>, @@ -243,9 +246,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ) .unwrap(); + let statistics_enabled = props.statistics_enabled(descr.path()); + Self { descr, props, + statistics_enabled, page_writer, has_dictionary, dict_encoder, @@ -302,53 +308,71 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { // Find out the minimal length to prevent index out of bound errors. let mut min_len = values.len(); if let Some(levels) = def_levels { - min_len = cmp::min(min_len, levels.len()); + min_len = min_len.min(levels.len()); } if let Some(levels) = rep_levels { - min_len = cmp::min(min_len, levels.len()); + min_len = min_len.min(levels.len()); } // Find out number of batches to process. let write_batch_size = self.props.write_batch_size(); let num_batches = min_len / write_batch_size; - // Process pre-calculated statistics - match (min, max) { - (Some(min), Some(max)) => { - if self - .min_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(v, min)) - { - self.min_column_value = Some(min.clone()); + if self.statistics_enabled == EnabledStatistics::Chunk { + match (min, max) { + (Some(min), Some(max)) => { + if self + .min_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(v, min)) + { + self.min_column_value = Some(min.clone()); + } + + if self + .max_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(max, v)) + { + self.max_column_value = Some(max.clone()); + } } - if self - .max_column_value - .as_ref() - .map_or(true, |v| self.compare_greater(max, v)) - { - self.max_column_value = Some(max.clone()); + (None, Some(_)) | (Some(_), None) => { + panic!("min/max should be both set or both None") } - } - (None, Some(_)) | (Some(_), None) => { - panic!("min/max should be both set or both None") - } - (None, None) => {} + (None, None) => { + for val in values { + if self + .min_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(v, val)) + { + self.min_column_value = Some(val.clone()); + } + + if self + .max_column_value + .as_ref() + .map_or(true, |v| self.compare_greater(val, v)) + { + self.max_column_value = Some(val.clone()); + } + } + } + }; } - if let Some(distinct) = distinct_count { - self.column_distinct_count = - Some(self.column_distinct_count.unwrap_or(0) + distinct); + // We can only set the distinct count if there are no other writes + if self.num_buffered_values == 0 && self.num_page_nulls == 0 { + self.column_distinct_count = distinct_count; + } else { + self.column_distinct_count = None; } if let Some(nulls) = null_count { self.num_column_nulls += nulls; } - let calculate_page_stats = (min.is_none() || max.is_none()) - && null_count.is_none() - && distinct_count.is_none(); - let mut values_offset = 0; let mut levels_offset = 0; for _ in 0..num_batches { @@ -356,7 +380,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { &values[values_offset..values_offset + write_batch_size], def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]), - calculate_page_stats, )?; levels_offset += write_batch_size; } @@ -365,7 +388,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { &values[values_offset..], def_levels.map(|lv| &lv[levels_offset..]), rep_levels.map(|lv| &lv[levels_offset..]), - calculate_page_stats, )?; // Return total number of values processed. @@ -393,9 +415,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None) } - /// Writer may optionally provide pre-calculated statistics for this batch, in which case we do - /// not calculate page level statistics as this will defeat the purpose of speeding up the write - /// process with pre-calculated statistics. + /// Writer may optionally provide pre-calculated statistics for this batch + /// + /// Note: Unless disabled using by using [`WriterProperties`] to set + /// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute + /// per-page statistics ignoring the data passed pub fn write_batch_with_statistics( &mut self, values: &[T::T], @@ -466,7 +490,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { values: &[T::T], def_levels: Option<&[i16]>, rep_levels: Option<&[i16]>, - calculate_page_stats: bool, ) -> Result { let mut values_to_write = 0; @@ -494,7 +517,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { for &level in levels { if level == self.descr.max_def_level() { values_to_write += 1; - } else if calculate_page_stats { + } else if self.statistics_enabled == EnabledStatistics::Page { self.num_page_nulls += 1 } } @@ -537,7 +560,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { ) })?; - if calculate_page_stats { + if self.statistics_enabled == EnabledStatistics::Page { for val in values_to_write { self.update_page_min_max(val); } @@ -549,7 +572,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap(); if self.should_add_data_page() { - self.add_data_page(calculate_page_stats)?; + self.add_data_page()?; } if self.should_dict_fallback() { @@ -661,7 +684,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { /// Adds data page. /// Data page is either buffered in case of dictionary encoding or written directly. - fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> { + fn add_data_page(&mut self) -> Result<()> { // Extract encoded values let value_bytes = match self.dict_encoder { Some(ref mut encoder) => encoder.write_indices()?, @@ -678,14 +701,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { let max_def_level = self.descr.max_def_level(); let max_rep_level = self.descr.max_rep_level(); - // always update column NULL count, no matter if page stats are used self.num_column_nulls += self.num_page_nulls; - let page_statistics = if calculate_page_stat { - self.update_column_min_max(); - Some(self.make_page_statistics()) - } else { - None + let page_statistics = match self.statistics_enabled { + EnabledStatistics::Page => { + self.update_column_min_max(); + Some(self.make_page_statistics()) + } + _ => None, }; // update column and offset index @@ -810,10 +833,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> { #[inline] fn flush_data_pages(&mut self) -> Result<()> { // Write all outstanding data to a new page. - let calculate_page_stats = - self.min_page_value.is_some() && self.max_page_value.is_some(); if self.num_buffered_values > 0 { - self.add_data_page(calculate_page_stats)?; + self.add_data_page()?; } while let Some(page) = self.data_pages.pop_front() { @@ -1754,6 +1775,56 @@ mod tests { } } + #[test] + fn test_mixed_precomputed_statistics() { + let mut buf = Vec::with_capacity(100); + let mut write = TrackedWrite::new(&mut buf); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let props = Arc::new(WriterProperties::builder().build()); + let mut writer = get_test_column_writer::(page_writer, 0, 0, props); + + writer.write_batch(&[1, 2, 3, 4], None, None).unwrap(); + writer + .write_batch_with_statistics( + &[5, 6, 7], + None, + None, + Some(&5), + Some(&7), + Some(0), + Some(3), + ) + .unwrap(); + + let (_, _, metadata, _, _) = writer.close().unwrap(); + + let stats = metadata.statistics().unwrap(); + assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes()); + assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes()); + assert_eq!(stats.null_count(), 0); + assert!(stats.distinct_count().is_none()); + + let reader = SerializedPageReader::new( + std::io::Cursor::new(buf), + 7, + Compression::UNCOMPRESSED, + Type::INT32, + ) + .unwrap(); + + let pages = reader.collect::>>().unwrap(); + assert_eq!(pages.len(), 2); + + assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE); + assert_eq!(pages[1].page_type(), PageType::DATA_PAGE); + + let page_statistics = pages[1].statistics().unwrap(); + assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes()); + assert_eq!(page_statistics.max_bytes(), 7_i32.to_le_bytes()); + assert_eq!(page_statistics.null_count(), 0); + assert!(page_statistics.distinct_count().is_none()); + } + #[test] fn test_column_writer_empty_column_roundtrip() { let props = WriterProperties::builder().build(); diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 2baf93933cb..9d195667bee 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -60,7 +60,7 @@ const DEFAULT_WRITER_VERSION: WriterVersion = WriterVersion::PARQUET_1_0; const DEFAULT_COMPRESSION: Compression = Compression::UNCOMPRESSED; const DEFAULT_DICTIONARY_ENABLED: bool = true; const DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT: usize = DEFAULT_PAGE_SIZE; -const DEFAULT_STATISTICS_ENABLED: bool = true; +const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Page; const DEFAULT_MAX_STATISTICS_SIZE: usize = 4096; const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024; const DEFAULT_CREATED_BY: &str = env!("PARQUET_CREATED_BY"); @@ -198,23 +198,13 @@ impl WriterProperties { } /// Returns `true` if statistics are enabled for a column. - pub fn statistics_enabled(&self, col: &ColumnPath) -> bool { + pub fn statistics_enabled(&self, col: &ColumnPath) -> EnabledStatistics { self.column_properties .get(col) .and_then(|c| c.statistics_enabled()) .or_else(|| self.default_column_properties.statistics_enabled()) .unwrap_or(DEFAULT_STATISTICS_ENABLED) } - - /// Returns max size for statistics. - /// Only applicable if statistics are enabled. - pub fn max_statistics_size(&self, col: &ColumnPath) -> usize { - self.column_properties - .get(col) - .and_then(|c| c.max_statistics_size()) - .or_else(|| self.default_column_properties.max_statistics_size()) - .unwrap_or(DEFAULT_MAX_STATISTICS_SIZE) - } } /// Writer properties builder. @@ -339,19 +329,11 @@ impl WriterPropertiesBuilder { } /// Sets flag to enable/disable statistics for any column. - pub fn set_statistics_enabled(mut self, value: bool) -> Self { + pub fn set_statistics_enabled(mut self, value: EnabledStatistics) -> Self { self.default_column_properties.set_statistics_enabled(value); self } - /// Sets max statistics size for any column. - /// Applicable only if statistics are enabled. - pub fn set_max_statistics_size(mut self, value: usize) -> Self { - self.default_column_properties - .set_max_statistics_size(value); - self - } - // ---------------------------------------------------------------------- // Setters for a specific column @@ -394,23 +376,33 @@ impl WriterPropertiesBuilder { /// Sets flag to enable/disable statistics for a column. /// Takes precedence over globally defined settings. - pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self { - self.get_mut_props(col).set_statistics_enabled(value); - self - } - - /// Sets max size for statistics for a column. - /// Takes precedence over globally defined settings. - pub fn set_column_max_statistics_size( + pub fn set_column_statistics_enabled( mut self, col: ColumnPath, - value: usize, + value: EnabledStatistics, ) -> Self { - self.get_mut_props(col).set_max_statistics_size(value); + self.get_mut_props(col).set_statistics_enabled(value); self } } +/// Controls the level of statistics to be computed by the writer +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum EnabledStatistics { + /// Compute no statistics + None, + /// Compute chunk-level statistics but not page-level + Chunk, + /// Compute page-level statistics + Page, +} + +impl Default for EnabledStatistics { + fn default() -> Self { + DEFAULT_STATISTICS_ENABLED + } +} + /// Container for column properties that can be changed as part of writer. /// /// If a field is `None`, it means that no specific value has been set for this column, @@ -420,8 +412,7 @@ struct ColumnProperties { encoding: Option, codec: Option, dictionary_enabled: Option, - statistics_enabled: Option, - max_statistics_size: Option, + statistics_enabled: Option, } impl ColumnProperties { @@ -432,7 +423,6 @@ impl ColumnProperties { codec: None, dictionary_enabled: None, statistics_enabled: None, - max_statistics_size: None, } } @@ -463,15 +453,10 @@ impl ColumnProperties { } /// Sets whether or not statistics are enabled for this column. - fn set_statistics_enabled(&mut self, enabled: bool) { + fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) { self.statistics_enabled = Some(enabled); } - /// Sets max size for statistics for this column. - fn set_max_statistics_size(&mut self, value: usize) { - self.max_statistics_size = Some(value); - } - /// Returns optional encoding for this column. fn encoding(&self) -> Option { self.encoding @@ -491,14 +476,9 @@ impl ColumnProperties { /// Returns `Some(true)` if statistics are enabled for this column, if disabled then /// returns `Some(false)`. If result is `None`, then no setting has been provided. - fn statistics_enabled(&self) -> Option { + fn statistics_enabled(&self) -> Option { self.statistics_enabled } - - /// Returns optional max size in bytes for statistics. - fn max_statistics_size(&self) -> Option { - self.max_statistics_size - } } #[cfg(test)] @@ -537,10 +517,6 @@ mod tests { props.statistics_enabled(&ColumnPath::from("col")), DEFAULT_STATISTICS_ENABLED ); - assert_eq!( - props.max_statistics_size(&ColumnPath::from("col")), - DEFAULT_MAX_STATISTICS_SIZE - ); } #[test] @@ -613,14 +589,15 @@ mod tests { .set_encoding(Encoding::DELTA_BINARY_PACKED) .set_compression(Compression::GZIP) .set_dictionary_enabled(false) - .set_statistics_enabled(false) - .set_max_statistics_size(50) + .set_statistics_enabled(EnabledStatistics::None) // specific column settings .set_column_encoding(ColumnPath::from("col"), Encoding::RLE) .set_column_compression(ColumnPath::from("col"), Compression::SNAPPY) .set_column_dictionary_enabled(ColumnPath::from("col"), true) - .set_column_statistics_enabled(ColumnPath::from("col"), true) - .set_column_max_statistics_size(ColumnPath::from("col"), 123) + .set_column_statistics_enabled( + ColumnPath::from("col"), + EnabledStatistics::Chunk, + ) .build(); assert_eq!(props.writer_version(), WriterVersion::PARQUET_2_0); @@ -642,8 +619,10 @@ mod tests { ); assert_eq!(props.compression(&ColumnPath::from("a")), Compression::GZIP); assert!(!props.dictionary_enabled(&ColumnPath::from("a"))); - assert!(!props.statistics_enabled(&ColumnPath::from("a"))); - assert_eq!(props.max_statistics_size(&ColumnPath::from("a")), 50); + assert_eq!( + props.statistics_enabled(&ColumnPath::from("a")), + EnabledStatistics::None + ); assert_eq!( props.encoding(&ColumnPath::from("col")), @@ -654,8 +633,10 @@ mod tests { Compression::SNAPPY ); assert!(props.dictionary_enabled(&ColumnPath::from("col"))); - assert!(props.statistics_enabled(&ColumnPath::from("col"))); - assert_eq!(props.max_statistics_size(&ColumnPath::from("col")), 123); + assert_eq!( + props.statistics_enabled(&ColumnPath::from("col")), + EnabledStatistics::Chunk + ); } #[test]