Fix disabling parquet statistics (apache#2185)
tustvold committed Jul 27, 2022
1 parent d10d962 commit 9329e0c
Showing 3 changed files with 97 additions and 19 deletions.
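For context, this commit makes the column writer honour the statistics level configured on WriterProperties: the hunks below show that, previously, page-level min/max and column-chunk statistics were emitted even when disabled. As a minimal sketch of the configuration being fixed, using only builder methods and types that appear in this diff (module paths assumed to be the crate's public `parquet::file::properties` re-exports):

```rust
use parquet::file::properties::{EnabledStatistics, WriterProperties};

fn main() {
    // Ask the writer to emit no statistics at all; before this fix the column
    // writer still wrote page and column-chunk statistics in this case.
    let _no_stats = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::None)
        .build();

    // Ask for column-chunk statistics only, suppressing page-level min/max.
    let _chunk_only = WriterProperties::builder()
        .set_statistics_enabled(EnabledStatistics::Chunk)
        .build();

    // Either set of properties is then handed to a writer such as the
    // SerializedFileWriter used by the tests in this diff.
}
```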
23 changes: 21 additions & 2 deletions parquet/src/arrow/arrow_reader.rs
@@ -416,7 +416,7 @@ mod tests {
FixedLenByteArrayType, Int32Type, Int64Type,
};
use crate::errors::Result;
use crate::file::properties::{WriterProperties, WriterVersion};
use crate::file::properties::{EnabledStatistics, WriterProperties, WriterVersion};
use crate::file::reader::{FileReader, SerializedFileReader};
use crate::file::writer::SerializedFileWriter;
use crate::schema::parser::parse_message_type;
@@ -787,6 +787,8 @@ mod tests {
max_dict_page_size: usize,
/// Writer version
writer_version: WriterVersion,
/// Enabled statistics
enabled_statistics: EnabledStatistics,
/// Encoding
encoding: Encoding,
}
@@ -802,6 +804,7 @@
max_data_page_size: 1024 * 1024,
max_dict_page_size: 1024 * 1024,
writer_version: WriterVersion::PARQUET_1_0,
enabled_statistics: EnabledStatistics::Page,
encoding: Encoding::PLAIN,
}
}
@@ -838,11 +841,19 @@
}
}

fn with_enabled_statistics(self, enabled_statistics: EnabledStatistics) -> Self {
Self {
enabled_statistics,
..self
}
}

fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
.set_write_batch_size(self.write_batch_size)
.set_writer_version(self.writer_version);
.set_writer_version(self.writer_version)
.set_statistics_enabled(self.enabled_statistics);

let builder = match self.encoding {
Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => builder
@@ -896,6 +907,14 @@ mod tests {
TestOptions::new(2, 256, 127).with_null_percent(0),
// Test optional with nulls
TestOptions::new(2, 256, 93).with_null_percent(25),
// Test with no page-level statistics
TestOptions::new(2, 256, 91)
.with_null_percent(25)
.with_enabled_statistics(EnabledStatistics::Chunk),
// Test with no statistics
TestOptions::new(2, 256, 91)
.with_null_percent(25)
.with_enabled_statistics(EnabledStatistics::None),
];

all_options.into_iter().for_each(|opts| {
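The three levels exercised by the new test options above are the variants of `EnabledStatistics`. A rough sketch of the distinction, with semantics inferred from how the variants are used in this commit (the authoritative definition lives in `parquet/src/file/properties.rs`, not in this diff):

```rust
// Sketch only: statistics granularity requested of the column writer.
pub enum EnabledStatistics {
    /// Write no statistics for the column.
    None,
    /// Write statistics only into the column chunk metadata.
    Chunk,
    /// Write statistics into the column chunk metadata and into page headers.
    Page,
}
```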
14 changes: 10 additions & 4 deletions parquet/src/column/writer/encoder.rs
@@ -24,7 +24,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder};
use crate::errors::{ParquetError, Result};
use crate::file::properties::WriterProperties;
use crate::file::properties::{EnabledStatistics, WriterProperties};
use crate::schema::types::{ColumnDescPtr, ColumnDescriptor};
use crate::util::memory::ByteBufferPtr;

@@ -105,6 +105,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
dict_encoder: Option<DictEncoder<T>>,
descr: ColumnDescPtr,
num_values: usize,
statistics_enabled: EnabledStatistics,
min_value: Option<T::T>,
max_value: Option<T::T>,
}
@@ -127,11 +128,14 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
.unwrap_or_else(|| fallback_encoding(T::get_physical_type(), props)),
)?;

let statistics_enabled = props.statistics_enabled(descr.path());

Ok(Self {
encoder,
dict_encoder,
descr: descr.clone(),
num_values: 0,
statistics_enabled,
min_value: None,
max_value: None,
})
@@ -148,9 +152,11 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
)
})?;

if let Some((min, max)) = slice.min_max(&self.descr) {
update_min(&self.descr, min, &mut self.min_value);
update_max(&self.descr, max, &mut self.max_value);
if self.statistics_enabled == EnabledStatistics::Page {
if let Some((min, max)) = slice.min_max(&self.descr) {
update_min(&self.descr, min, &mut self.min_value);
update_max(&self.descr, max, &mut self.max_value);
}
}

match &mut self.dict_encoder {
79 changes: 66 additions & 13 deletions parquet/src/column/writer/mod.rs
@@ -456,7 +456,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
for &level in levels {
if level == self.descr.max_def_level() {
values_to_write += 1;
} else if self.statistics_enabled == EnabledStatistics::Page {
} else {
// We must always compute this as it is used to populate v2 pages
self.num_page_nulls += 1
}
}
@@ -746,26 +747,28 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
None => data_page_offset + total_compressed_size,
};

let statistics = Statistics::new(
self.min_column_value.clone(),
self.max_column_value.clone(),
self.column_distinct_count,
self.num_column_nulls,
false,
);

let metadata = ColumnChunkMetaData::builder(self.descr.clone())
let mut builder = ColumnChunkMetaData::builder(self.descr.clone())
.set_compression(self.codec)
.set_encodings(self.encodings.iter().cloned().collect())
.set_file_offset(file_offset)
.set_total_compressed_size(total_compressed_size)
.set_total_uncompressed_size(total_uncompressed_size)
.set_num_values(num_values)
.set_data_page_offset(data_page_offset)
.set_dictionary_page_offset(dict_page_offset)
.set_statistics(statistics)
.build()?;
.set_dictionary_page_offset(dict_page_offset);

if self.statistics_enabled != EnabledStatistics::None {
let statistics = Statistics::new(
self.min_column_value.clone(),
self.max_column_value.clone(),
self.column_distinct_count,
self.num_column_nulls,
false,
);
builder = builder.set_statistics(statistics);
}

let metadata = builder.build()?;
self.page_writer.write_metadata(&metadata)?;

Ok(metadata)
@@ -1639,6 +1642,56 @@ mod tests {
assert!(page_statistics.distinct_count().is_none());
}

#[test]
fn test_disabled_statistics() {
let mut buf = Vec::with_capacity(100);
let mut write = TrackedWrite::new(&mut buf);
let page_writer = Box::new(SerializedPageWriter::new(&mut write));
let props = WriterProperties::builder()
.set_statistics_enabled(EnabledStatistics::None)
.set_writer_version(WriterVersion::PARQUET_2_0)
.build();
let props = Arc::new(props);

let mut writer = get_test_column_writer::<Int32Type>(page_writer, 1, 0, props);
writer
.write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
.unwrap();

let (_, _, metadata, _, _) = writer.close().unwrap();
assert!(metadata.statistics().is_none());

let reader = SerializedPageReader::new(
std::io::Cursor::new(buf),
6,
Compression::UNCOMPRESSED,
Type::INT32,
)
.unwrap();

let pages = reader.collect::<Result<Vec<_>>>().unwrap();
assert_eq!(pages.len(), 2);

assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
assert_eq!(pages[1].page_type(), PageType::DATA_PAGE_V2);

match &pages[1] {
Page::DataPageV2 {
num_values,
num_nulls,
num_rows,
statistics,
..
} => {
assert_eq!(*num_values, 6);
assert_eq!(*num_nulls, 2);
assert_eq!(*num_rows, 6);
assert!(statistics.is_none());
}
_ => unreachable!(),
}
}

#[test]
fn test_column_writer_empty_column_roundtrip() {
let props = WriterProperties::builder().build();
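As a complementary illustration (not part of this commit), the same property could be verified at the file level rather than through the column writer used by `test_disabled_statistics` above: write a file with `EnabledStatistics::None`, read it back with the `SerializedFileReader` already imported by these test modules, and confirm that no column chunk carries statistics. The helper name and its `path` argument below are hypothetical:

```rust
use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

/// Hypothetical check: a parquet file written with EnabledStatistics::None
/// should expose no statistics on any of its column chunks.
fn assert_no_chunk_statistics(path: &str) {
    let file = File::open(path).unwrap();
    let reader = SerializedFileReader::new(file).unwrap();
    for row_group in reader.metadata().row_groups() {
        for column in row_group.columns() {
            assert!(column.statistics().is_none());
        }
    }
}
```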
