Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix several bugs in parquet writer statistics generation, add EnabledStatistics to control level of statistics generated #2022

Merged
merged 5 commits into from Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions parquet/src/arrow/schema.rs
Expand Up @@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
value: Some(encoded),
};

let mut meta = props.key_value_metadata.clone().unwrap_or_default();
let meta = props
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change have any effect (other than to make the code more readable)?

Copy link
Contributor Author

@tustvold tustvold Jul 8, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no user-facing change, but it does avoid an unnecessary clone

.key_value_metadata
.get_or_insert_with(|| Default::default());

// check if ARROW:schema exists, and overwrite it
let schema_meta = meta
.iter()
Expand All @@ -167,7 +170,6 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
meta.push(schema_kv);
}
}
props.key_value_metadata = Some(meta);
}

/// Convert arrow schema to parquet schema
Expand Down
181 changes: 132 additions & 49 deletions parquet/src/column/writer.rs
Expand Up @@ -31,6 +31,7 @@ use crate::encodings::{
};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::Statistics;
use crate::file::{
metadata::ColumnChunkMetaData,
Expand Down Expand Up @@ -177,6 +178,8 @@ pub struct ColumnWriterImpl<'a, T: DataType> {
// Column writer properties
descr: ColumnDescPtr,
props: WriterPropertiesPtr,
statistics_enabled: EnabledStatistics,

page_writer: Box<dyn PageWriter + 'a>,
has_dictionary: bool,
dict_encoder: Option<DictEncoder<T>>,
Expand Down Expand Up @@ -243,9 +246,12 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
.unwrap();

let statistics_enabled = props.statistics_enabled(descr.path());

Self {
descr,
props,
statistics_enabled,
page_writer,
has_dictionary,
dict_encoder,
Expand Down Expand Up @@ -302,61 +308,85 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
// Find out the minimal length to prevent index out of bound errors.
let mut min_len = values.len();
if let Some(levels) = def_levels {
min_len = cmp::min(min_len, levels.len());
min_len = min_len.min(levels.len());
}
if let Some(levels) = rep_levels {
min_len = cmp::min(min_len, levels.len());
min_len = min_len.min(levels.len());
}

// Find out number of batches to process.
let write_batch_size = self.props.write_batch_size();
let num_batches = min_len / write_batch_size;

// Process pre-calculated statistics
match (min, max) {
(Some(min), Some(max)) => {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, min))
{
self.min_column_value = Some(min.clone());
if self.statistics_enabled == EnabledStatistics::Chunk {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would personally find match self.statistics_enabled ... to be easier to validate that other variants of EnabledStatistics::Chunk did not apply.

Or perhaps a function EnabledStatistics::compute_chunk_statistics().

Basically I am thinking about "what if someone adds a new variant" -- it would be nice if the compiler told them where to update

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, no that I re-read this, shouldn't this also be set if we are computing page level statistics?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add a comment explaining why we only care if it is exactly this variant, any other variants would need to be computed at a different level

match (min, max) {
(Some(min), Some(max)) => {
if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, min))
{
self.min_column_value = Some(min.clone());
}

if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(max, v))
{
self.max_column_value = Some(max.clone());
}
}
if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(max, v))
{
self.max_column_value = Some(max.clone());
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
}
}
(None, Some(_)) | (Some(_), None) => {
panic!("min/max should be both set or both None")
}
(None, None) => {}
(None, None) => {
for val in values {
if let Type::FLOAT | Type::DOUBLE = T::get_physical_type() {
// Skip NaN values
if val != val {
continue;
}
}

if self
.min_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(v, val))
{
self.min_column_value = Some(val.clone());
}

if self
.max_column_value
.as_ref()
.map_or(true, |v| self.compare_greater(val, v))
{
self.max_column_value = Some(val.clone());
}
}
}
};
}

if let Some(distinct) = distinct_count {
self.column_distinct_count =
Some(self.column_distinct_count.unwrap_or(0) + distinct);
// We can only set the distinct count if there are no other writes
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the fix for #2016

if self.num_buffered_values == 0 && self.num_page_nulls == 0 {
self.column_distinct_count = distinct_count;
} else {
self.column_distinct_count = None;
}

if let Some(nulls) = null_count {
self.num_column_nulls += nulls;
}

let calculate_page_stats = (min.is_none() || max.is_none())
&& null_count.is_none()
&& distinct_count.is_none();

let mut values_offset = 0;
let mut levels_offset = 0;
for _ in 0..num_batches {
values_offset += self.write_mini_batch(
&values[values_offset..values_offset + write_batch_size],
def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
calculate_page_stats,
)?;
levels_offset += write_batch_size;
}
Expand All @@ -365,7 +395,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
&values[values_offset..],
def_levels.map(|lv| &lv[levels_offset..]),
rep_levels.map(|lv| &lv[levels_offset..]),
calculate_page_stats,
)?;

// Return total number of values processed.
Expand Down Expand Up @@ -393,9 +422,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None)
}

/// Writer may optionally provide pre-calculated statistics for this batch, in which case we do
/// not calculate page level statistics as this will defeat the purpose of speeding up the write
/// process with pre-calculated statistics.
/// Writer may optionally provide pre-calculated statistics for this batch
///
/// Note: Unless disabled by using [`WriterProperties`] to set
/// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment almost reads the opposite of what I think the code is doing (using the passed in pre-calculated statistics or computing them if passed in)

/// per-page statistics ignoring the data passed
pub fn write_batch_with_statistics(
&mut self,
values: &[T::T],
Expand Down Expand Up @@ -466,7 +497,6 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
values: &[T::T],
def_levels: Option<&[i16]>,
rep_levels: Option<&[i16]>,
calculate_page_stats: bool,
) -> Result<usize> {
let mut values_to_write = 0;

Expand Down Expand Up @@ -494,7 +524,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
for &level in levels {
if level == self.descr.max_def_level() {
values_to_write += 1;
} else if calculate_page_stats {
} else if self.statistics_enabled == EnabledStatistics::Page {
self.num_page_nulls += 1
}
}
Expand Down Expand Up @@ -537,7 +567,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
)
})?;

if calculate_page_stats {
if self.statistics_enabled == EnabledStatistics::Page {
for val in values_to_write {
self.update_page_min_max(val);
}
Expand All @@ -549,7 +579,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
self.num_buffered_encoded_values += u32::try_from(values_to_write.len()).unwrap();

if self.should_add_data_page() {
self.add_data_page(calculate_page_stats)?;
self.add_data_page()?;
}

if self.should_dict_fallback() {
Expand Down Expand Up @@ -661,7 +691,7 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {

/// Adds data page.
/// Data page is either buffered in case of dictionary encoding or written directly.
fn add_data_page(&mut self, calculate_page_stat: bool) -> Result<()> {
fn add_data_page(&mut self) -> Result<()> {
// Extract encoded values
let value_bytes = match self.dict_encoder {
Some(ref mut encoder) => encoder.write_indices()?,
Expand All @@ -678,14 +708,15 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
let max_def_level = self.descr.max_def_level();
let max_rep_level = self.descr.max_rep_level();

// always update column NULL count, no matter if page stats are used
self.num_column_nulls += self.num_page_nulls;

let page_statistics = if calculate_page_stat {
self.update_column_min_max();
Some(self.make_page_statistics())
} else {
None
let has_min_max = self.min_page_value.is_some() && self.max_page_value.is_some();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is moved from flush_data_pages

let page_statistics = match self.statistics_enabled {
EnabledStatistics::Page if has_min_max => {
self.update_column_min_max();
Some(self.make_page_statistics())
}
_ => None,
};

// update column and offset index
Expand Down Expand Up @@ -810,10 +841,8 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
#[inline]
fn flush_data_pages(&mut self) -> Result<()> {
// Write all outstanding data to a new page.
let calculate_page_stats =
self.min_page_value.is_some() && self.max_page_value.is_some();
if self.num_buffered_values > 0 {
self.add_data_page(calculate_page_stats)?;
self.add_data_page()?;
}

while let Some(page) = self.data_pages.pop_front() {
Expand Down Expand Up @@ -1713,7 +1742,11 @@ mod tests {
#[test]
fn test_column_writer_precalculated_statistics() {
let page_writer = get_test_page_writer();
let props = Arc::new(WriterProperties::builder().build());
let props = Arc::new(
WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::Chunk)
.build(),
);
let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
writer
.write_batch_with_statistics(
Expand Down Expand Up @@ -1754,6 +1787,56 @@ mod tests {
}
}

#[test]
fn test_mixed_precomputed_statistics() {
    // Serialize a column chunk into an in-memory buffer so the pages can be
    // read back and inspected afterwards.
    let mut buf = Vec::with_capacity(100);
    let mut write = TrackedWrite::new(&mut buf);
    let page_writer = Box::new(SerializedPageWriter::new(&mut write));
    let props = Arc::new(WriterProperties::builder().build());
    let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);

    // Mix a plain write with a write that carries pre-computed statistics.
    writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
    writer
        .write_batch_with_statistics(
            &[5, 6, 7],
            None,
            None,
            Some(&5),
            Some(&7),
            Some(0),
            Some(3),
        )
        .unwrap();

    let (_, _, metadata, _, _) = writer.close().unwrap();

    // Chunk-level statistics must span both batches; the distinct count is
    // discarded because it was not supplied for every write.
    let chunk_stats = metadata.statistics().unwrap();
    assert_eq!(chunk_stats.min_bytes(), 1_i32.to_le_bytes());
    assert_eq!(chunk_stats.max_bytes(), 7_i32.to_le_bytes());
    assert_eq!(chunk_stats.null_count(), 0);
    assert!(chunk_stats.distinct_count().is_none());

    // Decode the serialized pages and verify the page-level statistics.
    let reader = SerializedPageReader::new(
        std::io::Cursor::new(buf),
        7,
        Compression::UNCOMPRESSED,
        Type::INT32,
    )
    .unwrap();

    let pages: Vec<_> = reader.collect::<Result<_>>().unwrap();
    assert_eq!(pages.len(), 2);

    assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
    assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);

    let data_page_stats = pages[1].statistics().unwrap();
    assert_eq!(data_page_stats.min_bytes(), 1_i32.to_le_bytes());
    assert_eq!(data_page_stats.max_bytes(), 7_i32.to_le_bytes());
    assert_eq!(data_page_stats.null_count(), 0);
    assert!(data_page_stats.distinct_count().is_none());
}

#[test]
fn test_column_writer_empty_column_roundtrip() {
let props = WriterProperties::builder().build();
Expand Down