Skip to content

Commit

Permalink
Remove fallibility from RLEEncoder (#2226) (#2259)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 2, 2022
1 parent 9a4b1c9 commit 4222f5a
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 92 deletions.
14 changes: 6 additions & 8 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Expand Up @@ -374,28 +374,26 @@ impl DictEncoder {
&mut self,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
) -> Result<DataPageValues<ByteArray>> {
) -> DataPageValues<ByteArray> {
let num_values = self.indices.len();
let buffer_len = self.estimated_data_page_size();
let mut buffer = Vec::with_capacity(buffer_len);
buffer.push(self.bit_width() as u8);

let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
}
encoder.put(*index as u64)
}

self.indices.clear();

Ok(DataPageValues {
buf: encoder.consume()?.into(),
DataPageValues {
buf: encoder.consume().into(),
num_values,
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
})
}
}
}

Expand Down Expand Up @@ -500,7 +498,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
let max_value = self.max_value.take();

match &mut self.dict_encoder {
Some(encoder) => encoder.flush_data_page(min_value, max_value),
Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
_ => self.fallback.flush_data_page(min_value, max_value),
}
}
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/record_reader/definition_levels.rs
Expand Up @@ -408,12 +408,12 @@ mod tests {
let mut encoder = RleEncoder::new(1, 1024);
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
encoder.put(bool as u64);
expected.append(bool);
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let encoded = encoder.consume();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

Expand Down Expand Up @@ -444,15 +444,15 @@ mod tests {
let mut total_value = 0;
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
encoder.put(bool as u64);
expected.append(bool);
if bool {
total_value += 1;
}
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let encoded = encoder.consume();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

Expand Down
16 changes: 8 additions & 8 deletions parquet/src/column/writer/mod.rs
Expand Up @@ -630,7 +630,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Encoding::RLE,
&self.rep_levels_sink[..],
max_rep_level,
)?[..],
)[..],
);
}

Expand All @@ -640,7 +640,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Encoding::RLE,
&self.def_levels_sink[..],
max_def_level,
)?[..],
)[..],
);
}

Expand Down Expand Up @@ -671,14 +671,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

if max_rep_level > 0 {
let levels =
self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level)?;
self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
rep_levels_byte_len = levels.len();
buffer.extend_from_slice(&levels[..]);
}

if max_def_level > 0 {
let levels =
self.encode_levels_v2(&self.def_levels_sink[..], max_def_level)?;
self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
def_levels_byte_len = levels.len();
buffer.extend_from_slice(&levels[..]);
}
Expand Down Expand Up @@ -794,18 +794,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
encoding: Encoding,
levels: &[i16],
max_level: i16,
) -> Result<Vec<u8>> {
) -> Vec<u8> {
let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
encoder.put(levels)?;
encoder.put(levels);
encoder.consume()
}

/// Encodes definition or repetition levels for Data Page v2.
/// Encoding is always RLE.
#[inline]
fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
let mut encoder = LevelEncoder::v2(max_level, levels.len());
encoder.put(levels)?;
encoder.put(levels);
encoder.consume()
}

Expand Down
8 changes: 3 additions & 5 deletions parquet/src/encodings/encoding/dict_encoder.rs
Expand Up @@ -23,7 +23,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
Expand Down Expand Up @@ -132,12 +132,10 @@ impl<T: DataType> DictEncoder<T> {
// Write bit width in the first byte
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
}
encoder.put(*index as u64)
}
self.indices.clear();
Ok(ByteBufferPtr::new(encoder.consume()?))
Ok(ByteBufferPtr::new(encoder.consume()))
}

fn put_one(&mut self, value: &T::T) {
Expand Down
6 changes: 2 additions & 4 deletions parquet/src/encodings/encoding/mod.rs
Expand Up @@ -195,9 +195,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {

for value in values {
let value = value.as_u64()?;
if !rle_encoder.put(value)? {
return Err(general_err!("RLE buffer is full"));
}
rle_encoder.put(value)
}
Ok(())
}
Expand Down Expand Up @@ -227,7 +225,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
.expect("RLE value encoder is not initialized");

// Flush all encoder buffers and raw values
let mut buf = rle_encoder.consume()?;
let mut buf = rle_encoder.consume();
assert!(buf.len() > 4, "should have had padding inserted");

// Note that buf does not have any offset, all data is encoded bytes
Expand Down
35 changes: 15 additions & 20 deletions parquet/src/encodings/levels.rs
Expand Up @@ -21,7 +21,7 @@ use super::rle::{RleDecoder, RleEncoder};

use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::util::{
bit_util::{ceil, num_required_bits, BitReader, BitWriter},
memory::ByteBufferPtr,
Expand Down Expand Up @@ -97,21 +97,16 @@ impl LevelEncoder {
/// Puts/encodes a slice of levels into this level encoder.
/// Returns the number of values encoded, which is less than or equal to the
/// length of the input buffer.
///
/// RLE and BIT_PACKED level encoders return Err() when internal buffer overflows or
/// flush fails.
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
pub fn put(&mut self, buffer: &[i16]) -> usize {
let mut num_encoded = 0;
match *self {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
for value in buffer {
if !encoder.put(*value as u64)? {
return Err(general_err!("RLE buffer is full"));
}
encoder.put(*value as u64);
num_encoded += 1;
}
encoder.flush()?;
encoder.flush();
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
Expand All @@ -121,25 +116,25 @@ impl LevelEncoder {
encoder.flush();
}
}
Ok(num_encoded)
num_encoded
}

/// Finalizes the level encoder, flushes all intermediate buffers and returns the
/// resulting encoded buffer. The returned buffer is already truncated to the encoded bytes only.
#[inline]
pub fn consume(self) -> Result<Vec<u8>> {
pub fn consume(self) -> Vec<u8> {
match self {
LevelEncoder::Rle(encoder) => {
let mut encoded_data = encoder.consume()?;
let mut encoded_data = encoder.consume();
// Account for the buffer offset
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
let len = (encoded_len as i32).to_le();
let len_bytes = len.as_bytes();
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
Ok(encoded_data)
encoded_data
}
LevelEncoder::RleV2(encoder) => encoder.consume(),
LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
}
}
}
Expand Down Expand Up @@ -287,8 +282,8 @@ mod tests {
} else {
LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
encoder.put(levels);
let encoded_levels = encoder.consume();

let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
Expand Down Expand Up @@ -318,8 +313,8 @@ mod tests {
} else {
LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
encoder.put(levels);
let encoded_levels = encoder.consume();

let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
Expand Down Expand Up @@ -366,8 +361,8 @@ mod tests {
LevelEncoder::v1(enc, max_level, levels.len())
};
// Encode only one value
let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
let num_encoded = encoder.put(&levels[0..1]);
let encoded_levels = encoder.consume();
assert_eq!(num_encoded, 1);

let byte_buf = ByteBufferPtr::new(encoded_levels);
Expand Down

0 comments on commit 4222f5a

Please sign in to comment.