Skip to content

Commit

Permalink
fix and refine support skip definition_level
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed Jul 24, 2022
1 parent 881cc9b commit 6e6d39c
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 54 deletions.
97 changes: 57 additions & 40 deletions parquet/src/arrow/record_reader/definition_levels.rs
Expand Up @@ -20,6 +20,7 @@ use std::ops::Range;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::Buffer;
use arrow::util::bit_chunk_iterator::UnalignedBitChunk;

use crate::arrow::buffer::bit_util::count_set_bits;
use crate::arrow::record_reader::buffer::BufferQueue;
Expand Down Expand Up @@ -219,23 +220,11 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder {
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
// For now only support max_def_level == 1
if max_def_level == 1 {
let decoder = match self.data.take() {
Some(data) => self
.packed_decoder
.insert(PackedDecoder::new(self.encoding, data)),
None => self
.packed_decoder
.as_mut()
.expect("consistent null_mask_only"),
};
let skip = decoder.skip(num_levels).unwrap();
Ok((skip, skip))
} else {
Err(nyi_err!(
"For now only support skip when max_def_level == 1 && max_rep_level == 0"
))
match &mut self.decoder {
MaybePacked::Fallback(decoder) => {
decoder.skip_def_levels(num_levels, max_def_level)
}
MaybePacked::Packed(decoder) => decoder.skip(num_levels),
}
}
}
Expand Down Expand Up @@ -366,18 +355,29 @@ impl PackedDecoder {
Ok(read)
}

fn skip(&mut self, len: usize) -> Result<usize> {
let mut skipped = 0;
while skipped != len {
/// Skips `level_num` definition levels
///
/// Returns the number of values skipped and the number of levels skipped
fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> {
let mut skipped_value = 0;
let mut skipped_level = 0;
while skipped_level != level_num {
if self.rle_left != 0 {
let to_skip = self.rle_left.min(len - skipped);
let to_skip = self.rle_left.min(level_num - skipped_level);
self.rle_left -= to_skip;
skipped += to_skip;
skipped_level += to_skip;
if self.rle_value {
skipped_value += to_skip;
}
} else if self.packed_count != self.packed_offset {
let to_skip = (self.packed_count - self.packed_offset).min(len - skipped);
let to_skip = (self.packed_count - self.packed_offset)
.min(level_num - skipped_level);
let offset = self.data_offset * 8 + self.packed_offset;
let bit_chunk =
UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip);
skipped_value += bit_chunk.count_ones();
self.packed_offset += to_skip;
skipped += to_skip;

skipped_level += to_skip;
if self.packed_offset == self.packed_count {
self.data_offset += self.packed_count / 8;
}
Expand All @@ -387,7 +387,7 @@ impl PackedDecoder {
self.next_rle_block()?
}
}
Ok(skipped)
Ok((skipped_value, skipped_level))
}
}

Expand Down Expand Up @@ -438,45 +438,62 @@ mod tests {
#[test]
fn test_packed_decoder_skip() {
let mut rng = thread_rng();
let len: usize = rng.gen_range(512..1024) * 8;
let len: usize = rng.gen_range(512..1024);

let mut expected = BooleanBufferBuilder::new(len);
let mut encoder = RleEncoder::new(1, 1024 * 8);
let mut encoder = RleEncoder::new(1, 1024);

let mut total_value = 0;
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
expected.append(bool);
if bool {
total_value += 1;
}
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let mut decoder = PackedDecoder::new(Encoding::RLE, ByteBufferPtr::new(encoded));
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

let mut skip_value = 0;
let mut read_value = 0;
let mut skip_level = 0;
let mut read_level = 0;

loop {
let offset = read_value + skip_value;
let remaining = len - offset;
if remaining == 0 {
let offset = skip_level + read_level;
let remaining_levels = len - offset;
if remaining_levels == 0 {
break;
}
let to_read = (rng.gen_range(1..=remaining)) / 8 * 8;
let to_read_or_skip_level = rng.gen_range(1..=remaining_levels);
if rng.gen_bool(0.5) {
skip_value += decoder.skip(to_read).unwrap();
} else if to_read > 0 {
let mut decoded = BooleanBufferBuilder::new(to_read);
read_value += decoder.read(&mut decoded, to_read).unwrap();
for i in 0..to_read {
//check each bit
let (skip_val_num, skip_level_num) =
decoder.skip(to_read_or_skip_level).unwrap();
skip_value += skip_val_num;
skip_level += skip_level_num
} else {
let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level);
let read_level_num =
decoder.read(&mut decoded, to_read_or_skip_level).unwrap();
read_level += read_level_num;
for i in 0..read_level_num {
assert!(decoded.len() > 0);
//check each read bit
let read_bit = decoded.get_bit(i);
if read_bit {
read_value += 1;
}
let expect_bit = expected.get_bit(i + offset);
assert_eq!(read_bit, expect_bit);
}
}
}
assert_eq!(read_value + skip_value, len);
assert_eq!(read_level + skip_level, len);
assert_eq!(read_value + skip_value, total_value);
}

#[test]
Expand Down
41 changes: 27 additions & 14 deletions parquet/src/column/reader/decoder.rs
Expand Up @@ -321,25 +321,38 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl {
num_levels: usize,
max_def_level: i16,
) -> Result<(usize, usize)> {
// For now only support max_def_level == 1
if max_def_level == 1 {
let mut skip = num_levels;
match &mut self.inner {
LevelDecoderInner::Packed(reader, _bit_width) => {
while !reader.skip_value(skip) {
skip /= 2;
let mut level_skip = 0;
let mut value_skip = 0;
match self.decoder.as_mut().unwrap() {
LevelDecoderInner::Packed(reader, bit_width) => {
for _ in 0..num_levels {
// Values are delimited by max_def_level
if max_def_level
== reader
.get_value::<i16>(*bit_width as usize)
.expect("Not enough values in Packed ColumnLevelDecoderImpl.")
{
value_skip += 1;
}
level_skip += 1;
}
LevelDecoderInner::Rle(reader) => {
skip = reader.skip(skip).unwrap();
}
LevelDecoderInner::Rle(reader) => {
for _ in 0..num_levels {
if let Some(level) = reader
.get::<i16>()
.expect("Not enough values in Rle ColumnLevelDecoderImpl.")
{
// Values are delimited by max_def_level
if level == max_def_level {
value_skip += 1;
}
}
level_skip += 1;
}
}
Ok((skip, skip))
} else {
Err(nyi_err!(
"For now only support skip when max_def_level == 1 && max_rep_level == 0"
))
}
Ok((value_skip, level_skip))
}
}

Expand Down

0 comments on commit 6e6d39c

Please sign in to comment.