Fix parquet writer statistics
tustvold committed Jul 7, 2022
1 parent 38370cb commit 06cfa35
Showing 3 changed files with 162 additions and 108 deletions.
6 changes: 4 additions & 2 deletions parquet/src/arrow/schema.rs
@@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
value: Some(encoded),
};

let mut meta = props.key_value_metadata.clone().unwrap_or_default();
let meta = props
.key_value_metadata
.get_or_insert_with(|| Default::default());

// check if ARROW:schema exists, and overwrite it
let schema_meta = meta
.iter()
@@ -167,7 +170,6 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
meta.push(schema_kv);
}
}
props.key_value_metadata = Some(meta);
}

/// Convert arrow schema to parquet schema
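The schema.rs change above replaces a clone-modify-reassign of the optional key/value metadata (the removed `clone().unwrap_or_default()` and trailing `props.key_value_metadata = Some(meta)` lines) with in-place mutation through `Option::get_or_insert_with`. A minimal, self-contained sketch of that pattern, with a simplified `(String, String)` pair standing in for the crate's `KeyValue` type (the names here are illustrative, not the parquet API):

```rust
// Mutate the Vec inside an Option in place, creating it only when absent,
// instead of cloning it out and assigning the modified copy back.
fn upsert(meta: &mut Option<Vec<(String, String)>>, key: &str, value: &str) {
    // get_or_insert_with returns &mut Vec, inserting an empty one if None.
    let entries = meta.get_or_insert_with(Vec::new);
    match entries.iter().position(|(k, _)| k == key) {
        Some(idx) => entries[idx].1 = value.to_owned(), // overwrite existing key
        None => entries.push((key.to_owned(), value.to_owned())),
    }
}

fn main() {
    let mut meta = None;
    upsert(&mut meta, "ARROW:schema", "first");
    upsert(&mut meta, "ARROW:schema", "second");
    assert_eq!(
        meta.unwrap(),
        vec![(String::from("ARROW:schema"), String::from("second"))]
    );
}
```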
167 changes: 119 additions & 48 deletions parquet/src/column/writer.rs
@@ -31,6 +31,7 @@ use crate::encodings::{
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
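For reference, the `EnabledStatistics` level imported here is the writer property that drives the new behavior. Its variants, paraphrased from `parquet/src/file/properties.rs` (derives omitted):

```rust
pub enum EnabledStatistics {
    /// Compute no statistics
    None,
    /// Compute chunk-level statistics, but not page-level
    Chunk,
    /// Compute both page-level and chunk-level statistics
    Page,
}
```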
@@ -177,6 +178,8 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
statistics_enabled: EnabledStatistics,

page_writer: Box<dyn PageWriter + 'a>,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
@@ -243,9 +246,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
.unwrap();

let statistics_enabled = props.statistics_enabled(descr.path());

Self {
descr,
props,
statistics_enabled,
page_writer,
has_dictionary,
dict_encoder,
@@ -302,61 +308,78 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
// Find out the minimal length to prevent index out of bound errors.
let mut min_len = values.len();
if let Some(levels) = def_levels {
min_len = cmp::min(min_len, levels.len());
min_len = min_len.min(levels.len());
}
if let Some(levels) = rep_levels {
min_len = cmp::min(min_len, levels.len());
min_len = min_len.min(levels.len());
}

// Find out number of batches to process.
let write_batch_size = self.props.write_batch_size();
let num_batches = min_len / write_batch_size;

// Process pre-calculated statistics
match (min, max) {
(Some(min), Some(max)) => {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, min))
{
self.min_column_value = Some(min.clone());
if self.statistics_enabled == EnabledStatistics::Chunk {
match (min, max) {
(Some(min), Some(max)) => {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, min))
{
self.min_column_value = Some(min.clone());
}

if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(max, v))
{
self.max_column_value = Some(max.clone());
}
}
if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(max, v))
{
self.max_column_value = Some(max.clone());
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
}
}
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
}
(None, None) => {}
(None, None) => {
for val in values {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, val))
{
self.min_column_value = Some(val.clone());
}

if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(val, v))
{
self.max_column_value = Some(val.clone());
}
}
}
};
}

if let Some(distinct) = distinct_count {
self.column_distinct_count =
Some(self.column_distinct_count.unwrap_or(0) + distinct);
// We can only set the distinct count if there are no other writes
if self.num_buffered_values == 0 && self.num_page_nulls == 0 {
self.column_distinct_count = distinct_count;
} else {
self.column_distinct_count = None;
}

if let Some(nulls) = null_count {
self.num_column_nulls += nulls;
}

let calculate_page_stats = (min.is_none() || max.is_none())
&& null_count.is_none()
&& distinct_count.is_none();

let mut values_offset = 0;
let mut levels_offset = 0;
for _ in 0..num_batches {
values_offset += self.write_mini_batch(
&values[values_offset..values_offset + write_batch_size],
def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
calculate_page_stats,
)?;
levels_offset += write_batch_size;
}
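When statistics are limited to chunk level, the hunk above now folds caller-supplied min/max into the running column extremes and, when none are supplied, scans the batch values itself; the pre-computed distinct count is only kept when nothing has been buffered yet, since it cannot be merged with earlier writes. A standalone sketch of that running fold, using plain `PartialOrd` where the writer's real code defers to its sort-order-aware `compare_greater`:

```rust
// Fold each value into running column extremes, as the new (None, None)
// arm does. Plain PartialOrd stands in for the writer's compare_greater.
fn update_extremes<T: PartialOrd + Clone>(
    min: &mut Option<T>,
    max: &mut Option<T>,
    values: &[T],
) {
    for val in values {
        // Replace the running min if unset, or if the current min is greater.
        if min.as_ref().map_or(true, |m| m > val) {
            *min = Some(val.clone());
        }
        // Replace the running max if unset, or if the value is greater.
        if max.as_ref().map_or(true, |m| val > m) {
            *max = Some(val.clone());
        }
    }
}

fn main() {
    let (mut min, mut max) = (None, None);
    update_extremes(&mut min, &mut max, &[3, 1, 4, 1, 5]);
    assert_eq!((min, max), (Some(1), Some(5)));
}
```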
@@ -365,7 +388,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
&values[values_offset..],
def_levels.map(|lv| &lv[levels_offset..]),
rep_levels.map(|lv| &lv[levels_offset..]),
calculate_page_stats,
)?;

// Return total number of values processed.
@@ -393,9 +415,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None)
}

/// Writer may optionally provide pre-calculated statistics for this batch, in which case we do
/// not calculate page level statistics as this will defeat the purpose of speeding up the write
/// process with pre-calculated statistics.
/// Writer may optionally provide pre-calculated statistics for this batch
///
/// Note: Unless disabled by using [`WriterProperties`] to set
/// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute
/// per-page statistics, ignoring the statistics passed in
pub fn write_batch_with_statistics(
&mut self,
values: &[T::T],
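Per the note above, pre-computed statistics only take effect once the writer is configured for chunk-level statistics. A hedged configuration sketch, assuming the `set_statistics_enabled` builder method exposed by the `parquet` crate's `WriterProperties`:

```rust
use std::sync::Arc;

use parquet::file::properties::{EnabledStatistics, WriterProperties, WriterPropertiesPtr};

// Build writer properties that trust caller-supplied chunk statistics and
// skip per-page statistics computation for every column.
fn chunk_stats_props() -> WriterPropertiesPtr {
    Arc::new(
        WriterProperties::builder()
            .set_statistics_enabled(EnabledStatistics::Chunk)
            .build(),
    )
}
```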
@@ -466,7 +490,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
values: &[T::T],
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
calculate_page_stats: bool,
) -> Result<usize> {
let mut values_to_write = 0;

@@ -494,7 +517,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
for &level in levels {
if level == self.descr.max_def_level() {
values_to_write += 1;
} else if calculate_page_stats {
} else if self.statistics_enabled == EnabledStatistics::Page {
self.num_page_nulls += 1
}
}
@@ -537,7 +560,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
})?;

if calculate_page_stats {
if self.statistics_enabled == EnabledStatistics::Page {
for val in values_to_write {
self.update_page_min_max(val);
}
@@ -549,7 +572,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap();

if self.should_add_data_page() {
self.add_data_page(calculate_page_stats)?;
self.add_data_page()?;
}

if self.should_dict_fallback() {
Expand Down Expand Up @@ -661,7 +684,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
fn add_data_page(&mut self) -> Result<()> {
// Extract encoded values
let value_bytes = match self.dict_encoder {
Some(ref mut encoder) => encoder.write_indices()?,
@@ -678,14 +701,14 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
let max_def_level = self.descr.max_def_level();
let max_rep_level = self.descr.max_rep_level();

// always update column NULL count, no matter if page stats are used
self.num_column_nulls += self.num_page_nulls;

let page_statistics = if calculate_page_stat {
self.update_column_min_max();
Some(self.make_page_statistics())
} else {
None
let page_statistics = match self.statistics_enabled {
EnabledStatistics::Page => {
self.update_column_min_max();
Some(self.make_page_statistics())
}
_ => None,
};

// update column and offset index
Expand Down Expand Up @@ -810,10 +833,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn flush_data_pages(&mut self) -> Result<()> {
// Write all outstanding data to a new page.
let calculate_page_stats =
self.min_page_value.is_some() && self.max_page_value.is_some();
if self.num_buffered_values > 0 {
self.add_data_page(calculate_page_stats)?;
self.add_data_page()?;
}

while let Some(page) = self.data_pages.pop_front() {
@@ -1754,6 +1775,56 @@ mod tests {
}
}

#[test]
fn test_mixed_precomputed_statistics() {
let mut buf = Vec::with_capacity(100);
let mut write = TrackedWrite::new(&mut buf);
let page_writer = Box::new(SerializedPageWriter::new(&mut write));
let props = Arc::new(WriterProperties::builder().build());
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
writer
.write_batch_with_statistics(
&[5, 6, 7],
None,
None,
Some(&5),
Some(&7),
Some(0),
Some(3),
)
.unwrap();

let (_, _, metadata, _, _) = writer.close().unwrap();

let stats = metadata.statistics().unwrap();
assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes());
assert_eq!(stats.null_count(), 0);
assert!(stats.distinct_count().is_none());

let reader = SerializedPageReader::new(
std::io::Cursor::new(buf),
7,
Compression::UNCOMPRESSED,
Type::INT32,
)
.unwrap();

let pages = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(pages.len(), 2);

assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

let page_statistics = pages[1].statistics().unwrap();
assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes());
assert_eq!(page_statistics.max_bytes(), 7_i32.to_le_bytes());
assert_eq!(page_statistics.null_count(), 0);
assert!(page_statistics.distinct_count().is_none());
}

#[test]
fn test_column_writer_empty_column_roundtrip() {
let props = WriterProperties::builder().build();
