From 5ee43dec690191987ac1e220152912206cc494ed Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 18 Oct 2022 14:51:17 +1300 Subject: [PATCH] Respect Page Size Limits in ArrowWriter (#2853) --- parquet/src/arrow/arrow_writer/byte_array.rs | 3 +- parquet/src/column/writer/encoder.rs | 1 + .../src/encodings/encoding/dict_encoder.rs | 3 +- parquet/src/encodings/levels.rs | 9 +- parquet/src/encodings/rle.rs | 43 +-- parquet/tests/writer_layout.rs | 285 ++++++++++++++++++ 6 files changed, 313 insertions(+), 31 deletions(-) create mode 100644 parquet/tests/writer_layout.rs diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 9ea3767a28e..1bdc52b63e6 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -379,8 +379,7 @@ impl DictEncoder { fn estimated_data_page_size(&self) -> usize { let bit_width = self.bit_width(); - 1 + RleEncoder::min_buffer_size(bit_width) - + RleEncoder::max_buffer_size(bit_width, self.indices.len()) + 1 + RleEncoder::max_buffer_size(bit_width, self.indices.len()) } fn estimated_dict_page_size(&self) -> usize { diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 4fb4f210e14..9227c4ba1ce 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -201,6 +201,7 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { } fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> { + self.num_values += indices.len(); let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect(); self.write_slice(&slice) } diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 18deba65e68..1b516452083 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -162,8 +162,7 @@ impl Encoder for DictEncoder { fn estimated_data_encoded_size(&self) -> usize { let bit_width = self.bit_width(); - 1 + RleEncoder::min_buffer_size(bit_width) - + RleEncoder::max_buffer_size(bit_width, self.indices.len()) + RleEncoder::max_buffer_size(bit_width, self.indices.len()) } fn flush_buffer(&mut self) -> Result { diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs index 95384926ddb..cf1da20b684 100644 --- a/parquet/src/encodings/levels.rs +++ b/parquet/src/encodings/levels.rs @@ -38,13 +38,8 @@ pub fn max_buffer_size( ) -> usize { let bit_width = num_required_bits(max_level as u64); match encoding { - Encoding::RLE => { - RleEncoder::max_buffer_size(bit_width, num_buffered_values) - + RleEncoder::min_buffer_size(bit_width) - } - Encoding::BIT_PACKED => { - ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize - } + Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values), + Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 8), _ => panic!("Unsupported encoding type {}", encoding), } } diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 93dd4ab565c..e8f5b6ad826 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -42,9 +42,8 @@ use crate::util::{ /// repeated-value := value that is repeated, using a fixed-width of /// round-up-to-next-byte(bit-width) -/// Maximum groups per bit-packed run. Current value is 64. +/// Maximum groups of 8 values per bit-packed run. Current value is 64. const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6; -const MAX_VALUES_PER_BIT_PACKED_RUN: usize = MAX_GROUPS_PER_BIT_PACKED_RUN * 8; /// A RLE/Bit-Packing hybrid encoder. // TODO: tracking memory usage @@ -101,29 +100,33 @@ impl RleEncoder { /// Returns the minimum buffer size needed to use the encoder for `bit_width`. /// This is the maximum length of a single run for `bit_width`. + #[allow(unused)] pub fn min_buffer_size(bit_width: u8) -> usize { - let max_bit_packed_run_size = 1 + bit_util::ceil( - (MAX_VALUES_PER_BIT_PACKED_RUN * bit_width as usize) as i64, - 8, - ); - let max_rle_run_size = - bit_util::MAX_VLQ_BYTE_LEN + bit_util::ceil(bit_width as i64, 8) as usize; - std::cmp::max(max_bit_packed_run_size as usize, max_rle_run_size) + let b = bit_width as usize; + let max_bit_packed_run_size = 1 + MAX_GROUPS_PER_BIT_PACKED_RUN * b; + let max_rle_run_size = 1 + bit_util::ceil(b, 8); + max_bit_packed_run_size.max(max_rle_run_size) } - /// Returns the maximum buffer size takes to encode `num_values` values with + /// Returns the maximum buffer size to encode `num_values` values with /// `bit_width`. pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize { - // First the maximum size for bit-packed run - let bytes_per_run = bit_width; - let num_runs = bit_util::ceil(num_values as i64, 8) as usize; - let bit_packed_max_size = num_runs + num_runs * bytes_per_run as usize; - - // Second the maximum size for RLE run - let min_rle_run_size = 1 + bit_util::ceil(bit_width as i64, 8) as usize; - let rle_max_size = - bit_util::ceil(num_values as i64, 8) as usize * min_rle_run_size; - std::cmp::max(bit_packed_max_size, rle_max_size) as usize + // The maximum size occurs with the shortest possible runs of 8 + let num_runs = bit_util::ceil(num_values, 8); + + // The number of bytes in a run of 8 + let bytes_per_run = bit_width as usize; + + // The maximum size if stored as shortest possible bit packed runs of 8 + let bit_packed_max_size = num_runs + num_runs * bytes_per_run; + + // The length of an RLE run of 8 + let min_rle_run_size = 1 + bit_util::ceil(bit_width as usize, 8); + + // The maximum size if stored as shortest possible RLE runs of 8 + let rle_max_size = num_runs * min_rle_run_size; + + bit_packed_max_size.max(rle_max_size) } /// Encodes `value`, which must be representable with `bit_width` bits. diff --git a/parquet/tests/writer_layout.rs b/parquet/tests/writer_layout.rs new file mode 100644 index 00000000000..de72ea9c06b --- /dev/null +++ b/parquet/tests/writer_layout.rs @@ -0,0 +1,285 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Tests that the ArrowWriter correctly lays out values into multiple pages + +use arrow::array::Int32Array; +use arrow::record_batch::RecordBatch; +use bytes::Bytes; +use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; +use parquet::arrow::ArrowWriter; +use parquet::basic::{Encoding, PageType}; +use parquet::file::metadata::ParquetMetaData; +use parquet::file::properties::WriterProperties; +use parquet::file::reader::SerializedPageReader; +use std::sync::Arc; + +struct Layout { + row_groups: Vec, +} + +struct RowGroup { + columns: Vec, +} + +struct ColumnChunk { + pages: Vec, + dictionary_page: Option, +} + +struct Page { + rows: usize, + compressed_size: usize, + page_header_size: usize, + encoding: Encoding, + page_type: PageType, +} + +struct LayoutTest { + props: WriterProperties, + batches: Vec, + layout: Layout, +} + +fn do_test(test: LayoutTest) { + let mut buf = Vec::with_capacity(1024); + + let mut writer = + ArrowWriter::try_new(&mut buf, test.batches[0].schema(), Some(test.props)) + .unwrap(); + for batch in test.batches { + writer.write(&batch).unwrap(); + } + writer.close().unwrap(); + let b = Bytes::from(buf); + + // Re-read file to decode column index + let read_options = ArrowReaderOptions::new().with_page_index(true); + let reader = + ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options) + .unwrap(); + + assert_layout(&b, reader.metadata().as_ref(), &test.layout); +} + +fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) { + assert_eq!(meta.row_groups().len(), layout.row_groups.len()); + for (row_group, row_group_layout) in meta.row_groups().iter().zip(&layout.row_groups) + { + // Check against offset index + let offset_index = row_group.page_offset_index().as_ref().unwrap(); + assert_eq!(offset_index.len(), row_group_layout.columns.len()); + + for (column_index, column_layout) in + offset_index.iter().zip(&row_group_layout.columns) + { + assert_eq!(column_index.len(), column_layout.pages.len()); + for (idx, (page, page_layout)) in + column_index.iter().zip(&column_layout.pages).enumerate() + { + assert_eq!( + page.compressed_page_size as usize, + page_layout.compressed_size + page_layout.page_header_size + ); + let next_first_row_index = column_index + .get(idx + 1) + .map(|x| x.first_row_index) + .unwrap_or_else(|| row_group.num_rows()); + + let num_rows = next_first_row_index - page.first_row_index; + assert_eq!(num_rows as usize, page_layout.rows); + } + } + + // Check against page data + assert_eq!(row_group.columns().len(), row_group_layout.columns.len()); + for (column, column_layout) in + row_group.columns().iter().zip(&row_group_layout.columns) + { + let page_reader = SerializedPageReader::new( + Arc::new(file_reader.clone()), + column, + row_group.num_rows() as usize, + None, + ) + .unwrap(); + + let pages = page_reader.collect::, _>>().unwrap(); + assert_eq!( + pages.len(), + column_layout.pages.len() + + column_layout.dictionary_page.is_some() as usize + ); + + let page_layouts = column_layout + .dictionary_page + .iter() + .chain(&column_layout.pages); + + for (page, page_layout) in pages.iter().zip(page_layouts) { + assert_eq!(page.encoding(), page_layout.encoding); + assert_eq!(page.buffer().len(), page_layout.compressed_size); + assert_eq!(page.page_type(), page_layout.page_type); + } + } + } +} + +#[test] +fn test_primitive() { + let array = Arc::new(Int32Array::from_iter_values(0..2000)) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_pagesize_limit(1000) + .set_write_batch_size(10) + .build(); + + // Test spill plain encoding pages + do_test(LayoutTest { + props, + batches: vec![batch.clone()], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: (0..8) + .map(|_| Page { + rows: 250, + page_header_size: 34, + compressed_size: 1000, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .collect(), + dictionary_page: None, + }], + }], + }, + }); + + // Test spill dictionary + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .set_dictionary_pagesize_limit(1000) + .set_data_pagesize_limit(10000) + .set_write_batch_size(10) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch.clone()], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: vec![ + Page { + rows: 250, + page_header_size: 34, + compressed_size: 258, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 1750, + page_header_size: 34, + compressed_size: 7000, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }, + ], + dictionary_page: Some(Page { + rows: 250, + page_header_size: 34, + compressed_size: 1000, + encoding: Encoding::PLAIN, + page_type: PageType::DICTIONARY_PAGE, + }), + }], + }], + }, + }); + + // Test spill dictionary encoded pages + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .set_dictionary_pagesize_limit(10000) + .set_data_pagesize_limit(500) + .set_write_batch_size(10) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: vec![ + Page { + rows: 400, + page_header_size: 34, + compressed_size: 452, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 370, + page_header_size: 34, + compressed_size: 472, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 330, + page_header_size: 34, + compressed_size: 464, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 330, + page_header_size: 34, + compressed_size: 464, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 330, + page_header_size: 34, + compressed_size: 464, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + Page { + rows: 240, + page_header_size: 34, + compressed_size: 332, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }, + ], + dictionary_page: Some(Page { + rows: 1000, + page_header_size: 34, + compressed_size: 8000, + encoding: Encoding::PLAIN, + page_type: PageType::DICTIONARY_PAGE, + }), + }], + }], + }, + }); +}