Respect Page Size Limits in ArrowWriter (apache#2853)
tustvold committed Oct 18, 2022
1 parent ede36d7 commit 5ee43de
Showing 6 changed files with 313 additions and 31 deletions.
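Before the file-by-file changes, a minimal sketch of the configuration this commit makes the writer honor. It uses only APIs exercised by the new writer_layout.rs test at the bottom of this commit; page limits are checked every `write_batch_size` values, so pages land near, not exactly at, the configured byte sizes:

use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn write_with_page_limits() {
    // Close data pages near 1000 bytes, dictionary pages near 1000 bytes,
    // and check the limits after every 10 values written.
    let props = WriterProperties::builder()
        .set_data_pagesize_limit(1000)
        .set_dictionary_pagesize_limit(1000)
        .set_write_batch_size(10)
        .build();

    let array = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props)).unwrap();
    writer.write(&batch).unwrap();
    writer.close().unwrap();
}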
3 changes: 1 addition & 2 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -379,8 +379,7 @@ impl DictEncoder {

     fn estimated_data_page_size(&self) -> usize {
         let bit_width = self.bit_width();
-        1 + RleEncoder::min_buffer_size(bit_width)
-            + RleEncoder::max_buffer_size(bit_width, self.indices.len())
+        1 + RleEncoder::max_buffer_size(bit_width, self.indices.len())
     }

     fn estimated_dict_page_size(&self) -> usize {
1 change: 1 addition & 0 deletions parquet/src/column/writer/encoder.rs
@@ -201,6 +201,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
     }

     fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
+        self.num_values += indices.len();
         let slice: Vec<_> = indices.iter().map(|idx| values[*idx].clone()).collect();
         self.write_slice(&slice)
     }
3 changes: 1 addition & 2 deletions parquet/src/encodings/encoding/dict_encoder.rs
@@ -162,8 +162,7 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {

     fn estimated_data_encoded_size(&self) -> usize {
         let bit_width = self.bit_width();
-        1 + RleEncoder::min_buffer_size(bit_width)
-            + RleEncoder::max_buffer_size(bit_width, self.indices.len())
+        RleEncoder::max_buffer_size(bit_width, self.indices.len())
     }

     fn flush_buffer(&mut self) -> Result<ByteBufferPtr> {
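Both dictionary encoders previously added `RleEncoder::min_buffer_size` on top of `RleEncoder::max_buffer_size`, inflating the data page size estimate by a whole worst-case run. A self-contained sketch of the arithmetic, using stand-in reimplementations of the `bit_util::ceil` and `RleEncoder::max_buffer_size` formulas from the rle.rs hunk below (the names here are illustrative, not the crate's API):

// Stand-in for bit_util::ceil.
fn ceil(value: usize, divisor: usize) -> usize {
    (value + divisor - 1) / divisor
}

// Stand-in for the rewritten RleEncoder::max_buffer_size.
fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
    let num_runs = ceil(num_values, 8); // worst case: shortest runs of 8
    let bit_packed_max_size = num_runs + num_runs * bit_width as usize;
    let rle_max_size = num_runs * (1 + ceil(bit_width as usize, 8));
    bit_packed_max_size.max(rle_max_size)
}

fn main() {
    // 1000 dictionary indices at 8 bits: 125 groups of 8 values, each a
    // 1-byte header plus 8 data bytes when bit-packed.
    assert_eq!(max_buffer_size(8, 1000), 1125);
    // New byte_array.rs estimate: 1 + 1125 = 1126 bytes. The old code also
    // added min_buffer_size(8) = 513 (a whole extra worst-case run),
    // reporting 1639 bytes for the same buffered data.
}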
9 changes: 2 additions & 7 deletions parquet/src/encodings/levels.rs
@@ -38,13 +38,8 @@ pub fn max_buffer_size(
 ) -> usize {
     let bit_width = num_required_bits(max_level as u64);
     match encoding {
-        Encoding::RLE => {
-            RleEncoder::max_buffer_size(bit_width, num_buffered_values)
-                + RleEncoder::min_buffer_size(bit_width)
-        }
-        Encoding::BIT_PACKED => {
-            ceil((num_buffered_values * bit_width as usize) as i64, 8) as usize
-        }
+        Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values),
+        Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 8),
         _ => panic!("Unsupported encoding type {}", encoding),
     }
 }
43 changes: 23 additions & 20 deletions parquet/src/encodings/rle.rs
@@ -42,9 +42,8 @@ use crate::util::{
 /// repeated-value := value that is repeated, using a fixed-width of
 ///                    round-up-to-next-byte(bit-width)

-/// Maximum groups per bit-packed run. Current value is 64.
+/// Maximum groups of 8 values per bit-packed run. Current value is 64.
 const MAX_GROUPS_PER_BIT_PACKED_RUN: usize = 1 << 6;
-const MAX_VALUES_PER_BIT_PACKED_RUN: usize = MAX_GROUPS_PER_BIT_PACKED_RUN * 8;

 /// A RLE/Bit-Packing hybrid encoder.
 // TODO: tracking memory usage
@@ -101,29 +100,33 @@ impl RleEncoder {

     /// Returns the minimum buffer size needed to use the encoder for `bit_width`.
     /// This is the maximum length of a single run for `bit_width`.
+    #[allow(unused)]
     pub fn min_buffer_size(bit_width: u8) -> usize {
-        let max_bit_packed_run_size = 1 + bit_util::ceil(
-            (MAX_VALUES_PER_BIT_PACKED_RUN * bit_width as usize) as i64,
-            8,
-        );
-        let max_rle_run_size =
-            bit_util::MAX_VLQ_BYTE_LEN + bit_util::ceil(bit_width as i64, 8) as usize;
-        std::cmp::max(max_bit_packed_run_size as usize, max_rle_run_size)
+        let b = bit_width as usize;
+        let max_bit_packed_run_size = 1 + MAX_GROUPS_PER_BIT_PACKED_RUN * b;
+        let max_rle_run_size = 1 + bit_util::ceil(b, 8);
+        max_bit_packed_run_size.max(max_rle_run_size)
     }

-    /// Returns the maximum buffer size takes to encode `num_values` values with
+    /// Returns the maximum buffer size to encode `num_values` values with
     /// `bit_width`.
     pub fn max_buffer_size(bit_width: u8, num_values: usize) -> usize {
-        // First the maximum size for bit-packed run
-        let bytes_per_run = bit_width;
-        let num_runs = bit_util::ceil(num_values as i64, 8) as usize;
-        let bit_packed_max_size = num_runs + num_runs * bytes_per_run as usize;
-
-        // Second the maximum size for RLE run
-        let min_rle_run_size = 1 + bit_util::ceil(bit_width as i64, 8) as usize;
-        let rle_max_size =
-            bit_util::ceil(num_values as i64, 8) as usize * min_rle_run_size;
-        std::cmp::max(bit_packed_max_size, rle_max_size) as usize
+        // The maximum size occurs with the shortest possible runs of 8
+        let num_runs = bit_util::ceil(num_values, 8);
+
+        // The number of bytes in a run of 8
+        let bytes_per_run = bit_width as usize;
+
+        // The maximum size if stored as shortest possible bit packed runs of 8
+        let bit_packed_max_size = num_runs + num_runs * bytes_per_run;
+
+        // The length of an RLE run of 8
+        let min_rle_run_size = 1 + bit_util::ceil(bit_width as usize, 8);
+
+        // The maximum size if stored as shortest possible RLE runs of 8
+        let rle_max_size = num_runs * min_rle_run_size;
+
+        bit_packed_max_size.max(rle_max_size)
     }

     /// Encodes `value`, which must be representable with `bit_width` bits.
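As a sanity check on the simplified `min_buffer_size`, a small sketch with `MAX_GROUPS_PER_BIT_PACKED_RUN` inlined as 64 (a maximal bit-packed run is one header byte plus 64 groups of 8 values, each group occupying `bit_width` bytes):

// Stand-in for the simplified RleEncoder::min_buffer_size.
fn min_buffer_size(bit_width: u8) -> usize {
    let b = bit_width as usize;
    let max_bit_packed_run_size = 1 + 64 * b; // header byte + 64 groups of 8 values
    let max_rle_run_size = 1 + (b + 7) / 8; // header byte + one repeated value
    max_bit_packed_run_size.max(max_rle_run_size)
}

fn main() {
    // A full bit-packed run holds 64 * 8 = 512 values; at bit_width = 1
    // that is 1 header byte + 64 data bytes.
    assert_eq!(min_buffer_size(1), 65);
    // At bit_width = 20, each group of 8 values occupies 20 bytes.
    assert_eq!(min_buffer_size(20), 1 + 64 * 20);
}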
285 changes: 285 additions & 0 deletions parquet/tests/writer_layout.rs
@@ -0,0 +1,285 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Tests that the ArrowWriter correctly lays out values into multiple pages

use arrow::array::Int32Array;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Encoding, PageType};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::reader::SerializedPageReader;
use std::sync::Arc;

struct Layout {
    row_groups: Vec<RowGroup>,
}

struct RowGroup {
    columns: Vec<ColumnChunk>,
}

struct ColumnChunk {
    pages: Vec<Page>,
    dictionary_page: Option<Page>,
}

struct Page {
    rows: usize,
    compressed_size: usize,
    page_header_size: usize,
    encoding: Encoding,
    page_type: PageType,
}

struct LayoutTest {
    props: WriterProperties,
    batches: Vec<RecordBatch>,
    layout: Layout,
}

fn do_test(test: LayoutTest) {
    let mut buf = Vec::with_capacity(1024);

    let mut writer =
        ArrowWriter::try_new(&mut buf, test.batches[0].schema(), Some(test.props))
            .unwrap();
    for batch in test.batches {
        writer.write(&batch).unwrap();
    }
    writer.close().unwrap();
    let b = Bytes::from(buf);

    // Re-read file to decode column index
    let read_options = ArrowReaderOptions::new().with_page_index(true);
    let reader =
        ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options)
            .unwrap();

    assert_layout(&b, reader.metadata().as_ref(), &test.layout);
}

fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
    assert_eq!(meta.row_groups().len(), layout.row_groups.len());
    for (row_group, row_group_layout) in meta.row_groups().iter().zip(&layout.row_groups)
    {
        // Check against offset index
        let offset_index = row_group.page_offset_index().as_ref().unwrap();
        assert_eq!(offset_index.len(), row_group_layout.columns.len());

        for (column_index, column_layout) in
            offset_index.iter().zip(&row_group_layout.columns)
        {
            assert_eq!(column_index.len(), column_layout.pages.len());
            for (idx, (page, page_layout)) in
                column_index.iter().zip(&column_layout.pages).enumerate()
            {
                assert_eq!(
                    page.compressed_page_size as usize,
                    page_layout.compressed_size + page_layout.page_header_size
                );
                let next_first_row_index = column_index
                    .get(idx + 1)
                    .map(|x| x.first_row_index)
                    .unwrap_or_else(|| row_group.num_rows());

                let num_rows = next_first_row_index - page.first_row_index;
                assert_eq!(num_rows as usize, page_layout.rows);
            }
        }

        // Check against page data
        assert_eq!(row_group.columns().len(), row_group_layout.columns.len());
        for (column, column_layout) in
            row_group.columns().iter().zip(&row_group_layout.columns)
        {
            let page_reader = SerializedPageReader::new(
                Arc::new(file_reader.clone()),
                column,
                row_group.num_rows() as usize,
                None,
            )
            .unwrap();

            let pages = page_reader.collect::<Result<Vec<_>, _>>().unwrap();
            assert_eq!(
                pages.len(),
                column_layout.pages.len()
                    + column_layout.dictionary_page.is_some() as usize
            );

            let page_layouts = column_layout
                .dictionary_page
                .iter()
                .chain(&column_layout.pages);

            for (page, page_layout) in pages.iter().zip(page_layouts) {
                assert_eq!(page.encoding(), page_layout.encoding);
                assert_eq!(page.buffer().len(), page_layout.compressed_size);
                assert_eq!(page.page_type(), page_layout.page_type);
            }
        }
    }
}

#[test]
fn test_primitive() {
    let array = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
    let props = WriterProperties::builder()
        .set_dictionary_enabled(false)
        .set_data_pagesize_limit(1000)
        .set_write_batch_size(10)
        .build();

    // Test spill plain encoding pages
    do_test(LayoutTest {
        props,
        batches: vec![batch.clone()],
        layout: Layout {
            row_groups: vec![RowGroup {
                columns: vec![ColumnChunk {
                    pages: (0..8)
                        .map(|_| Page {
                            rows: 250,
                            page_header_size: 34,
                            compressed_size: 1000,
                            encoding: Encoding::PLAIN,
                            page_type: PageType::DATA_PAGE,
                        })
                        .collect(),
                    dictionary_page: None,
                }],
            }],
        },
    });

    // Test spill dictionary
    let props = WriterProperties::builder()
        .set_dictionary_enabled(true)
        .set_dictionary_pagesize_limit(1000)
        .set_data_pagesize_limit(10000)
        .set_write_batch_size(10)
        .build();

    do_test(LayoutTest {
        props,
        batches: vec![batch.clone()],
        layout: Layout {
            row_groups: vec![RowGroup {
                columns: vec![ColumnChunk {
                    pages: vec![
                        Page {
                            rows: 250,
                            page_header_size: 34,
                            compressed_size: 258,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 1750,
                            page_header_size: 34,
                            compressed_size: 7000,
                            encoding: Encoding::PLAIN,
                            page_type: PageType::DATA_PAGE,
                        },
                    ],
                    dictionary_page: Some(Page {
                        rows: 250,
                        page_header_size: 34,
                        compressed_size: 1000,
                        encoding: Encoding::PLAIN,
                        page_type: PageType::DICTIONARY_PAGE,
                    }),
                }],
            }],
        },
    });

    // Test spill dictionary encoded pages
    let props = WriterProperties::builder()
        .set_dictionary_enabled(true)
        .set_dictionary_pagesize_limit(10000)
        .set_data_pagesize_limit(500)
        .set_write_batch_size(10)
        .build();

    do_test(LayoutTest {
        props,
        batches: vec![batch],
        layout: Layout {
            row_groups: vec![RowGroup {
                columns: vec![ColumnChunk {
                    pages: vec![
                        Page {
                            rows: 400,
                            page_header_size: 34,
                            compressed_size: 452,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 370,
                            page_header_size: 34,
                            compressed_size: 472,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 330,
                            page_header_size: 34,
                            compressed_size: 464,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 330,
                            page_header_size: 34,
                            compressed_size: 464,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 330,
                            page_header_size: 34,
                            compressed_size: 464,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                        Page {
                            rows: 240,
                            page_header_size: 34,
                            compressed_size: 332,
                            encoding: Encoding::RLE_DICTIONARY,
                            page_type: PageType::DATA_PAGE,
                        },
                    ],
                    dictionary_page: Some(Page {
                        rows: 1000,
                        page_header_size: 34,
                        compressed_size: 8000,
                        encoding: Encoding::PLAIN,
                        page_type: PageType::DICTIONARY_PAGE,
                    }),
                }],
            }],
        },
    });
}
