From 5ee43dec690191987ac1e220152912206cc494ed Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 18 Oct 2022 14:51:17 +1300
Subject: [PATCH 1/5] Respect Page Size Limits in ArrowWriter (#2853)

---
 parquet/src/arrow/arrow_writer/byte_array.rs |   3 +-
 parquet/src/column/writer/encoder.rs         |   1 +
 .../src/encodings/encoding/dict_encoder.rs   |   3 +-
 parquet/src/encodings/levels.rs              |   9 +-
 parquet/src/encodings/rle.rs                 |  43 +--
 parquet/tests/writer_layout.rs               | 285 ++++++++++++++++++
 6 files changed, 313 insertions(+), 31 deletions(-)
 create mode 100644 parquet/tests/writer_layout.rs

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 9ea3767a28e..1bdc52b63e6 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -379,8 +379,7 @@ impl DictEncoder {
     fn estimated_data_page_size(&self) -> usize {
         let bit_width = self.bit_width();
-        1 + RleEncoder::min_buffer_size(bit_width)
-            + RleEncoder::max_buffer_size(bit_width, self.indices.len())
+        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
     }
 
     fn estimated_dict_page_size(&self) -> usize {
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 4fb4f210e14..9227c4ba1ce 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -201,6 +201,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
     }
 
     fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
+        self.num_values += indices.len();
         let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
         self.write_slice(&slice)
     }
diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs
index 18deba65e68..1b516452083 100644
--- a/parquet/src/encodings/encoding/dict_encoder.rs
+++ b/parquet/src/encodings/encoding/dict_encoder.rs
@@ -162,8 +162,7 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
     fn estimated_data_encoded_size(&self) -> usize {
         let bit_width = self.bit_width();
-        1 + RleEncoder::min_buffer_size(bit_width)
-            + RleEncoder::max_buffer_size(bit_width, self.indices.len())
+        RleEncoder::max_buffer_size(bit_width, self.indices.len())
     }
 
     fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 95384926ddb..cf1da20b684 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -38,13 +38,8 @@ pub fn max_buffer_size(
 ) -> usize {
     let bit_width = num_required_bits(max_level as u64);
     match encoding {
-        Encoding::RLE => {
-            RleEncoder::max_buffer_size(bit_width, num_buffered_values)
-                + RleEncoder::min_buffer_size(bit_width)
-        }
-        Encoding::BIT_PACKED => {
-            ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize
-        }
+        Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values),
+        Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 8),
         _ => panic!("Unsupported encoding type {}", encoding),
     }
 }
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index 93dd4ab565c..e8f5b6ad826 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -42,9 +42,8 @@ use crate::util::{
 /// repeated-value := value that is repeated, using a fixed-width of
 /// round-up-to-next-byte(bit-width)
 
 /// Maximum groups of 8 values per bit-packed run. Current value is 64.
const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6; -const MAX_VALUES_PER_BIT_PACKED_RUN: usize = MAX_GROUPS_PER_BIT_PACKED_RUN * 8; /// A RLE/Bit-Packing hybrid encoder. // TODO: tracking memory usage @@ -101,29 +100,33 @@ impl RleEncoder { /// Returns the minimum buffer size needed to use the encoder for `bit_width`. /// This is the maximum length of a single run for `bit_width`. + #[allow(unused)] pub fn min_buffer_size(bit_width: u8) -> usize { - let max_bit_packed_run_size = 1 + bit_util::ceil( - (MAX_VALUES_PER_BIT_PACKED_RUN * bit_width as usize) as i64, - 8, - ); - let max_rle_run_size = - bit_util::MAX_VLQ_BYTE_LEN + bit_util::ceil(bit_width as i64, 8) as usize; - std::cmp::max(max_bit_packed_run_size as usize, max_rle_run_size) + let b = bit_width as usize; + let max_bit_packed_run_size = 1 + MAX_GROUPS_PER_BIT_PACKED_RUN * b; + let max_rle_run_size = 1 + bit_util::ceil(b, 8); + max_bit_packed_run_size.max(max_rle_run_size) } - /// Returns the maximum buffer size takes to encode `num_values` values with + /// Returns the maximum buffer size to encode `num_values` values with /// `bit_width`. pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize { - // First the maximum size for bit-packed run - let bytes_per_run = bit_width; - let num_runs = bit_util::ceil(num_values as i64, 8) as usize; - let bit_packed_max_size = num_runs + num_runs * bytes_per_run as usize; - - // Second the maximum size for RLE run - let min_rle_run_size = 1 + bit_util::ceil(bit_width as i64, 8) as usize; - let rle_max_size = - bit_util::ceil(num_values as i64, 8) as usize * min_rle_run_size; - std::cmp::max(bit_packed_max_size, rle_max_size) as usize + // The maximum size occurs with the shortest possible runs of 8 + let num_runs = bit_util::ceil(num_values, 8); + + // The number of bytes in a run of 8 + let bytes_per_run = bit_width as usize; + + // The maximum size if stored as shortest possible bit packed runs of 8 + let bit_packed_max_size = num_runs + num_runs * bytes_per_run; + + // The length of an RLE run of 8 + let min_rle_run_size = 1 + bit_util::ceil(bit_width as usize, 8); + + // The maximum size if stored as shortest possible RLE runs of 8 + let rle_max_size = num_runs * min_rle_run_size; + + bit_packed_max_size.max(rle_max_size) } /// Encodes `value`, which must be representable with `bit_width` bits. diff --git a/parquet/tests/writer_layout.rs b/parquet/tests/writer_layout.rs new file mode 100644 index 00000000000..de72ea9c06b --- /dev/null +++ b/parquet/tests/writer_layout.rs @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
//! Tests that the ArrowWriter correctly lays out values into multiple pages

use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Encoding, PageType};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::SerializedPageReader;
use std::sync::Arc;

struct Layout {
    row_groups: Vec<RowGroup>,
}

struct RowGroup {
    columns: Vec<ColumnChunk>,
}

struct ColumnChunk {
    pages: Vec<Page>,
    dictionary_page: Option<Page>,
}

struct Page {
    rows: usize,
    compressed_size: usize,
    page_header_size: usize,
    encoding: Encoding,
    page_type: PageType,
}

struct LayoutTest {
    props: WriterProperties,
    batches: Vec<RecordBatch>,
    layout: Layout,
}

fn do_test(test: LayoutTest) {
    let mut buf = Vec::with_capacity(1024);

    let mut writer =
        ArrowWriter::try_new(&mut buf, test.batches[0].schema(), Some(test.props))
            .unwrap();
    for batch in test.batches {
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
    let b = Bytes::from(buf);

    // Re-read file to decode column index
    let read_options = ArrowReaderOptions::new().with_page_index(true);
    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options)
            .unwrap();

    assert_layout(&b, reader.metadata().as_ref(), &test.layout);
}

fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
    assert_eq!(meta.row_groups().len(), layout.row_groups.len());
    for (row_group, row_group_layout) in meta.row_groups().iter().zip(&layout.row_groups)
    {
        // Check against offset index
        let offset_index = row_group.page_offset_index().as_ref().unwrap();
        assert_eq!(offset_index.len(), row_group_layout.columns.len());

        for (column_index, column_layout) in
            offset_index.iter().zip(&row_group_layout.columns)
        {
            assert_eq!(column_index.len(), column_layout.pages.len());
            for (idx, (page, page_layout)) in
                column_index.iter().zip(&column_layout.pages).enumerate()
            {
                assert_eq!(
                    page.compressed_page_size as usize,
                    page_layout.compressed_size + page_layout.page_header_size
                );
                let next_first_row_index = column_index
                    .get(idx + 1)
                    .map(|x| x.first_row_index)
                    .unwrap_or_else(|| row_group.num_rows());

                let num_rows = next_first_row_index - page.first_row_index;
                assert_eq!(num_rows as usize, page_layout.rows);
            }
        }

        // Check against page data
        assert_eq!(row_group.columns().len(), row_group_layout.columns.len());
        for (column, column_layout) in
            row_group.columns().iter().zip(&row_group_layout.columns)
        {
            let page_reader = SerializedPageReader::new(
                Arc::new(file_reader.clone()),
                column,
                row_group.num_rows() as usize,
                None,
            )
            .unwrap();

            let pages = page_reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(
                pages.len(),
                column_layout.pages.len()
                    + column_layout.dictionary_page.is_some() as usize
            );

            let page_layouts = column_layout
                .dictionary_page
                .iter()
                .chain(&column_layout.pages);

            for (page, page_layout) in pages.iter().zip(page_layouts) {
                assert_eq!(page.encoding(), page_layout.encoding);
                assert_eq!(page.buffer().len(), page_layout.compressed_size);
                assert_eq!(page.page_type(), page_layout.page_type);
            }
        }
    }
}
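
// A minimal standalone sketch (an illustration, not part of the patch) of the
// arithmetic assert_layout relies on above: the offset index records only each
// page's first row, so a page's row count is the difference to the next page's
// first_row_index, falling back to the row group's total row count for the
// last page. The function name and inputs below are hypothetical.
//
//     fn rows_per_page(first_row_indices: &[i64], num_rows: i64) -> Vec<i64> {
//         (0..first_row_indices.len())
//             .map(|idx| {
//                 let next = first_row_indices
//                     .get(idx + 1)
//                     .copied()
//                     .unwrap_or(num_rows);
//                 next - first_row_indices[idx]
//             })
//             .collect()
//     }
//
//     // rows_per_page(&[0, 250, 500, 750], 1000) == vec![250, 250, 250, 250]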
#[test]
fn test_primitive() {
    let array = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_pagesize_limit(1000)
        .set_write_batch_size(10)
        .build();

    // Test spill plain encoding pages
    do_test(LayoutTest {
        props,
        batches: vec![batch.clone()],
        layout: Layout {
            row_groups: vec![RowGroup {
                columns: vec![ColumnChunk {
                    pages: (0..8)
                        .map(|_| Page {
                            rows: 250,
                            page_header_size: 34,
                            compressed_size: 1000,
                            encoding: Encoding::PLAIN,
                            page_type: PageType::DATA_PAGE,
                        })
                        .collect(),
                    dictionary_page: None,
                }],
            }],
        },
    });

    // Test spill dictionary
    let props = WriterProperties::builder()
        .set_dictionary_enabled(true)
        .set_dictionary_pagesize_limit(1000)
        .set_data_pagesize_limit(10000)
        .set_write_batch_size(10)
        .build();

    do_test(LayoutTest {
        props,
        batches: vec![batch.clone()],
        layout: Layout {
            row_groups: vec![RowGroup {
                columns: vec![ColumnChunk {
                    pages: vec![
                        Page {
                            rows: 250,
                            page_header_size: 34,
                            compressed_size: 258,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 1750,
                            page_header_size: 34,
                            compressed_size: 7000,
                            encoding: Encoding::PLAIN,
                            page_type: PageType::DATA_PAGE,
                        },
                    ],
                    dictionary_page: Some(Page {
                        rows: 250,
                        page_header_size: 34,
                        compressed_size: 1000,
                        encoding: Encoding::PLAIN,
                        page_type: PageType::DICTIONARY_PAGE,
                    }),
                }],
            }],
        },
    });

    // Test spill dictionary encoded pages
    let props = WriterProperties::builder()
        .set_dictionary_enabled(true)
        .set_dictionary_pagesize_limit(10000)
        .set_data_pagesize_limit(500)
        .set_write_batch_size(10)
        .build();

    do_test(LayoutTest {
        props,
        batches: vec![batch],
        layout: Layout {
            row_groups: vec![RowGroup {
                columns: vec![ColumnChunk {
                    pages: vec![
                        Page {
                            rows: 400,
                            page_header_size: 34,
                            compressed_size: 452,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 370,
                            page_header_size: 34,
                            compressed_size: 472,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 330,
                            page_header_size: 34,
                            compressed_size: 464,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 330,
                            page_header_size: 34,
                            compressed_size: 464,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 330,
                            page_header_size: 34,
                            compressed_size: 464,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 240,
                            page_header_size: 34,
                            compressed_size: 332,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                    ],
                    dictionary_page: Some(Page {
                        rows: 1000,
                        page_header_size: 34,
                        compressed_size: 8000,
                        encoding: Encoding::PLAIN,
                        page_type: PageType::DICTIONARY_PAGE,
                    }),
                }],
            }],
        },
    });
}

From e631366d1ab1fd4712907fc2a859d44e9667a997 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 18 Oct 2022 15:11:11 +1300
Subject: [PATCH 2/5] Update tests

---
 parquet/src/column/writer/mod.rs      | 12 +++++-------
 parquet/src/encodings/encoding/mod.rs |  2 +-
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 55e667043d3..0f96b6fd78e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1825,7 +1825,7 @@ mod tests {
         let page_writer = Box::new(SerializedPageWriter::new(&mut writer));
         let props = Arc::new(
             WriterProperties::builder()
-                .set_data_pagesize_limit(15) // actually each page will have size 15-18 bytes
+                .set_data_pagesize_limit(10)
                 .set_write_batch_size(3) // write 3 values at a time
                 .build(),
         );
@@ -1846,16 +1846,14 @@ mod tests {
         );
         let mut res = Vec::new();
         while let Some(page) = page_reader.get_next_page().unwrap() {
-            res.push((page.page_type(), page.num_values()));
+            res.push((page.page_type(), page.num_values(), page.buffer().len()));
         }
         assert_eq!(
             res,
             vec![
-                (PageType::DICTIONARY_PAGE, 10),
-                (PageType::DATA_PAGE, 3),
-                (PageType::DATA_PAGE, 3),
-                (PageType::DATA_PAGE, 3),
-                (PageType::DATA_PAGE, 1)
+                (PageType::DICTIONARY_PAGE, 10, 40),
+                (PageType::DATA_PAGE, 9, 10),
+                (PageType::DATA_PAGE, 1, 3),
             ]
         );
     }
diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs
index 34d3bb3d4c7..78f4a8b97b3 100644
--- a/parquet/src/encodings/encoding/mod.rs
+++ b/parquet/src/encodings/encoding/mod.rs
@@ -888,7 +888,7 @@ mod tests {
         // DICTIONARY
         // NOTE: The final size is almost the same because the dictionary entries are
         // preserved after encoded values have been written.
-        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123; 1024], 11, 68, 66);
+        run_test::<Int32Type>(Encoding::RLE_DICTIONARY, -1, &[123; 1024], 0, 2, 0);
 
         // DELTA_BINARY_PACKED
         run_test::<Int32Type>(Encoding::DELTA_BINARY_PACKED, -1, &[123; 1024], 0, 35, 0);

From f0ff52baf4f869690a2deb5f1a7c1f38ed047f58 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 18 Oct 2022 15:41:17 +1300
Subject: [PATCH 3/5] Add test required features

---
 parquet/Cargo.toml                                         | 4 ++++
 parquet/tests/{writer_layout.rs => arrow_writer_layout.rs} | 0
 2 files changed, 4 insertions(+)
 rename parquet/tests/{writer_layout.rs => arrow_writer_layout.rs} (100%)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 819f41bca32..9c7da94f9dd 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -81,6 +81,10 @@ experimental = []
 # Enable async APIs
 async = ["futures", "tokio"]
 
+[[test]]
+name = "arrow_writer_layout"
+required-features = ["arrow"]
+
 [[bin]]
 name = "parquet-read"
 required-features = ["cli"]
diff --git a/parquet/tests/writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
similarity index 100%
rename from parquet/tests/writer_layout.rs
rename to parquet/tests/arrow_writer_layout.rs

From 95a5eb8204d13a7c7c7ff2fc0abfbca5c4dbb0c9 Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Tue, 18 Oct 2022 17:31:15 +1300
Subject: [PATCH 4/5] Fix strings

---
 parquet/src/arrow/arrow_writer/byte_array.rs |  14 +-
 parquet/tests/arrow_writer_layout.rs         | 209 ++++++++++++++++++-
 2 files changed, 204 insertions(+), 19 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index 1bdc52b63e6..7070cecacf2 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -426,7 +426,6 @@ impl DictEncoder {
 struct ByteArrayEncoder {
     fallback: FallbackEncoder,
     dict_encoder: Option<DictEncoder>,
-    num_values: usize,
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
 }
@@ -465,7 +464,6 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         Ok(Self {
             fallback,
             dict_encoder: dictionary,
-            num_values: 0,
             min_value: None,
             max_value: None,
         })
@@ -486,7 +484,10 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     }
 
     fn num_values(&self) -> usize {
-        self.num_values
+        match &self.dict_encoder {
+            Some(encoder) => encoder.indices.len(),
+            None => self.fallback.num_values,
+        }
     }
 
     fn has_dictionary(&self) -> bool {
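
// A sketch (an illustration, not from this patch) of why the accessor above
// must count values buffered in whichever encoder is active: the generic
// column writer polls ColumnValueEncoder::num_values to decide when to cut a
// page, roughly along the lines of the hypothetical check below. With the old
// field, values buffered in the dictionary encoder were invisible to that
// check, so data pages never spilled at the configured limit.
//
//     // hypothetical writer-side page boundary check
//     if encoder.num_values() != 0
//         && encoder.estimated_data_page_size() >= props.data_pagesize_limit()
//     {
//         flush_data_page(&mut encoder)?;
//     }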
@@ -507,7 +508,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     fn flush_dict_page(&mut self) -> Result<Option<DictionaryPage>> {
         match self.dict_encoder.take() {
             Some(encoder) => {
-                if self.num_values != 0 {
+                if !encoder.indices.is_empty() {
                     return Err(general_err!(
                         "Must flush data pages before flushing dictionary"
                     ));
@@ -550,10 +551,7 @@ where
 
     match &mut encoder.dict_encoder {
         Some(dict_encoder) => dict_encoder.encode(values, indices),
-        None => {
-            encoder.num_values += indices.len();
-            encoder.fallback.encode(values, indices)
-        }
+        None => encoder.fallback.encode(values, indices),
     }
 }
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index de72ea9c06b..40076add325 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -17,7 +17,7 @@
 
 //! Tests that the ArrowWriter correctly lays out values into multiple pages
 
-use arrow::array::Int32Array;
+use arrow::array::{Int32Array, StringArray};
 use arrow::record_batch::RecordBatch;
 use bytes::Bytes;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
@@ -87,13 +87,19 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
         for (column_index, column_layout) in
             offset_index.iter().zip(&row_group_layout.columns)
         {
-            assert_eq!(column_index.len(), column_layout.pages.len());
+            assert_eq!(
+                column_index.len(),
+                column_layout.pages.len(),
+                "index page count mismatch"
+            );
             for (idx, (page, page_layout)) in
                 column_index.iter().zip(&column_layout.pages).enumerate()
             {
                 assert_eq!(
                     page.compressed_page_size as usize,
-                    page_layout.compressed_size + page_layout.page_header_size
+                    page_layout.compressed_size + page_layout.page_header_size,
+                    "index page {} size mismatch",
+                    idx
                 );
                 let next_first_row_index = column_index
                     .get(idx + 1)
@@ -101,15 +107,28 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
                     .unwrap_or_else(|| row_group.num_rows());
 
                 let num_rows = next_first_row_index - page.first_row_index;
-                assert_eq!(num_rows as usize, page_layout.rows);
+                assert_eq!(
+                    num_rows as usize, page_layout.rows,
+                    "index page {} row count",
+                    idx
+                );
             }
         }
 
         // Check against page data
-        assert_eq!(row_group.columns().len(), row_group_layout.columns.len());
-        for (column, column_layout) in
-            row_group.columns().iter().zip(&row_group_layout.columns)
-        {
+        assert_eq!(
+            row_group.columns().len(),
+            row_group_layout.columns.len(),
+            "column count mismatch"
+        );
+
+        let iter = row_group
+            .columns()
+            .iter()
+            .zip(&row_group_layout.columns)
+            .enumerate();
+
+        for (idx, (column, column_layout)) in iter {
             let page_reader = SerializedPageReader::new(
                 Arc::new(file_reader.clone()),
                 column,
@@ -122,7 +141,9 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
             let pages = page_reader.collect::<Result<Vec<_>, _>>().unwrap();
             assert_eq!(
                 pages.len(),
                 column_layout.pages.len()
-                    + column_layout.dictionary_page.is_some() as usize
+                    + column_layout.dictionary_page.is_some() as usize,
+                "page {} count mismatch",
+                idx
             );
 
             let page_layouts = column_layout
@@ -132,7 +153,12 @@ fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
 
             for (page, page_layout) in pages.iter().zip(page_layouts) {
                 assert_eq!(page.encoding(), page_layout.encoding);
-                assert_eq!(page.buffer().len(), page_layout.compressed_size);
+                assert_eq!(
+                    page.buffer().len(),
+                    page_layout.compressed_size,
+                    "page {} size mismatch",
+                    idx
+                );
                 assert_eq!(page.page_type(), page_layout.page_type);
             }
         }
@@ -272,7 +298,7 @@ fn test_primitive() {
                         },
                     ],
                     dictionary_page: Some(Page {
-                        rows: 1000,
+                        rows: 2000,
                         page_header_size: 34,
                         compressed_size:
8000, encoding: Encoding::PLAIN, @@ -283,3 +309,164 @@ fn test_primitive() { }, }); } + +#[test] +fn test_string() { + let array = Arc::new(StringArray::from_iter_values( + (0..2000).map(|x| format!("{:04}", x)), + )) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_pagesize_limit(1000) + .set_write_batch_size(10) + .build(); + + // Test spill plain encoding pages + do_test(LayoutTest { + props, + batches: vec![batch.clone()], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: (0..15) + .map(|_| Page { + rows: 130, + page_header_size: 34, + compressed_size: 1040, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .chain(std::iter::once(Page { + rows: 50, + page_header_size: 33, + compressed_size: 400, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + })) + .collect(), + dictionary_page: None, + }], + }], + }, + }); + + // Test spill dictionary + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .set_dictionary_pagesize_limit(1000) + .set_data_pagesize_limit(10000) + .set_write_batch_size(10) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch.clone()], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: vec![ + Page { + rows: 130, + page_header_size: 34, + compressed_size: 138, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 1250, + page_header_size: 36, + compressed_size: 10000, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 620, + page_header_size: 34, + compressed_size: 4960, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }, + ], + dictionary_page: Some(Page { + rows: 130, + page_header_size: 34, + compressed_size: 1040, + encoding: Encoding::PLAIN, + page_type: PageType::DICTIONARY_PAGE, + }), + }], + }], + }, + }); + + // Test spill dictionary encoded pages + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .set_dictionary_pagesize_limit(20000) + .set_data_pagesize_limit(500) + .set_write_batch_size(10) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: vec![ + Page { + rows: 400, + page_header_size: 34, + compressed_size: 452, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 370, + page_header_size: 34, + compressed_size: 472, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 330, + page_header_size: 34, + compressed_size: 464, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 330, + page_header_size: 34, + compressed_size: 464, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 330, + page_header_size: 34, + compressed_size: 464, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 240, + page_header_size: 34, + compressed_size: 332, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + ], + dictionary_page: Some(Page { + rows: 2000, + page_header_size: 34, + compressed_size: 16000, + encoding: Encoding::PLAIN, + page_type: PageType::DICTIONARY_PAGE, + }), + }], + }], + }, + }); +} From b85e526828c90842540b0cf4e2f755c917935ac8 Mon Sep 17 00:00:00 2001 From: 
Raphael Taylor-Davies
Date: Wed, 19 Oct 2022 07:47:06 +1300
Subject: [PATCH 5/5] Review feedback

---
 parquet/src/encodings/rle.rs | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)

diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index e8f5b6ad826..9475275cb62 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -98,16 +98,6 @@ impl RleEncoder {
         }
     }
 
-    /// Returns the minimum buffer size needed to use the encoder for `bit_width`.
-    /// This is the maximum length of a single run for `bit_width`.
-    #[allow(unused)]
-    pub fn min_buffer_size(bit_width: u8) -> usize {
-        let b = bit_width as usize;
-        let max_bit_packed_run_size = 1 + MAX_GROUPS_PER_BIT_PACKED_RUN * b;
-        let max_rle_run_size = 1 + bit_util::ceil(b, 8);
-        max_bit_packed_run_size.max(max_rle_run_size)
-    }
-
     /// Returns the maximum buffer size to encode `num_values` values with
     /// `bit_width`.
     pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
@@ -120,8 +110,11 @@ impl RleEncoder {
         // The maximum size if stored as shortest possible bit packed runs of 8
         let bit_packed_max_size = num_runs + num_runs * bytes_per_run;
 
+        // The length of `8` VLQ encoded
+        let rle_len_prefix = 1;
+
         // The length of an RLE run of 8
-        let min_rle_run_size = 1 + bit_util::ceil(bit_width as usize, 8);
+        let min_rle_run_size = rle_len_prefix + bit_util::ceil(bit_width as usize, 8);
 
         // The maximum size if stored as shortest possible RLE runs of 8
         let rle_max_size = num_runs * min_rle_run_size;
@@ -908,8 +901,8 @@ mod tests {
     #[test]
     fn test_rle_specific_roundtrip() {
         let bit_width = 1;
-        let buffer_len = RleEncoder::min_buffer_size(bit_width);
         let values: Vec<i16> = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
+        let buffer_len = RleEncoder::max_buffer_size(bit_width, values.len());
         let mut encoder = RleEncoder::new(bit_width, buffer_len);
         for v in &values {
             encoder.put(*v as u64)
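
// A worked example (an illustration, not part of the patch series) of the
// bound computed by RleEncoder::max_buffer_size above, for the bit_width = 1,
// 10-value input used by this test: num_runs = ceil(10, 8) = 2; the bit-packed
// worst case is one header byte plus bit_width bytes per run, 2 + 2 * 1 = 4;
// the RLE worst case is (1 header byte + ceil(1, 8) repeated-value bytes) per
// run, 2 * 2 = 4; so the buffer is sized at max(4, 4) = 4 bytes.
//
//     let buffer_len = RleEncoder::max_buffer_size(1, 10);
//     assert_eq!(buffer_len, 4);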