Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support skip_def_levels for ColumnLevelDecoder #2111

Merged
merged 6 commits into from Jul 24, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
112 changes: 108 additions & 4 deletions parquet/src/arrow/record_reader/definition_levels.rs
Expand Up @@ -20,6 +20,7 @@ use std::ops::Range;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
use arrow::util::bit_chunk_iterator::UnalignedBitChunk;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
Expand Down Expand Up @@ -216,10 +217,15 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder {
impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
fn skip_def_levels(
&mut self,
_num_levels: usize,
_max_def_level: i16,
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
match &mut self.decoder {
MaybePacked::Fallback(decoder) => {
decoder.skip_def_levels(num_levels, max_def_level)
}
MaybePacked::Packed(decoder) => decoder.skip(num_levels),
}
}
}

Expand Down Expand Up @@ -248,7 +254,9 @@ struct PackedDecoder {

impl PackedDecoder {
fn next_rle_block(&mut self) -> Result<()> {
let indicator_value = self.decode_header()?;
let indicator_value = self
.decode_header()
.expect("decode_header fail in PackedDecoder");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before met not set decode_header when coding. Better have the error info.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we propogated the error to the caller, instead of panicking?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! my fault. it should use previous code pass the error. fix in fix comment

if indicator_value & 1 == 1 {
let len = (indicator_value >> 1) as usize;
self.packed_count = len * 8;
Expand Down Expand Up @@ -346,6 +354,41 @@ impl PackedDecoder {
}
Ok(read)
}

/// Skips `level_num` definition levels
///
/// Returns the number of values skipped and the number of levels skipped
fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
let mut skipped_value = 0;
let mut skipped_level = 0;
while skipped_level != level_num {
if self.rle_left != 0 {
let to_skip = self.rle_left.min(level_num - skipped_level);
self.rle_left -= to_skip;
skipped_level += to_skip;
if self.rle_value {
skipped_value += to_skip;
}
} else if self.packed_count != self.packed_offset {
let to_skip = (self.packed_count - self.packed_offset)
.min(level_num - skipped_level);
let offset = self.data_offset * 8 + self.packed_offset;
let bit_chunk =
UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
skipped_value += bit_chunk.count_ones();
self.packed_offset += to_skip;
skipped_level += to_skip;
if self.packed_offset == self.packed_count {
self.data_offset += self.packed_count / 8;
}
} else if self.data_offset == self.data.len() {
break;
} else {
self.next_rle_block()?
}
}
Ok((skipped_value, skipped_level))
}
}

#[cfg(test)]
Expand Down Expand Up @@ -392,6 +435,67 @@ mod tests {
assert_eq!(decoded.as_slice(), expected.as_slice());
}

#[test]
fn test_packed_decoder_skip() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

let mut rng = thread_rng();
let len: usize = rng.gen_range(512..1024);

let mut expected = BooleanBufferBuilder::new(len);
let mut encoder = RleEncoder::new(1, 1024);

let mut total_value = 0;
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
expected.append(bool);
if bool {
total_value += 1;
}
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

let mut skip_value = 0;
let mut read_value = 0;
let mut skip_level = 0;
let mut read_level = 0;

loop {
let offset = skip_level + read_level;
let remaining_levels = len - offset;
if remaining_levels == 0 {
break;
}
let to_read_or_skip_level = rng.gen_range(1..=remaining_levels);
if rng.gen_bool(0.5) {
let (skip_val_num, skip_level_num) =
decoder.skip(to_read_or_skip_level).unwrap();
skip_value += skip_val_num;
skip_level += skip_level_num
} else {
let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
let read_level_num =
decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
read_level += read_level_num;
for i in 0..read_level_num {
assert!(!decoded.is_empty());
//check each read bit
let read_bit = decoded.get_bit(i);
if read_bit {
read_value += 1;
}
let expect_bit = expected.get_bit(i + offset);
assert_eq!(read_bit, expect_bit);
}
}
}
assert_eq!(read_level + skip_level, len);
assert_eq!(read_value + skip_value, total_value);
}

#[test]
fn test_split_off() {
let t = Type::primitive_type_builder("col", PhysicalType::INT32)
Expand Down
37 changes: 34 additions & 3 deletions parquet/src/column/reader/decoder.rs
Expand Up @@ -318,10 +318,41 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl {
impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
fn skip_def_levels(
&mut self,
_num_levels: usize,
_max_def_level: i16,
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
let mut level_skip = 0;
let mut value_skip = 0;
match self.decoder.as_mut().unwrap() {
LevelDecoderInner::Packed(reader, bit_width) => {
for _ in 0..num_levels {
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
// Values are delimited by max_def_level
if max_def_level
== reader
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will test in #2106

.get_value::<i16>(*bit_width as usize)
.expect("Not enough values in Packed ColumnLevelDecoderImpl.")
{
value_skip += 1;
}
level_skip += 1;
}
}
LevelDecoderInner::Rle(reader) => {
for _ in 0..num_levels {
if let Some(level) = reader
.get::<i16>()
.expect("Not enough values in Rle ColumnLevelDecoderImpl.")
{
// Values are delimited by max_def_level
if level == max_def_level {
value_skip += 1;
}
}
level_skip += 1;
}
}
}
Ok((value_skip, level_skip))
}
}

Expand Down