From 2383ede4d87110261b5e48342677f608b83a45e1 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 29 Jul 2022 22:14:52 +0100 Subject: [PATCH] Automatically grow parquet BitWriter (#2226) --- parquet/src/column/writer/mod.rs | 8 +- parquet/src/data_type.rs | 16 +- .../src/encodings/encoding/dict_encoder.rs | 10 +- parquet/src/encodings/encoding/mod.rs | 42 ++- parquet/src/encodings/levels.rs | 79 ++---- parquet/src/encodings/rle.rs | 72 +++-- parquet/src/util/bit_util.rs | 255 +++++------------- parquet/src/util/test_common/page_util.rs | 6 +- 8 files changed, 148 insertions(+), 340 deletions(-) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 9a371bc2710..2d555611110 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -27,7 +27,7 @@ use crate::column::writer::encoder::{ use crate::compression::{create_codec, Codec}; use crate::data_type::private::ParquetValueType; use crate::data_type::*; -use crate::encodings::levels::{max_buffer_size, LevelEncoder}; +use crate::encodings::levels::LevelEncoder; use crate::errors::{ParquetError, Result}; use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder}; use crate::file::properties::EnabledStatistics; @@ -782,8 +782,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { levels: &[i16], max_level: i16, ) -> Result> { - let size = max_buffer_size(encoding, max_level, levels.len()); - let mut encoder = LevelEncoder::v1(encoding, max_level, vec![0; size]); + let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len()); encoder.put(levels)?; encoder.consume() } @@ -792,8 +791,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { /// Encoding is always RLE. #[inline] fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result> { - let size = max_buffer_size(Encoding::RLE, max_level, levels.len()); - let mut encoder = LevelEncoder::v2(max_level, vec![0; size]); + let mut encoder = LevelEncoder::v2(max_level, levels.len()); encoder.put(levels)?; encoder.consume() } diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs index 1d0b5b231c6..43c9a4238a7 100644 --- a/parquet/src/data_type.rs +++ b/parquet/src/data_type.rs @@ -565,7 +565,7 @@ impl AsBytes for str { pub(crate) mod private { use crate::encodings::decoding::PlainDecoderDetails; - use crate::util::bit_util::{round_upto_power_of_2, BitReader, BitWriter}; + use crate::util::bit_util::{BitReader, BitWriter}; use crate::util::memory::ByteBufferPtr; use crate::basic::Type; @@ -658,20 +658,8 @@ pub(crate) mod private { _: &mut W, bit_writer: &mut BitWriter, ) -> Result<()> { - if bit_writer.bytes_written() + values.len() / 8 >= bit_writer.capacity() { - let bits_available = - (bit_writer.capacity() - bit_writer.bytes_written()) * 8; - let bits_needed = values.len() - bits_available; - let bytes_needed = (bits_needed + 7) / 8; - let bytes_needed = round_upto_power_of_2(bytes_needed, 256); - bit_writer.extend(bytes_needed); - } for value in values { - if !bit_writer.put_value(*value as u64, 1) { - return Err(ParquetError::EOF( - "unable to put boolean value".to_string(), - )); - } + bit_writer.put_value(*value as u64, 1) } Ok(()) } diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 7bf98325466..1b386448bb8 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -20,7 +20,7 @@ use crate::basic::{Encoding, Type}; use 
crate::data_type::private::ParquetValueType; -use crate::data_type::{AsBytes, DataType}; +use crate::data_type::DataType; use crate::encodings::encoding::{Encoder, PlainEncoder}; use crate::encodings::rle::RleEncoder; use crate::errors::{ParquetError, Result}; @@ -28,7 +28,6 @@ use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; use crate::util::memory::ByteBufferPtr; -use std::io::Write; #[derive(Debug)] struct KeyStorage { @@ -127,12 +126,11 @@ impl DictEncoder { /// the result. pub fn write_indices(&mut self) -> Result { let buffer_len = self.estimated_data_encoded_size(); - let mut buffer = vec![0; buffer_len]; - buffer[0] = self.bit_width() as u8; + let mut buffer = Vec::with_capacity(buffer_len); + buffer.push(self.bit_width() as u8); // Write bit width in the first byte - buffer.write_all((self.bit_width() as u8).as_bytes())?; - let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer, 1); + let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer); for index in &self.indices { if !encoder.put(*index as u64)? { return Err(general_err!("Encoder doesn't have enough space")); diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index 5cb94b7c0ae..383211f128c 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -186,10 +186,13 @@ impl Encoder for RleValueEncoder { fn put(&mut self, values: &[T::T]) -> Result<()> { ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); - if self.encoder.is_none() { - self.encoder = Some(RleEncoder::new(1, DEFAULT_RLE_BUFFER_LEN)); - } - let rle_encoder = self.encoder.as_mut().unwrap(); + let rle_encoder = self.encoder.get_or_insert_with(|| { + let mut buffer = Vec::with_capacity(DEFAULT_RLE_BUFFER_LEN); + // Reserve space for length + buffer.extend_from_slice(&[0; 4]); + RleEncoder::new_from_buf(1, buffer) + }); + for value in values { let value = value.as_u64()?; if !rle_encoder.put(value)? 
{ @@ -220,25 +223,18 @@ impl Encoder for RleValueEncoder { ensure_phys_ty!(Type::BOOLEAN, "RleValueEncoder only supports BoolType"); let rle_encoder = self .encoder - .as_mut() + .take() .expect("RLE value encoder is not initialized"); // Flush all encoder buffers and raw values - let encoded_data = { - let buf = rle_encoder.flush_buffer()?; - - // Note that buf does not have any offset, all data is encoded bytes - let len = (buf.len() as i32).to_le(); - let len_bytes = len.as_bytes(); - let mut encoded_data = vec![]; - encoded_data.extend_from_slice(len_bytes); - encoded_data.extend_from_slice(buf); - encoded_data - }; - // Reset rle encoder for the next batch - rle_encoder.clear(); + let mut buf = rle_encoder.consume()?; + assert!(buf.len() > 4, "should have had padding inserted"); + + // Note that buf does not have any offset, all data is encoded bytes + let len = (buf.len() - 4) as i32; + buf[..4].copy_from_slice(&len.to_le_bytes()); - Ok(ByteBufferPtr::new(encoded_data)) + Ok(ByteBufferPtr::new(buf)) } } @@ -293,7 +289,7 @@ impl DeltaBitPackEncoder { let block_size = DEFAULT_BLOCK_SIZE; let num_mini_blocks = DEFAULT_NUM_MINI_BLOCKS; let mini_block_size = block_size / num_mini_blocks; - assert!(mini_block_size % 8 == 0); + assert_eq!(mini_block_size % 8, 0); Self::assert_supported_type(); DeltaBitPackEncoder { @@ -346,7 +342,7 @@ impl DeltaBitPackEncoder { self.bit_writer.put_zigzag_vlq_int(min_delta); // Slice to store bit width for each mini block - let offset = self.bit_writer.skip(self.num_mini_blocks)?; + let offset = self.bit_writer.skip(self.num_mini_blocks); for i in 0..self.num_mini_blocks { // Find how many values we need to encode - either block size or whatever @@ -364,7 +360,7 @@ impl DeltaBitPackEncoder { } // Compute the max delta in current mini block - let mut max_delta = i64::min_value(); + let mut max_delta = i64::MIN; for j in 0..n { max_delta = cmp::max(max_delta, self.deltas[i * self.mini_block_size + j]); @@ -873,7 +869,7 @@ mod tests { let mut values = vec![]; values.extend_from_slice(&[true; 16]); values.extend_from_slice(&[false; 16]); - run_test::(Encoding::RLE, -1, &values, 0, 2, 0); + run_test::(Encoding::RLE, -1, &values, 0, 6, 0); // DELTA_LENGTH_BYTE_ARRAY run_test::( diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs index 28fb6388169..0f86792e027 100644 --- a/parquet/src/encodings/levels.rs +++ b/parquet/src/encodings/levels.rs @@ -65,22 +65,20 @@ impl LevelEncoder { /// Used to encode levels for Data Page v1. /// /// Panics, if encoding is not supported. - pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec) -> Self { + pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self { + let capacity_bytes = max_buffer_size(encoding, max_level, capacity); + let mut buffer = Vec::with_capacity(capacity_bytes); let bit_width = num_required_bits(max_level as u64); match encoding { - Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf( - bit_width, - byte_buffer, - mem::size_of::(), - )), + Encoding::RLE => { + buffer.extend_from_slice(&[0; 8]); + LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer)) + } Encoding::BIT_PACKED => { // Here we set full byte buffer without adjusting for num_buffered_values, // because byte buffer will already be allocated with size from // `max_buffer_size()` method. 
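Both halves of the RleValueEncoder change above rely on the same reserve-then-backfill trick: four placeholder bytes go into the growable buffer before any values, and flush_buffer() later overwrites them with the little-endian length of the encoded payload. A minimal standalone sketch of that pattern in plain Rust (no parquet types; the payload bytes are made up for illustration):

    fn length_prefixed(payload: &[u8]) -> Vec<u8> {
        // Reserve four placeholder bytes for the little-endian i32 length.
        let mut buf = vec![0u8; 4];
        // The encoder would append its output here; we simply copy the payload.
        buf.extend_from_slice(payload);
        // Backfill the placeholder with the number of bytes that follow it.
        let len = (buf.len() - 4) as i32;
        buf[..4].copy_from_slice(&len.to_le_bytes());
        buf
    }

    fn main() {
        let out = length_prefixed(&[0xAB, 0xCD, 0xEF]);
        assert_eq!(out, vec![3, 0, 0, 0, 0xAB, 0xCD, 0xEF]);
    }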
- LevelEncoder::BitPacked( - bit_width, - BitWriter::new_from_buf(byte_buffer, 0), - ) + LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer)) } _ => panic!("Unsupported encoding type {}", encoding), } @@ -88,9 +86,11 @@ impl LevelEncoder { /// Creates new level encoder based on RLE encoding. Used to encode Data Page v2 /// repetition and definition levels. - pub fn v2(max_level: i16, byte_buffer: Vec) -> Self { + pub fn v2(max_level: i16, capacity: usize) -> Self { + let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity); + let buffer = Vec::with_capacity(capacity_bytes); let bit_width = num_required_bits(max_level as u64); - LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0)) + LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer)) } /// Put/encode levels vector into this level encoder. @@ -114,9 +114,7 @@ impl LevelEncoder { } LevelEncoder::BitPacked(bit_width, ref mut encoder) => { for value in buffer { - if !encoder.put_value(*value as u64, bit_width as usize) { - return Err(general_err!("Not enough bytes left")); - } + encoder.put_value(*value as u64, bit_width as usize); num_encoded += 1; } encoder.flush(); @@ -283,11 +281,10 @@ mod tests { use crate::util::test_common::random_numbers_range; fn test_internal_roundtrip(enc: Encoding, levels: &[i16], max_level: i16, v2: bool) { - let size = max_buffer_size(enc, max_level, levels.len()); let mut encoder = if v2 { - LevelEncoder::v2(max_level, vec![0; size]) + LevelEncoder::v2(max_level, levels.len()) } else { - LevelEncoder::v1(enc, max_level, vec![0; size]) + LevelEncoder::v1(enc, max_level, levels.len()) }; encoder.put(levels).expect("put() should be OK"); let encoded_levels = encoder.consume().expect("consume() should be OK"); @@ -315,11 +312,10 @@ mod tests { max_level: i16, v2: bool, ) { - let size = max_buffer_size(enc, max_level, levels.len()); let mut encoder = if v2 { - LevelEncoder::v2(max_level, vec![0; size]) + LevelEncoder::v2(max_level, levels.len()) } else { - LevelEncoder::v1(enc, max_level, vec![0; size]) + LevelEncoder::v1(enc, max_level, levels.len()) }; encoder.put(levels).expect("put() should be OK"); let encoded_levels = encoder.consume().expect("consume() should be OK"); @@ -363,11 +359,10 @@ mod tests { max_level: i16, v2: bool, ) { - let size = max_buffer_size(enc, max_level, levels.len()); let mut encoder = if v2 { - LevelEncoder::v2(max_level, vec![0; size]) + LevelEncoder::v2(max_level, levels.len()) } else { - LevelEncoder::v1(enc, max_level, vec![0; size]) + LevelEncoder::v1(enc, max_level, levels.len()) }; // Encode only one value let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK"); @@ -391,33 +386,6 @@ mod tests { assert_eq!(buffer[0..num_decoded], levels[0..num_decoded]); } - // Tests when encoded values are larger than encoder's buffer - fn test_internal_roundtrip_overflow( - enc: Encoding, - levels: &[i16], - max_level: i16, - v2: bool, - ) { - let size = max_buffer_size(enc, max_level, levels.len()); - let mut encoder = if v2 { - LevelEncoder::v2(max_level, vec![0; size]) - } else { - LevelEncoder::v1(enc, max_level, vec![0; size]) - }; - let mut found_err = false; - // Insert a large number of values, so we run out of space - for _ in 0..100 { - if let Err(err) = encoder.put(levels) { - assert!(format!("{}", err).contains("Not enough bytes left")); - found_err = true; - break; - }; - } - if !found_err { - panic!("Failed test: no buffer overflow"); - } - } - #[test] fn test_roundtrip_one() { let levels = vec![0, 1, 
1, 1, 1, 0, 0, 0, 0, 1]; @@ -484,15 +452,6 @@ mod tests { test_internal_roundtrip_underflow(Encoding::RLE, &levels, max_level, true); } - #[test] - fn test_roundtrip_overflow() { - let levels = vec![1, 1, 2, 3, 2, 1, 1, 2, 3, 1]; - let max_level = 3; - test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, false); - test_internal_roundtrip_overflow(Encoding::BIT_PACKED, &levels, max_level, false); - test_internal_roundtrip_overflow(Encoding::RLE, &levels, max_level, true); - } - #[test] fn test_rle_decoder_set_data_range() { // Buffer containing both repetition and definition levels diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 8a19dd5452a..28ebd7d3a17 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -83,21 +83,14 @@ pub struct RleEncoder { impl RleEncoder { pub fn new(bit_width: u8, buffer_len: usize) -> Self { - let buffer = vec![0; buffer_len]; - RleEncoder::new_from_buf(bit_width, buffer, 0) + let buffer = Vec::with_capacity(buffer_len); + RleEncoder::new_from_buf(bit_width, buffer) } - /// Initialize the encoder from existing `buffer` and the starting offset `start`. - pub fn new_from_buf(bit_width: u8, buffer: Vec, start: usize) -> Self { - assert!(bit_width <= 64, "bit_width ({}) out of range.", bit_width); + /// Initialize the encoder from existing `buffer` + pub fn new_from_buf(bit_width: u8, buffer: Vec) -> Self { let max_run_byte_size = RleEncoder::min_buffer_size(bit_width); - assert!( - buffer.len() >= max_run_byte_size, - "buffer length {} must be greater than {}", - buffer.len(), - max_run_byte_size - ); - let bit_writer = BitWriter::new_from_buf(buffer, start); + let bit_writer = BitWriter::new_from_buf(buffer); RleEncoder { bit_width, bit_writer, @@ -244,14 +237,11 @@ impl RleEncoder { fn flush_rle_run(&mut self) -> Result<()> { assert!(self.repeat_count > 0); let indicator_value = self.repeat_count << 1; - let mut result = self.bit_writer.put_vlq_int(indicator_value as u64); - result &= self.bit_writer.put_aligned( + self.bit_writer.put_vlq_int(indicator_value as u64); + self.bit_writer.put_aligned( self.current_value, bit_util::ceil(self.bit_width as i64, 8) as usize, ); - if !result { - return Err(general_err!("Failed to write RLE run")); - } self.num_buffered_values = 0; self.repeat_count = 0; Ok(()) @@ -259,13 +249,12 @@ impl RleEncoder { fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) -> Result<()> { if self.indicator_byte_pos < 0 { - self.indicator_byte_pos = self.bit_writer.skip(1)? 
as i64; + self.indicator_byte_pos = self.bit_writer.skip(1) as i64; } // Write all buffered values as bit-packed literals for i in 0..self.num_buffered_values { - let _ = self - .bit_writer + self.bit_writer .put_value(self.buffered_values[i], self.bit_width as usize); } self.num_buffered_values = 0; @@ -273,13 +262,11 @@ impl RleEncoder { // Write the indicator byte to the reserved position in `bit_writer` let num_groups = self.bit_packed_count / 8; let indicator_byte = ((num_groups << 1) | 1) as u8; - if !self.bit_writer.put_aligned_offset( + self.bit_writer.put_aligned_offset( indicator_byte, 1, self.indicator_byte_pos as usize, - ) { - return Err(general_err!("Not enough space to write indicator byte")); - } + ); self.indicator_byte_pos = -1; self.bit_packed_count = 0; } @@ -443,7 +430,8 @@ impl RleDecoder { let mut values_skipped = 0; while values_skipped < num_values { if self.rle_left > 0 { - let num_values = cmp::min(num_values - values_skipped, self.rle_left as usize); + let num_values = + cmp::min(num_values - values_skipped, self.rle_left as usize); self.rle_left -= num_values as u32; values_skipped += num_values; } else if self.bit_packed_left > 0 { @@ -452,10 +440,7 @@ impl RleDecoder { let bit_reader = self.bit_reader.as_mut().expect("bit_reader should be set"); - num_values = bit_reader.skip( - num_values, - self.bit_width as usize, - ); + num_values = bit_reader.skip(num_values, self.bit_width as usize); if num_values == 0 { // Handle writers which truncate the final block self.bit_packed_left = 0; @@ -587,7 +572,9 @@ mod tests { assert_eq!(skipped, 2); let mut buffer = vec![0; 6]; - let remaining = decoder.get_batch::(&mut buffer).expect("getting remaining"); + let remaining = decoder + .get_batch::(&mut buffer) + .expect("getting remaining"); assert_eq!(remaining, 6); assert_eq!(buffer, expected); } @@ -671,7 +658,9 @@ mod tests { let skipped = decoder.skip(50).expect("skipping first 50"); assert_eq!(skipped, 50); - let remainder = decoder.get_batch::(&mut buffer).expect("getting remaining 50"); + let remainder = decoder + .get_batch::(&mut buffer) + .expect("getting remaining 50"); assert_eq!(remainder, 50); assert_eq!(buffer, expected); @@ -687,7 +676,9 @@ mod tests { } let skipped = decoder.skip(50).expect("skipping first 50"); assert_eq!(skipped, 50); - let remainder = decoder.get_batch::(&mut buffer).expect("getting remaining 50"); + let remainder = decoder + .get_batch::(&mut buffer) + .expect("getting remaining 50"); assert_eq!(remainder, 50); assert_eq!(buffer, expected); } @@ -739,7 +730,9 @@ mod tests { let expected = vec![10, 20, 20, 20, 20, 30, 30, 30, 30, 30]; let skipped = decoder.skip(2).expect("skipping two values"); assert_eq!(skipped, 2); - let remainder = decoder.get_batch_with_dict::(&dict, &mut buffer, 10).expect("getting remainder"); + let remainder = decoder + .get_batch_with_dict::(&dict, &mut buffer, 10) + .expect("getting remainder"); assert_eq!(remainder, 10); assert_eq!(buffer, expected); @@ -751,17 +744,12 @@ mod tests { let mut decoder: RleDecoder = RleDecoder::new(3); decoder.set_data(data); let mut buffer = vec![""; 8]; - let expected = vec![ - "eee", "fff", "ddd", "eee", "fff", "eee", "fff", - "fff", - ]; + let expected = vec!["eee", "fff", "ddd", "eee", "fff", "eee", "fff", "fff"]; let skipped = decoder.skip(4).expect("skipping four values"); assert_eq!(skipped, 4); - let remainder = decoder.get_batch_with_dict::<&str>( - dict.as_slice(), - buffer.as_mut_slice(), - 8, - ).expect("getting remainder"); + let remainder = decoder + 
.get_batch_with_dict::<&str>(dict.as_slice(), buffer.as_mut_slice(), 8) + .expect("getting remainder"); assert_eq!(remainder, 8); assert_eq!(buffer, expected); } diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs index 29269c4ad7e..7cecf2ce3f0 100644 --- a/parquet/src/util/bit_util.rs +++ b/parquet/src/util/bit_util.rs @@ -18,7 +18,6 @@ use std::{cmp, mem::size_of}; use crate::data_type::AsBytes; -use crate::errors::{ParquetError, Result}; use crate::util::{bit_packing::unpack32, memory::ByteBufferPtr}; #[inline] @@ -138,7 +137,7 @@ where /// This function should be removed after /// [`int_roundings`](https://github.com/rust-lang/rust/issues/88581) is stable. #[inline] -pub fn ceil(value: i64, divisor: i64) -> i64 { +pub fn ceil(value: T, divisor: T) -> T { num::Integer::div_ceil(&value, &divisor) } @@ -148,7 +147,7 @@ pub fn trailing_bits(v: u64, num_bits: usize) -> u64 { if num_bits >= 64 { v } else { - v & ((1< bool { /// bit packed or byte aligned fashion. pub struct BitWriter { buffer: Vec, - max_bytes: usize, buffered_values: u64, - byte_offset: usize, - bit_offset: usize, - start: usize, + bit_offset: u8, } impl BitWriter { pub fn new(max_bytes: usize) -> Self { Self { - buffer: vec![0; max_bytes], - max_bytes, + buffer: Vec::with_capacity(max_bytes), buffered_values: 0, - byte_offset: 0, bit_offset: 0, - start: 0, } } - /// Initializes the writer from the existing buffer `buffer` and starting - /// offset `start`. - pub fn new_from_buf(buffer: Vec, start: usize) -> Self { - assert!(start < buffer.len()); - let len = buffer.len(); + /// Initializes the writer appending to the existing buffer `buffer` + pub fn new_from_buf(buffer: Vec) -> Self { Self { buffer, - max_bytes: len, buffered_values: 0, - byte_offset: start, bit_offset: 0, - start, } } - /// Extend buffer size by `increment` bytes - #[inline] - pub fn extend(&mut self, increment: usize) { - self.max_bytes += increment; - let extra = vec![0; increment]; - self.buffer.extend(extra); - } - - /// Report buffer size, in bytes - #[inline] - pub fn capacity(&mut self) -> usize { - self.max_bytes - } - /// Consumes and returns the current buffer. #[inline] pub fn consume(mut self) -> Vec { self.flush(); - self.buffer.truncate(self.byte_offset); self.buffer } @@ -241,53 +213,37 @@ impl BitWriter { #[inline] pub fn flush_buffer(&mut self) -> &[u8] { self.flush(); - &self.buffer()[0..self.byte_offset] + self.buffer() } /// Clears the internal state so the buffer can be reused. #[inline] pub fn clear(&mut self) { + self.buffer.clear(); self.buffered_values = 0; - self.byte_offset = self.start; self.bit_offset = 0; } /// Flushes the internal buffered bits and the align the buffer to the next byte. #[inline] pub fn flush(&mut self) { - let num_bytes = ceil(self.bit_offset as i64, 8) as usize; - assert!(self.byte_offset + num_bytes <= self.max_bytes); - memcpy_value( - &self.buffered_values, - num_bytes, - &mut self.buffer[self.byte_offset..], - ); + let num_bytes = ceil(self.bit_offset, 8); + let slice = &self.buffered_values.to_ne_bytes()[..num_bytes as usize]; + self.buffer.extend_from_slice(slice); self.buffered_values = 0; self.bit_offset = 0; - self.byte_offset += num_bytes; } /// Advances the current offset by skipping `num_bytes`, flushing the internal bit /// buffer first. /// This is useful when you want to jump over `num_bytes` bytes and come back later /// to fill these bytes. - /// - /// Returns error if `num_bytes` is beyond the boundary of the internal buffer. - /// Otherwise, returns the old offset. 
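With the buffer growing on demand, byte counts derived from ceil (made generic above) are only capacity estimates: the level encoders earlier in the patch pass max_buffer_size to Vec::with_capacity instead of allocating a zeroed, fixed-size buffer. A standalone sketch of that worst-case sizing arithmetic for bit-packed values (the helper name is mine, not the crate's):

    /// Worst-case size in bytes of `num_values` values bit-packed at
    /// `bit_width` bits each, i.e. ceil(num_values * bit_width, 8).
    fn bit_packed_size(num_values: usize, bit_width: u8) -> usize {
        (num_values * bit_width as usize + 7) / 8
    }

    fn main() {
        assert_eq!(bit_packed_size(10, 1), 2); // 10 bits -> 2 bytes
        assert_eq!(bit_packed_size(10, 3), 4); // 30 bits -> 4 bytes
        assert_eq!(bit_packed_size(8, 8), 8);  // 64 bits -> 8 bytes
    }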
#[inline] - pub fn skip(&mut self, num_bytes: usize) -> Result { + pub fn skip(&mut self, num_bytes: usize) -> usize { self.flush(); - assert!(self.byte_offset <= self.max_bytes); - if self.byte_offset + num_bytes > self.max_bytes { - return Err(general_err!( - "Not enough bytes left in BitWriter. Need {} but only have {}", - self.byte_offset + num_bytes, - self.max_bytes - )); - } - let result = self.byte_offset; - self.byte_offset += num_bytes; - Ok(result) + let result = self.buffer.len(); + self.buffer.extend(std::iter::repeat(0).take(num_bytes)); + result } /// Returns a slice containing the next `num_bytes` bytes starting from the current @@ -295,32 +251,24 @@ impl BitWriter { /// This is useful when you want to jump over `num_bytes` bytes and come back later /// to fill these bytes. #[inline] - pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> Result<&mut [u8]> { - let offset = self.skip(num_bytes)?; - Ok(&mut self.buffer[offset..offset + num_bytes]) + pub fn get_next_byte_ptr(&mut self, num_bytes: usize) -> &mut [u8] { + let offset = self.skip(num_bytes); + &mut self.buffer[offset..offset + num_bytes] } #[inline] pub fn bytes_written(&self) -> usize { - self.byte_offset - self.start + ceil(self.bit_offset as i64, 8) as usize + self.buffer.len() + ceil(self.bit_offset, 8) as usize } #[inline] pub fn buffer(&self) -> &[u8] { - &self.buffer[self.start..] + &self.buffer } #[inline] pub fn byte_offset(&self) -> usize { - self.byte_offset - } - - /// Returns the internal buffer length. This is the maximum number of bytes that this - /// writer can write. User needs to call `consume` to consume the current buffer - /// before more data can be written. - #[inline] - pub fn buffer_len(&self) -> usize { - self.max_bytes + self.buffer.len() } /// Writes the entire byte `value` at the byte `offset` @@ -330,53 +278,36 @@ impl BitWriter { /// Writes the `num_bits` LSB of value `v` to the internal buffer of this writer. /// The `num_bits` must not be greater than 64. This is bit packed. - /// - /// Returns false if there's not enough room left. True otherwise. #[inline] - pub fn put_value(&mut self, v: u64, num_bits: usize) -> bool { + pub fn put_value(&mut self, v: u64, num_bits: usize) { assert!(num_bits <= 64); + let num_bits = num_bits as u8; assert_eq!(v.checked_shr(num_bits as u32).unwrap_or(0), 0); // covers case v >> 64 - if self.byte_offset * 8 + self.bit_offset + num_bits > self.max_bytes as usize * 8 - { - return false; - } - + // Add value to buffered_values self.buffered_values |= v << self.bit_offset; self.bit_offset += num_bits; - if self.bit_offset >= 64 { - memcpy_value( - &self.buffered_values, - 8, - &mut self.buffer[self.byte_offset..], - ); - self.byte_offset += 8; - self.bit_offset -= 64; - self.buffered_values = 0; + if let Some(remaining) = self.bit_offset.checked_sub(64) { + self.buffer + .extend_from_slice(&self.buffered_values.to_le_bytes()); + self.bit_offset = remaining; + // Perform checked right shift: v >> offset, where offset < 64, otherwise we // shift all bits self.buffered_values = v .checked_shr((num_bits - self.bit_offset) as u32) .unwrap_or(0); } - assert!(self.bit_offset < 64); - true } /// Writes `val` of `num_bytes` bytes to the next aligned byte. If size of `T` is /// larger than `num_bytes`, extra higher ordered bytes will be ignored. - /// - /// Returns false if there's not enough room left. True otherwise. 
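The rewritten put_value above is now infallible: the requested bits are OR-ed into a 64-bit accumulator at the current bit offset, and once 64 bits are full the whole word is appended to the growing Vec, with any bits that did not fit carried into the next word. A self-contained model of that scheme (the type and method names are illustrative, and it caps num_bits at 32 to keep the carry logic simple):

    /// Append-only bit writer: bits accumulate in a u64 and spill into a
    /// growable Vec<u8>, so the caller never pre-sizes a buffer.
    struct TinyBitWriter {
        buffer: Vec<u8>,
        buffered: u64,
        bit_offset: u8, // valid bits in `buffered`, always < 64
    }

    impl TinyBitWriter {
        fn new() -> Self {
            Self { buffer: Vec::new(), buffered: 0, bit_offset: 0 }
        }

        fn put_value(&mut self, v: u64, num_bits: u8) {
            assert!(num_bits <= 32, "sketch only handles up to 32 bits per value");
            let v = v & ((1u64 << num_bits) - 1); // keep only the requested bits
            self.buffered |= v << self.bit_offset;
            self.bit_offset += num_bits;
            if self.bit_offset >= 64 {
                // Accumulator is full: append it and start a new word with the
                // high bits of `v` that did not fit.
                self.buffer.extend_from_slice(&self.buffered.to_le_bytes());
                self.bit_offset -= 64;
                self.buffered = v >> (num_bits - self.bit_offset);
            }
        }

        /// Flush any partial word, zero-padding the final byte.
        fn consume(mut self) -> Vec<u8> {
            let num_bytes = (self.bit_offset as usize + 7) / 8;
            self.buffer
                .extend_from_slice(&self.buffered.to_le_bytes()[..num_bytes]);
            self.buffer
        }
    }

    fn main() {
        let mut w = TinyBitWriter::new();
        for i in 0..8u64 {
            w.put_value(i % 2, 1); // bits 0,1,0,1,... stored LSB first
        }
        assert_eq!(w.consume(), vec![0b10101010]);
    }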
#[inline] - pub fn put_aligned(&mut self, val: T, num_bytes: usize) -> bool { - let result = self.get_next_byte_ptr(num_bytes); - if result.is_err() { - // TODO: should we return `Result` for this func? - return false; - } - let ptr = result.unwrap(); - memcpy_value(&val, num_bytes, ptr); - true + pub fn put_aligned(&mut self, val: T, num_bytes: usize) { + self.flush(); + let slice = val.as_bytes(); + let len = num_bytes.min(slice.len()); + self.buffer.extend_from_slice(&slice[..len]); } /// Writes `val` of `num_bytes` bytes at the designated `offset`. The `offset` is the @@ -384,49 +315,34 @@ impl BitWriter { /// maintains. Note that this will overwrite any existing data between `offset` and /// `offset + num_bytes`. Also that if size of `T` is larger than `num_bytes`, extra /// higher ordered bytes will be ignored. - /// - /// Returns false if there's not enough room left, or the `pos` is not valid. - /// True otherwise. #[inline] pub fn put_aligned_offset( &mut self, val: T, num_bytes: usize, offset: usize, - ) -> bool { - if num_bytes + offset > self.max_bytes { - return false; - } - memcpy_value( - &val, - num_bytes, - &mut self.buffer[offset..offset + num_bytes], - ); - true + ) { + let slice = val.as_bytes(); + let len = num_bytes.min(slice.len()); + self.buffer[offset..offset + len].copy_from_slice(&slice[..len]) } /// Writes a VLQ encoded integer `v` to this buffer. The value is byte aligned. - /// - /// Returns false if there's not enough room left. True otherwise. #[inline] - pub fn put_vlq_int(&mut self, mut v: u64) -> bool { - let mut result = true; + pub fn put_vlq_int(&mut self, mut v: u64) { while v & 0xFFFFFFFFFFFFFF80 != 0 { - result &= self.put_aligned::(((v & 0x7F) | 0x80) as u8, 1); + self.put_aligned::(((v & 0x7F) | 0x80) as u8, 1); v >>= 7; } - result &= self.put_aligned::((v & 0x7F) as u8, 1); - result + self.put_aligned::((v & 0x7F) as u8, 1); } /// Writes a zigzag-VLQ encoded (in little endian order) int `v` to this buffer. /// Zigzag-VLQ is a variant of VLQ encoding where negative and positive /// numbers are encoded in a zigzag fashion. /// See: https://developers.google.com/protocol-buffers/docs/encoding - /// - /// Returns false if there's not enough room left. True otherwise. #[inline] - pub fn put_zigzag_vlq_int(&mut self, v: i64) -> bool { + pub fn put_zigzag_vlq_int(&mut self, v: i64) { let u: u64 = ((v << 1) ^ (v >> 63)) as u64; self.put_vlq_int(u) } @@ -488,7 +404,7 @@ impl BitReader { /// Gets the current byte offset #[inline] pub fn get_byte_offset(&self) -> usize { - self.byte_offset + ceil(self.bit_offset as i64, 8) as usize + self.byte_offset + ceil(self.bit_offset, 8) } /// Reads a value of type `T` and of size `num_bits`. @@ -568,7 +484,7 @@ impl BitReader { .expect("expected to have more data"); i += 1; } - return values_to_read + return values_to_read; } // First align bit offset to byte offset @@ -645,8 +561,7 @@ impl BitReader { // First align bit offset to byte offset if self.bit_offset != 0 { while values_skipped < num_values && self.bit_offset != 0 { - self - .skip_value(num_bits); + self.skip_value(num_bits); values_skipped += 1; } } @@ -656,7 +571,6 @@ impl BitReader { values_skipped += 32; } - assert!(num_values - values_skipped < 32); self.reload_buffer_values(); @@ -696,7 +610,7 @@ impl BitReader { /// Returns `Some` if there's enough bytes left to form a value of `T`. /// Otherwise `None`. 
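put_vlq_int and put_zigzag_vlq_int above likewise no longer report overflow; the encoding itself is unchanged: zigzag maps signed values onto unsigned ones so small magnitudes stay small, and VLQ stores seven bits per byte with the high bit as a continuation flag. A standalone sketch of the combined encoding (the function name is illustrative):

    /// Zigzag-map a signed value, then emit it as a base-128 varint (VLQ),
    /// least-significant group first.
    fn zigzag_vlq(v: i64) -> Vec<u8> {
        let mut u = ((v << 1) ^ (v >> 63)) as u64;
        let mut out = Vec::new();
        while u & !0x7F != 0 {
            out.push(((u & 0x7F) | 0x80) as u8); // continuation bit set
            u >>= 7;
        }
        out.push((u & 0x7F) as u8);
        out
    }

    fn main() {
        assert_eq!(zigzag_vlq(0), vec![0x00]);        // 0  -> 0
        assert_eq!(zigzag_vlq(-1), vec![0x01]);       // -1 -> 1
        assert_eq!(zigzag_vlq(1), vec![0x02]);        // 1  -> 2
        assert_eq!(zigzag_vlq(64), vec![0x80, 0x01]); // 64 -> 128, two bytes
    }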
pub fn get_aligned(&mut self, num_bytes: usize) -> Option { - let bytes_read = ceil(self.bit_offset as i64, 8) as usize; + let bytes_read = ceil(self.bit_offset, 8); if self.byte_offset + bytes_read + num_bytes > self.total_bytes { return None; } @@ -792,9 +706,9 @@ mod tests { assert_eq!(ceil(8, 8), 1); assert_eq!(ceil(9, 8), 2); assert_eq!(ceil(9, 9), 1); - assert_eq!(ceil(10000000000, 10), 1000000000); - assert_eq!(ceil(10, 10000000000), 1); - assert_eq!(ceil(10000000000, 1000000000), 10); + assert_eq!(ceil(10000000000_u64, 10), 1000000000); + assert_eq!(ceil(10_u64, 10000000000), 1); + assert_eq!(ceil(10000000000_u64, 1000000000), 10); } #[test] @@ -846,16 +760,16 @@ mod tests { fn test_bit_reader_skip() { let buffer = vec![255, 0]; let mut bit_reader = BitReader::from(buffer); - let skipped = bit_reader.skip(1,1); + let skipped = bit_reader.skip(1, 1); assert_eq!(skipped, 1); assert_eq!(bit_reader.get_value::(1), Some(1)); - let skipped = bit_reader.skip(2,2); + let skipped = bit_reader.skip(2, 2); assert_eq!(skipped, 2); assert_eq!(bit_reader.get_value::(2), Some(3)); - let skipped = bit_reader.skip(4,1); + let skipped = bit_reader.skip(4, 1); assert_eq!(skipped, 4); assert_eq!(bit_reader.get_value::(4), Some(0)); - let skipped = bit_reader.skip(1,1); + let skipped = bit_reader.skip(1, 1); assert_eq!(skipped, 0); } @@ -973,7 +887,7 @@ mod tests { #[test] fn test_skip() { let mut writer = BitWriter::new(5); - let old_offset = writer.skip(1).expect("skip() should return OK"); + let old_offset = writer.skip(1); writer.put_aligned(42, 4); writer.put_aligned_offset(0x10, 1, old_offset); let result = writer.consume(); @@ -981,16 +895,15 @@ mod tests { writer = BitWriter::new(4); let result = writer.skip(5); - assert!(result.is_err()); + assert_eq!(result, 0); + assert_eq!(writer.buffer(), &[0; 5]) } #[test] fn test_get_next_byte_ptr() { let mut writer = BitWriter::new(5); { - let first_byte = writer - .get_next_byte_ptr(1) - .expect("get_next_byte_ptr() should return OK"); + let first_byte = writer.get_next_byte_ptr(1); first_byte[0] = 0x10; } writer.put_aligned(42, 4); @@ -1017,8 +930,7 @@ mod tests { let mut writer = BitWriter::new(len); for i in 0..8 { - let result = writer.put_value(i % 2, 1); - assert!(result); + writer.put_value(i % 2, 1); } writer.flush(); @@ -1029,11 +941,10 @@ mod tests { // Write 00110011 for i in 0..8 { - let result = match i { + match i { 0 | 1 | 4 | 5 => writer.put_value(false as u64, 1), _ => writer.put_value(true as u64, 1), - }; - assert!(result); + } } writer.flush(); { @@ -1078,19 +989,13 @@ mod tests { fn test_put_value_rand_numbers(total: usize, num_bits: usize) { assert!(num_bits < 64); - let num_bytes = ceil(num_bits as i64, 8); + let num_bytes = ceil(num_bits, 8); let mut writer = BitWriter::new(num_bytes as usize * total); let values: Vec = random_numbers::(total) .iter() .map(|v| v & ((1 << num_bits) - 1)) .collect(); - (0..total).for_each(|i| { - assert!( - writer.put_value(values[i] as u64, num_bits), - "[{}]: put_value() failed", - i - ); - }); + (0..total).for_each(|i| writer.put_value(values[i] as u64, num_bits)); let mut reader = BitReader::from(writer.consume()); (0..total).for_each(|i| { @@ -1124,7 +1029,7 @@ mod tests { T: FromBytes + Default + Clone + Debug + Eq, { assert!(num_bits <= 32); - let num_bytes = ceil(num_bits as i64, 8); + let num_bytes = ceil(num_bits, 8); let mut writer = BitWriter::new(num_bytes as usize * total); let values: Vec = random_numbers::(total) @@ -1136,9 +1041,7 @@ mod tests { let expected_values: Vec = 
values.iter().map(|v| from_ne_slice(v.as_bytes())).collect(); - (0..total).for_each(|i| { - assert!(writer.put_value(values[i] as u64, num_bits)); - }); + (0..total).for_each(|i| writer.put_value(values[i] as u64, num_bits)); let buf = writer.consume(); let mut reader = BitReader::from(buf); @@ -1175,7 +1078,7 @@ mod tests { assert!(total % 2 == 0); let aligned_value_byte_width = std::mem::size_of::(); - let value_byte_width = ceil(num_bits as i64, 8) as usize; + let value_byte_width = ceil(num_bits, 8); let mut writer = BitWriter::new((total / 2) * (aligned_value_byte_width + value_byte_width)); let values: Vec = random_numbers::(total / 2) @@ -1187,17 +1090,9 @@ mod tests { for i in 0..total { let j = i / 2; if i % 2 == 0 { - assert!( - writer.put_value(values[j] as u64, num_bits), - "[{}]: put_value() failed", - i - ); + writer.put_value(values[j] as u64, num_bits); } else { - assert!( - writer.put_aligned::(aligned_values[j], aligned_value_byte_width), - "[{}]: put_aligned() failed", - i - ); + writer.put_aligned::(aligned_values[j], aligned_value_byte_width) } } @@ -1231,13 +1126,7 @@ mod tests { let total = 64; let mut writer = BitWriter::new(total * 32); let values = random_numbers::(total); - (0..total).for_each(|i| { - assert!( - writer.put_vlq_int(values[i] as u64), - "[{}]; put_vlq_int() failed", - i - ); - }); + (0..total).for_each(|i| writer.put_vlq_int(values[i] as u64)); let mut reader = BitReader::from(writer.consume()); (0..total).for_each(|i| { @@ -1257,13 +1146,7 @@ mod tests { let total = 64; let mut writer = BitWriter::new(total * 32); let values = random_numbers::(total); - (0..total).for_each(|i| { - assert!( - writer.put_zigzag_vlq_int(values[i] as i64), - "[{}]; put_zigzag_vlq_int() failed", - i - ); - }); + (0..total).for_each(|i| writer.put_zigzag_vlq_int(values[i] as i64)); let mut reader = BitReader::from(writer.consume()); (0..total).for_each(|i| { diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs index f56eaf85e63..d7653d4e585 100644 --- a/parquet/src/util/test_common/page_util.rs +++ b/parquet/src/util/test_common/page_util.rs @@ -16,11 +16,10 @@ // under the License. use crate::basic::Encoding; -use crate::column::page::{PageMetadata, PageReader}; use crate::column::page::{Page, PageIterator}; +use crate::column::page::{PageMetadata, PageReader}; use crate::data_type::DataType; use crate::encodings::encoding::{get_encoder, DictEncoder, Encoder}; -use crate::encodings::levels::max_buffer_size; use crate::encodings::levels::LevelEncoder; use crate::errors::Result; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; @@ -75,8 +74,7 @@ impl DataPageBuilderImpl { if max_level <= 0 { return 0; } - let size = max_buffer_size(Encoding::RLE, max_level, levels.len()); - let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]); + let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len()); level_encoder.put(levels).expect("put() should be OK"); let encoded_levels = level_encoder.consume().expect("consume() should be OK"); // Actual encoded bytes (without length offset)
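For reference, the byte layout produced by flush_rle_run earlier in this patch (and read back by RleDecoder) is Parquet's hybrid RLE: a VLQ header of repeat_count << 1, whose clear low bit marks a repeated run, followed by the value in ceil(bit_width / 8) little-endian bytes; bit-packed runs set the low bit instead. A minimal standalone sketch covering only the repeated-run case (the function name is illustrative):

    /// Encode one repeated run in the hybrid RLE/bit-packing layout:
    /// VLQ(repeat_count << 1) then the value in ceil(bit_width / 8) bytes.
    fn encode_rle_run(value: u64, repeat_count: u64, bit_width: u8) -> Vec<u8> {
        assert!(bit_width <= 64);
        let mut out = Vec::new();
        // VLQ-encode the run header; the low bit stays 0 for an RLE run.
        let mut header = repeat_count << 1;
        while header & !0x7F != 0 {
            out.push(((header & 0x7F) | 0x80) as u8);
            header >>= 7;
        }
        out.push((header & 0x7F) as u8);
        // The repeated value occupies the minimum whole number of bytes.
        let value_bytes = (bit_width as usize + 7) / 8;
        out.extend_from_slice(&value.to_le_bytes()[..value_bytes]);
        out
    }

    fn main() {
        // 100 repetitions of level value 3 at bit width 2:
        // header = 200 -> VLQ bytes [0xC8, 0x01], then one value byte 0x03.
        assert_eq!(encode_rle_run(3, 100, 2), vec![0xC8, 0x01, 0x03]);
    }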