Skip to content

Commit

Permalink
Automatically grow parquet BitWriter (apache#2226)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jul 29, 2022
1 parent 393f006 commit f124959
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 362 deletions.
8 changes: 3 additions & 5 deletions parquet/src/column/writer/mod.rs
Expand Up @@ -27,7 +27,7 @@ use crate::column::writer::encoder::{
use crate::compression::{create_codec, Codec};
use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::{max_buffer_size, LevelEncoder};
use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
Expand Down Expand Up @@ -782,8 +782,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
levels: &[i16],
max_level: i16,
) -> Result<Vec<u8>> {
let size = max_buffer_size(encoding, max_level, levels.len());
let mut encoder = LevelEncoder::v1(encoding, max_level, vec![0; size]);
let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
encoder.put(levels)?;
encoder.consume()
}
Expand All @@ -792,8 +791,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
/// Encoding is always RLE.
#[inline]
fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
let mut encoder = LevelEncoder::v2(max_level, vec![0; size]);
let mut encoder = LevelEncoder::v2(max_level, levels.len());
encoder.put(levels)?;
encoder.consume()
}
Expand Down
16 changes: 2 additions & 14 deletions parquet/src/data_type.rs
Expand Up @@ -565,7 +565,7 @@ impl AsBytes for str {

pub(crate) mod private {
use crate::encodings::decoding::PlainDecoderDetails;
use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
use crate::util::bit_util::{BitReader, BitWriter};
use crate::util::memory::ByteBufferPtr;

use crate::basic::Type;
Expand Down Expand Up @@ -658,20 +658,8 @@ pub(crate) mod private {
_: &mut W,
bit_writer: &mut BitWriter,
) -> Result<()> {
if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() {
let bits_available =
(bit_writer.capacity() - bit_writer.bytes_written()) * 8;
let bits_needed = values.len() - bits_available;
let bytes_needed = (bits_needed + 7) / 8;
let bytes_needed = round_upto_power_of_2(bytes_needed, 256);
bit_writer.extend(bytes_needed);
}
for value in values {
if !bit_writer.put_value(*value as u64, 1) {
return Err(ParquetError::EOF(
"unable to put boolean value".to_string(),
));
}
bit_writer.put_value(*value as u64, 1)
}
Ok(())
}
Expand Down
10 changes: 4 additions & 6 deletions parquet/src/encodings/encoding/dict_encoder.rs
Expand Up @@ -20,15 +20,14 @@

use crate::basic::{Encoding, Type};
use crate::data_type::private::ParquetValueType;
use crate::data_type::{AsBytes, DataType};
use crate::data_type::DataType;
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use crate::util::memory::ByteBufferPtr;
use std::io::Write;

#[derive(Debug)]
struct KeyStorage<T: DataType> {
Expand Down Expand Up @@ -127,12 +126,11 @@ impl<T: DataType> DictEncoder<T> {
/// the result.
pub fn write_indices(&mut self) -> Result<ByteBufferPtr> {
let buffer_len = self.estimated_data_encoded_size();
let mut buffer = vec![0; buffer_len];
buffer[0] = self.bit_width() as u8;
let mut buffer = Vec::with_capacity(buffer_len);
buffer.push(self.bit_width() as u8);

// Write bit width in the first byte
buffer.write_all((self.bit_width() as u8).as_bytes())?;
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1);
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
Expand Down
42 changes: 19 additions & 23 deletions parquet/src/encodings/encoding/mod.rs
Expand Up @@ -186,10 +186,13 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
fn put(&mut self, values: &[T::T]) -> Result<()> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");

if self.encoder.is_none() {
self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN));
}
let rle_encoder = self.encoder.as_mut().unwrap();
let rle_encoder = self.encoder.get_or_insert_with(|| {
let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
// Reserve space for length
buffer.extend_from_slice(&[0; 4]);
RleEncoder::new_from_buf(1, buffer)
});

for value in values {
let value = value.as_u64()?;
if !rle_encoder.put(value)? {
Expand Down Expand Up @@ -220,25 +223,18 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
let rle_encoder = self
.encoder
.as_mut()
.take()
.expect("RLE value encoder is not initialized");

// Flush all encoder buffers and raw values
let encoded_data = {
let buf = rle_encoder.flush_buffer()?;

// Note that buf does not have any offset, all data is encoded bytes
let len = (buf.len() as i32).to_le();
let len_bytes = len.as_bytes();
let mut encoded_data = vec![];
encoded_data.extend_from_slice(len_bytes);
encoded_data.extend_from_slice(buf);
encoded_data
};
// Reset rle encoder for the next batch
rle_encoder.clear();
let mut buf = rle_encoder.consume()?;
assert!(buf.len() > 4, "should have had padding inserted");

// Note that buf starts with 4 placeholder bytes reserved for the length; only the bytes after them are encoded data
let len = (buf.len() - 4) as i32;
buf[..4].copy_from_slice(&len.to_le_bytes());

Ok(ByteBufferPtr::new(encoded_data))
Ok(ByteBufferPtr::new(buf))
}
}

Expand Down Expand Up @@ -293,7 +289,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
let block_size = DEFAULT_BLOCK_SIZE;
let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
let mini_block_size = block_size / num_mini_blocks;
assert!(mini_block_size % 8 == 0);
assert_eq!(mini_block_size % 8, 0);
Self::assert_supported_type();

DeltaBitPackEncoder {
Expand Down Expand Up @@ -346,7 +342,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
self.bit_writer.put_zigzag_vlq_int(min_delta);

// Slice to store bit width for each mini block
let offset = self.bit_writer.skip(self.num_mini_blocks)?;
let offset = self.bit_writer.skip(self.num_mini_blocks);

for i in 0..self.num_mini_blocks {
// Find how many values we need to encode - either block size or whatever
Expand All @@ -364,7 +360,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
}

// Compute the max delta in current mini block
let mut max_delta = i64::min_value();
let mut max_delta = i64::MIN;
for j in 0..n {
max_delta =
cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
Expand Down Expand Up @@ -873,7 +869,7 @@ mod tests {
let mut values = vec![];
values.extend_from_slice(&[true; 16]);
values.extend_from_slice(&[false; 16]);
run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 2, 0);
run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);

// DELTA_LENGTH_BYTE_ARRAY
run_test::<ByteArrayType>(
Expand Down
79 changes: 19 additions & 60 deletions parquet/src/encodings/levels.rs
Expand Up @@ -65,32 +65,32 @@ impl LevelEncoder {
/// Used to encode levels for Data Page v1.
///
/// Panics, if encoding is not supported.
pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
let mut buffer = Vec::with_capacity(capacity_bytes);
let bit_width = num_required_bits(max_level as u64);
match encoding {
Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
bit_width,
byte_buffer,
mem::size_of::<i32>(),
)),
Encoding::RLE => {
buffer.extend_from_slice(&[0; 4]);
LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
}
Encoding::BIT_PACKED => {
// Here we set full byte buffer without adjusting for num_buffered_values,
// because byte buffer will already be allocated with size from
// `max_buffer_size()` method.
LevelEncoder::BitPacked(
bit_width,
BitWriter::new_from_buf(byte_buffer, 0),
)
LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer))
}
_ => panic!("Unsupported encoding type {}", encoding),
}
}

/// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
/// repetition and definition levels.
pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
pub fn v2(max_level: i16, capacity: usize) -> Self {
let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity);
let buffer = Vec::with_capacity(capacity_bytes);
let bit_width = num_required_bits(max_level as u64);
LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
}

/// Put/encode levels vector into this level encoder.
Expand All @@ -114,9 +114,7 @@ impl LevelEncoder {
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
if !encoder.put_value(*value as u64, bit_width as usize) {
return Err(general_err!("Not enough bytes left"));
}
encoder.put_value(*value as u64, bit_width as usize);
num_encoded += 1;
}
encoder.flush();
Expand Down Expand Up @@ -283,11 +281,10 @@ mod tests {
use crate::util::test_common::random_numbers_range;

fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
LevelEncoder::v2(max_level, levels.len())
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
Expand Down Expand Up @@ -315,11 +312,10 @@ mod tests {
max_level: i16,
v2: bool,
) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
LevelEncoder::v2(max_level, levels.len())
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
Expand Down Expand Up @@ -363,11 +359,10 @@ mod tests {
max_level: i16,
v2: bool,
) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
LevelEncoder::v2(max_level, levels.len())
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
LevelEncoder::v1(enc, max_level, levels.len())
};
// Encode only one value
let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
Expand All @@ -391,33 +386,6 @@ mod tests {
assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]);
}

// Tests when encoded values are larger than encoder's buffer
fn test_internal_roundtrip_overflow(
enc: Encoding,
levels: &[i16],
max_level: i16,
v2: bool,
) {
let size = max_buffer_size(enc, max_level, levels.len());
let mut encoder = if v2 {
LevelEncoder::v2(max_level, vec![0; size])
} else {
LevelEncoder::v1(enc, max_level, vec![0; size])
};
let mut found_err = false;
// Insert a large number of values, so we run out of space
for _ in 0..100 {
if let Err(err) = encoder.put(levels) {
assert!(format!("{}", err).contains("Not enough bytes left"));
found_err = true;
break;
};
}
if !found_err {
panic!("Failed test: no buffer overflow");
}
}

#[test]
fn test_roundtrip_one() {
let levels = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
Expand Down Expand Up @@ -484,15 +452,6 @@ mod tests {
test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true);
}

#[test]
fn test_roundtrip_overflow() {
let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
let max_level = 3;
test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false);
test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false);
test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true);
}

#[test]
fn test_rle_decoder_set_data_range() {
// Buffer containing both repetition and definition levels
Expand Down

0 comments on commit f124959

Please sign in to comment.