From 880c4d98e6d570db701fb013f3abf5e5e6f42e32 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Fri, 28 Oct 2022 08:38:48 +1300 Subject: [PATCH] Add optional page row count limit for parquet `WriterProperties` (#2941) (#2942) * Add page row count limit (#2941) * Apply suggestions from code review Co-authored-by: Andrew Lamb Co-authored-by: Andrew Lamb --- parquet/src/column/writer/mod.rs | 6 ++-- parquet/src/file/properties.rs | 43 ++++++++++++++++++++++++++-- parquet/tests/arrow_writer_layout.rs | 30 ++++++++++++++++++- 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 0f96b6fd78e..f9b429f5bc7 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -569,11 +569,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { // // In such a scenario the dictionary decoder may return an estimated encoded // size in excess of the page size limit, even when there are no buffered values - if self.encoder.num_values() == 0 { + if self.page_metrics.num_buffered_values == 0 { return false; } - self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit() + self.page_metrics.num_buffered_rows as usize + >= self.props.data_page_row_count_limit() + || self.encoder.estimated_data_page_size() >= self.props.data_pagesize_limit() } /// Performs dictionary fallback. diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs index 57dae323d89..11fb13b4bd6 100644 --- a/parquet/src/file/properties.rs +++ b/parquet/src/file/properties.rs @@ -96,6 +96,7 @@ pub type WriterPropertiesPtr = Arc<WriterProperties>; pub struct WriterProperties { data_pagesize_limit: usize, dictionary_pagesize_limit: usize, + data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, writer_version: WriterVersion, @@ -112,15 +113,29 @@ impl WriterProperties { } /// Returns data page size limit. 
+ /// + /// Note: this is a best effort limit based on the write batch size pub fn data_pagesize_limit(&self) -> usize { self.data_pagesize_limit } /// Returns dictionary page size limit. + /// + /// Note: this is a best effort limit based on the write batch size pub fn dictionary_pagesize_limit(&self) -> usize { self.dictionary_pagesize_limit } + /// Returns the maximum page row count + /// + /// This can be used to limit the number of rows within a page to + /// yield better page pruning + /// + /// Note: this is a best effort limit based on the write batch size + pub fn data_page_row_count_limit(&self) -> usize { + self.data_page_row_count_limit + } + /// Returns configured batch size for writes. /// /// When writing a batch of data, this setting allows to split it internally into @@ -222,6 +237,7 @@ impl WriterProperties { pub struct WriterPropertiesBuilder { data_pagesize_limit: usize, dictionary_pagesize_limit: usize, + data_page_row_count_limit: usize, write_batch_size: usize, max_row_group_size: usize, writer_version: WriterVersion, @@ -237,6 +253,7 @@ impl WriterPropertiesBuilder { Self { data_pagesize_limit: DEFAULT_PAGE_SIZE, dictionary_pagesize_limit: DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT, + data_page_row_count_limit: usize::MAX, write_batch_size: DEFAULT_WRITE_BATCH_SIZE, max_row_group_size: DEFAULT_MAX_ROW_GROUP_SIZE, writer_version: DEFAULT_WRITER_VERSION, @@ -252,6 +269,7 @@ impl WriterPropertiesBuilder { WriterProperties { data_pagesize_limit: self.data_pagesize_limit, dictionary_pagesize_limit: self.dictionary_pagesize_limit, + data_page_row_count_limit: self.data_page_row_count_limit, write_batch_size: self.write_batch_size, max_row_group_size: self.max_row_group_size, writer_version: self.writer_version, @@ -271,19 +289,38 @@ impl WriterPropertiesBuilder { self } - /// Sets data page size limit. 
+ /// Sets best effort maximum size of a data page in bytes + /// + /// Note: this is a best effort limit based on the write batch size pub fn set_data_pagesize_limit(mut self, value: usize) -> Self { self.data_pagesize_limit = value; self } - /// Sets dictionary page size limit. + /// Sets best effort maximum number of rows in a data page + /// + /// + /// This can be used to limit the number of rows within a page to + /// yield better page pruning + /// + /// Note: this is a best effort limit based on the write batch size + pub fn set_data_page_row_count_limit(mut self, value: usize) -> Self { + self.data_page_row_count_limit = value; + self + } + + /// Sets best effort maximum dictionary page size, in bytes + /// + /// Note: this is a best effort limit based on the write batch size pub fn set_dictionary_pagesize_limit(mut self, value: usize) -> Self { self.dictionary_pagesize_limit = value; self } - /// Sets write batch size. + /// Sets write batch size + /// + /// Data is written in batches of this size, acting as an upper-bound on + /// the enforcement granularity of page limits pub fn set_write_batch_size(mut self, value: usize) -> Self { self.write_batch_size = value; self diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index 40076add325..e43456eb6f4 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -249,7 +249,7 @@ fn test_primitive() { do_test(LayoutTest { props, - batches: vec![batch], + batches: vec![batch.clone()], layout: Layout { row_groups: vec![RowGroup { columns: vec![ColumnChunk { @@ -308,6 +308,34 @@ fn test_primitive() { }], }, }); + + // Test row count limit + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_row_count_limit(100) + .set_write_batch_size(100) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: 
(0..20) + .map(|_| Page { + rows: 100, + page_header_size: 34, + compressed_size: 400, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .collect(), + dictionary_page: None, + }], + }], + }, + }); } #[test]