Automatically grow parquet BitWriter (#2226) (~10% faster) #2231

Merged (2 commits) on Jul 31, 2022
8 changes: 3 additions & 5 deletions parquet/src/column/writer/mod.rs
@@ -27,7 +27,7 @@ use crate::column::writer::encoder::{
 use crate::compression::{create_codec, Codec};
 use crate::data_type::private::ParquetValueType;
 use crate::data_type::*;
-use crate::encodings::levels::{max_buffer_size, LevelEncoder};
+use crate::encodings::levels::LevelEncoder;
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
 use crate::file::properties::EnabledStatistics;
@@ -782,8 +782,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         levels: &[i16],
         max_level: i16,
     ) -> Result<Vec<u8>> {
-        let size = max_buffer_size(encoding, max_level, levels.len());
-        let mut encoder = LevelEncoder::v1(encoding, max_level, vec![0; size]);
+        let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
        encoder.put(levels)?;
        encoder.consume()
    }
@@ -792,8 +791,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     /// Encoding is always RLE.
     #[inline]
     fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
-        let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
-        let mut encoder = LevelEncoder::v2(max_level, vec![0; size]);
+        let mut encoder = LevelEncoder::v2(max_level, levels.len());
        encoder.put(levels)?;
        encoder.consume()
    }
16 changes: 2 additions & 14 deletions parquet/src/data_type.rs
@@ -565,7 +565,7 @@ impl AsBytes for str {

 pub(crate) mod private {
     use crate::encodings::decoding::PlainDecoderDetails;
-    use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter};
+    use crate::util::bit_util::{BitReader, BitWriter};
     use crate::util::memory::ByteBufferPtr;

     use crate::basic::Type;
@@ -658,20 +658,8 @@ pub(crate) mod private {
         _: &mut W,
         bit_writer: &mut BitWriter,
     ) -> Result<()> {
-        if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() {
> tustvold (author): This is a little bit funky, `bool::encode` would actually increase the `BitWriter` capacity instead of erroring 😅 This is now done automatically everywhere.

-            let bits_available =
-                (bit_writer.capacity() - bit_writer.bytes_written()) * 8;
-            let bits_needed = values.len() - bits_available;
-            let bytes_needed = (bits_needed + 7) / 8;
-            let bytes_needed = round_upto_power_of_2(bytes_needed, 256);
-            bit_writer.extend(bytes_needed);
-        }
         for value in values {
-            if !bit_writer.put_value(*value as u64, 1) {
-                return Err(ParquetError::EOF(
-                    "unable to put boolean value".to_string(),
-                ));
-            }
+            bit_writer.put_value(*value as u64, 1)
         }
         Ok(())
     }
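The capacity check above could go because the underlying `BitWriter` now grows its buffer on demand, making `put_value` infallible. A minimal sketch of the idea (this is not the parquet crate's actual `BitWriter`; bit order here is LSB-first, matching Parquet's bit-packed layout):

```rust
/// Minimal sketch of an auto-growing bit writer: `put_value` cannot
/// fail, so callers need no capacity check or EOF error path.
struct GrowableBitWriter {
    buffer: Vec<u8>,
    bit_offset: usize, // bits already used in the last byte (0..8)
}

impl GrowableBitWriter {
    fn new() -> Self {
        Self { buffer: Vec::new(), bit_offset: 0 }
    }

    /// Append the lowest `num_bits` bits of `v`, LSB-first.
    fn put_value(&mut self, mut v: u64, mut num_bits: usize) {
        assert!(num_bits <= 64);
        while num_bits > 0 {
            if self.bit_offset == 0 {
                self.buffer.push(0); // grow by one byte on demand
            }
            let free = 8 - self.bit_offset;
            let take = free.min(num_bits);
            let mask = (1u64 << take) - 1; // take <= 8, so no overflow
            let byte = self.buffer.last_mut().unwrap();
            *byte |= ((v & mask) as u8) << self.bit_offset;
            v >>= take;
            self.bit_offset = (self.bit_offset + take) % 8;
            num_bits -= take;
        }
    }
}

fn main() {
    let mut w = GrowableBitWriter::new();
    for b in [true, false, true, true, false, false, true, true] {
        w.put_value(b as u64, 1); // infallible, like the new bool path
    }
    println!("{:#04x}", w.buffer[0]); // one byte, 0b1100_1101
}
```

Growing by one byte at a time keeps the sketch short; the real implementation amortizes growth through `Vec`'s doubling strategy.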
10 changes: 4 additions & 6 deletions parquet/src/encodings/encoding/dict_encoder.rs
@@ -20,15 +20,14 @@

 use crate::basic::{Encoding, Type};
 use crate::data_type::private::ParquetValueType;
-use crate::data_type::{AsBytes, DataType};
+use crate::data_type::DataType;
 use crate::encodings::encoding::{Encoder, PlainEncoder};
 use crate::encodings::rle::RleEncoder;
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
 use crate::util::memory::ByteBufferPtr;
-use std::io::Write;

#[derive(Debug)]
struct KeyStorage<T: DataType> {
@@ -127,12 +126,11 @@ impl<T: DataType> DictEncoder<T> {
     /// the result.
     pub fn write_indices(&mut self) -> Result<ByteBufferPtr> {
         let buffer_len = self.estimated_data_encoded_size();
-        let mut buffer = vec![0; buffer_len];
-        buffer[0] = self.bit_width() as u8;
+        let mut buffer = Vec::with_capacity(buffer_len);
> Reviewer: 👍
+        buffer.push(self.bit_width() as u8);

-        // Write bit width in the first byte
-        buffer.write_all((self.bit_width() as u8).as_bytes())?;
-        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1);
+        let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
         for index in &self.indices {
             if !encoder.put(*index as u64)? {
                 return Err(general_err!("Encoder doesn't have enough space"));
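The pattern change here (and throughout the PR) is the same: `vec![0; n]` allocates and zero-initializes the whole worst-case buffer before a single value is encoded, while `Vec::with_capacity(n)` only reserves memory. A small illustration of the difference:

```rust
fn main() {
    let estimated = 1024;

    // Old pattern: allocate AND zero-fill the worst case up front.
    let zeroed: Vec<u8> = vec![0; estimated];
    assert_eq!(zeroed.len(), estimated); // every byte already written

    // New pattern: reserve only; bytes are written as they are encoded.
    let mut reserved: Vec<u8> = Vec::with_capacity(estimated);
    assert_eq!(reserved.len(), 0);
    assert!(reserved.capacity() >= estimated);

    // First byte (the RLE bit width) is pushed, not indexed into place.
    reserved.push(3u8);
    assert_eq!(reserved.len(), 1);
    println!("ok");
}
```

The zero-fill is pure overhead when the encoder overwrites (or never reaches) those bytes anyway, which is where part of the ~10% speedup plausibly comes from.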
42 changes: 19 additions & 23 deletions parquet/src/encodings/encoding/mod.rs
@@ -186,10 +186,13 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
     fn put(&mut self, values: &[T::T]) -> Result<()> {
         ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");

-        if self.encoder.is_none() {
-            self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN));
-        }
-        let rle_encoder = self.encoder.as_mut().unwrap();
+        let rle_encoder = self.encoder.get_or_insert_with(|| {

+            let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN);
+            // Reserve space for length
+            buffer.extend_from_slice(&[0; 4]);
+            RleEncoder::new_from_buf(1, buffer)
+        });

         for value in values {
             let value = value.as_u64()?;
             if !rle_encoder.put(value)? {
@@ -220,25 +223,18 @@
         ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType");
         let rle_encoder = self
             .encoder
-            .as_mut()
+            .take()
             .expect("RLE value encoder is not initialized");

-        // Flush all encoder buffers and raw values
-        let encoded_data = {
-            let buf = rle_encoder.flush_buffer()?;
-
-            // Note that buf does not have any offset, all data is encoded bytes
-            let len = (buf.len() as i32).to_le();
-            let len_bytes = len.as_bytes();
-            let mut encoded_data = vec![];
-            encoded_data.extend_from_slice(len_bytes);
> Reviewer: Is this part of the speed increase -- avoiding copying the encoded bytes by reserving space for the length up front and updating it at the end?
>
> tustvold (author, Jul 31, 2022): Potentially, I suspect it might have contributed. Edit: this is actually only used for encoding booleans, which I don't think the benchmarks actually cover.
-            encoded_data.extend_from_slice(buf);
-            encoded_data
-        };
-        // Reset rle encoder for the next batch
-        rle_encoder.clear();
+        let mut buf = rle_encoder.consume()?;
+        assert!(buf.len() > 4, "should have had padding inserted");

+        // Note that buf does not have any offset, all data is encoded bytes
+        let len = (buf.len() - 4) as i32;
+        buf[..4].copy_from_slice(&len.to_le_bytes());

-        Ok(ByteBufferPtr::new(encoded_data))
+        Ok(ByteBufferPtr::new(buf))
     }
 }

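The new `flush_buffer` avoids the copy by reserving the 4-byte length prefix before encoding and back-filling it afterwards. The trick in isolation (with a stand-in `payload` where the real code has the RLE-encoded bytes):

```rust
/// Prefix `payload` with its own little-endian i32 length, writing
/// everything into one Vec instead of copying into a second buffer.
fn with_length_prefix(payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(4 + payload.len());
    buf.extend_from_slice(&[0; 4]); // placeholder, back-filled below
    buf.extend_from_slice(payload); // stand-in for the RLE encoder
    let len = (buf.len() - 4) as i32;
    buf[..4].copy_from_slice(&len.to_le_bytes()); // back-fill the length
    buf
}

fn main() {
    let out = with_length_prefix(&[0xAB, 0xCD]);
    assert_eq!(out, [2, 0, 0, 0, 0xAB, 0xCD]);
    println!("{:?}", out);
}
```

The old code instead encoded into one buffer, then copied the length bytes and the whole payload into a second `Vec`, one extra allocation and copy per flush.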
@@ -293,7 +289,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
         let block_size = DEFAULT_BLOCK_SIZE;
         let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS;
         let mini_block_size = block_size / num_mini_blocks;
-        assert!(mini_block_size % 8 == 0);
+        assert_eq!(mini_block_size % 8, 0);
         Self::assert_supported_type();

         DeltaBitPackEncoder {
@@ -346,7 +342,7 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
         self.bit_writer.put_zigzag_vlq_int(min_delta);

         // Slice to store bit width for each mini block
-        let offset = self.bit_writer.skip(self.num_mini_blocks)?;
+        let offset = self.bit_writer.skip(self.num_mini_blocks);

         for i in 0..self.num_mini_blocks {
             // Find how many values we need to encode - either block size or whatever
@@ -364,7 +360,7 @@
         }

         // Compute the max delta in current mini block
-        let mut max_delta = i64::min_value();
+        let mut max_delta = i64::MIN;
         for j in 0..n {
             max_delta =
                 cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]);
@@ -873,7 +869,7 @@ mod tests {
         let mut values = vec![];
         values.extend_from_slice(&[true; 16]);
         values.extend_from_slice(&[false; 16]);
-        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 2, 0);
+        run_test::<BoolType>(Encoding::RLE, -1, &values, 0, 6, 0);
> Reviewer: Why did this change from 2 to 6?
>
> tustvold (author): Because we reserve space for the length up front now.

         // DELTA_LENGTH_BYTE_ARRAY
         run_test::<ByteArrayType>(
79 changes: 19 additions & 60 deletions parquet/src/encodings/levels.rs
@@ -65,32 +65,32 @@ impl LevelEncoder {
     /// Used to encode levels for Data Page v1.
     ///
     /// Panics, if encoding is not supported.
-    pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
+    pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
+        let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
+        let mut buffer = Vec::with_capacity(capacity_bytes);
         let bit_width = num_required_bits(max_level as u64);
         match encoding {
-            Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
-                bit_width,
-                byte_buffer,
-                mem::size_of::<i32>(),
-            )),
+            Encoding::RLE => {
+                buffer.extend_from_slice(&[0; 8]);
> Reviewer: Is that what this initial 0 is for? Suggested change:
>
>     // reserve space for length
>     buffer.extend_from_slice(&[0; 8]);
>
> tustvold (author): Yes, although it occurs to me that this allocated 8 bytes instead of 4... Something isn't right here 🤔
>
> tustvold (author): So this was working because the extra 4 zero bytes would be interpreted as empty RLE runs. I will fix this and add a test that would have caught it.
+                LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
+            }
             Encoding::BIT_PACKED => {
-                // Here we set full byte buffer without adjusting for num_buffered_values,
-                // because byte buffer will already be allocated with size from
-                // `max_buffer_size()` method.
-                LevelEncoder::BitPacked(
-                    bit_width,
-                    BitWriter::new_from_buf(byte_buffer, 0),
-                )
+                LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer))
             }
             _ => panic!("Unsupported encoding type {}", encoding),
         }
     }
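The author's observation about the stray zero bytes can be checked against the RLE/bit-packed hybrid format: a run starts with a varint header, and an even header denotes an RLE run whose length is `header >> 1`, so a header of 0 is a zero-length run that contributes no values. A toy decoder illustrating this (it assumes single-byte headers, RLE-only data, and `bit_width <= 8` so each run value occupies one byte; the real format also allows multi-byte varint headers and bit-packed runs):

```rust
/// Decode RLE runs from a toy RLE-only stream, returning
/// (run_length, value) pairs. Header LSB 0 = RLE run.
fn decode_rle_runs(mut data: &[u8]) -> Vec<(usize, u8)> {
    let mut runs = Vec::new();
    while data.len() >= 2 {
        let header = data[0] as usize;
        assert_eq!(header & 1, 0, "toy decoder handles RLE runs only");
        runs.push((header >> 1, data[1])); // value stored in one byte
        data = &data[2..];
    }
    runs
}

fn main() {
    // A run of 5 ones followed by 4 stray zero bytes: the zeros decode
    // as two zero-length runs, i.e. they add no values.
    let runs = decode_rle_runs(&[10, 1, 0, 0, 0, 0]);
    let total: usize = runs.iter().map(|(n, _)| n).sum();
    assert_eq!(total, 5);
    println!("{:?}", runs);
}
```

This is why the accidental `[0; 8]` (4 padding bytes too many) still round-tripped correctly in the existing tests.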

     /// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
     /// repetition and definition levels.
-    pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
+    pub fn v2(max_level: i16, capacity: usize) -> Self {
+        let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity);
+        let buffer = Vec::with_capacity(capacity_bytes);
         let bit_width = num_required_bits(max_level as u64);
-        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
+        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
     }

/// Put/encode levels vector into this level encoder.
@@ -114,9 +114,7 @@ impl LevelEncoder {
             }
             LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
                 for value in buffer {
-                    if !encoder.put_value(*value as u64, bit_width as usize) {
-                        return Err(general_err!("Not enough bytes left"));
-                    }
+                    encoder.put_value(*value as u64, bit_width as usize);
> Reviewer: 🤔 I wonder if removing an error check off the hot path also increases performance.
                     num_encoded += 1;
                 }
                 encoder.flush();
@@ -283,11 +281,10 @@ mod tests {
     use crate::util::test_common::random_numbers_range;

     fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) {
-        let size = max_buffer_size(enc, max_level, levels.len());
         let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
+            LevelEncoder::v2(max_level, levels.len())
         } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
+            LevelEncoder::v1(enc, max_level, levels.len())
         };
         encoder.put(levels).expect("put() should be OK");
         let encoded_levels = encoder.consume().expect("consume() should be OK");
@@ -315,11 +312,10 @@ mod tests {
         max_level: i16,
         v2: bool,
     ) {
-        let size = max_buffer_size(enc, max_level, levels.len());
         let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
+            LevelEncoder::v2(max_level, levels.len())
         } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
+            LevelEncoder::v1(enc, max_level, levels.len())
         };
         encoder.put(levels).expect("put() should be OK");
         let encoded_levels = encoder.consume().expect("consume() should be OK");
@@ -363,11 +359,10 @@
         max_level: i16,
         v2: bool,
     ) {
-        let size = max_buffer_size(enc, max_level, levels.len());
         let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
+            LevelEncoder::v2(max_level, levels.len())
         } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
+            LevelEncoder::v1(enc, max_level, levels.len())
         };
         // Encode only one value
         let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
@@ -391,33 +386,6 @@
         assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]);
}

-    // Tests when encoded values are larger than encoder's buffer
> Reviewer: Why are these tests removed? Is it because there is no way to "run out of space" anymore?
>
> tustvold (author): Correct.
-    fn test_internal_roundtrip_overflow(
-        enc: Encoding,
-        levels: &[i16],
-        max_level: i16,
-        v2: bool,
-    ) {
-        let size = max_buffer_size(enc, max_level, levels.len());
-        let mut encoder = if v2 {
-            LevelEncoder::v2(max_level, vec![0; size])
-        } else {
-            LevelEncoder::v1(enc, max_level, vec![0; size])
-        };
-        let mut found_err = false;
-        // Insert a large number of values, so we run out of space
-        for _ in 0..100 {
-            if let Err(err) = encoder.put(levels) {
-                assert!(format!("{}", err).contains("Not enough bytes left"));
-                found_err = true;
-                break;
-            };
-        }
-        if !found_err {
-            panic!("Failed test: no buffer overflow");
-        }
-    }

     #[test]
     fn test_roundtrip_one() {
         let levels = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1];
@@ -484,15 +452,6 @@ mod tests {
         test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true);
     }

-    #[test]
> tustvold (author): This test no longer makes sense, as the Vec can just grow.
-    fn test_roundtrip_overflow() {
-        let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1];
-        let max_level = 3;
-        test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false);
-        test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false);
-        test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true);
-    }

     #[test]
     fn test_rle_decoder_set_data_range() {
         // Buffer containing both repetition and definition levels