From fce66260015642e691f246d29666d8f9c3197f8a Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sun, 24 Jul 2022 23:46:03 +0800 Subject: [PATCH] Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder (#2111) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoder * Support skip_def_levels (only max_def_levels=1) for ColumnLevelDecoderImpl * fix ut * fix and refine support skip definition_level * fix clippy * fix comment --- .../arrow/record_reader/definition_levels.rs | 108 +++++++++++++++++- parquet/src/column/reader/decoder.rs | 37 +++++- 2 files changed, 139 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index a12772af024..53eeab9a514 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,6 +20,7 @@ use std::ops::Range; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; +use arrow::util::bit_chunk_iterator::UnalignedBitChunk; use crate::arrow::buffer::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; @@ -216,10 +217,15 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { fn skip_def_levels( &mut self, - _num_levels: usize, - _max_def_level: i16, + num_levels: usize, + max_def_level: i16, ) -> Result<(usize, usize)> { - Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) + match &mut self.decoder { + MaybePacked::Fallback(decoder) => { + decoder.skip_def_levels(num_levels, max_def_level) + } + MaybePacked::Packed(decoder) => decoder.skip(num_levels), + } } } @@ -346,6 +352,41 @@ impl PackedDecoder { } Ok(read) } + + /// Skips `level_num` definition levels + /// + /// Returns the number of values skipped and the 
number of levels skipped + fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> { + let mut skipped_value = 0; + let mut skipped_level = 0; + while skipped_level != level_num { + if self.rle_left != 0 { + let to_skip = self.rle_left.min(level_num - skipped_level); + self.rle_left -= to_skip; + skipped_level += to_skip; + if self.rle_value { + skipped_value += to_skip; + } + } else if self.packed_count != self.packed_offset { + let to_skip = (self.packed_count - self.packed_offset) + .min(level_num - skipped_level); + let offset = self.data_offset * 8 + self.packed_offset; + let bit_chunk = + UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip); + skipped_value += bit_chunk.count_ones(); + self.packed_offset += to_skip; + skipped_level += to_skip; + if self.packed_offset == self.packed_count { + self.data_offset += self.packed_count / 8; + } + } else if self.data_offset == self.data.len() { + break; + } else { + self.next_rle_block()? + } + } + Ok((skipped_value, skipped_level)) + } } #[cfg(test)] @@ -392,6 +433,67 @@ mod tests { assert_eq!(decoded.as_slice(), expected.as_slice()); } + #[test] + fn test_packed_decoder_skip() { + let mut rng = thread_rng(); + let len: usize = rng.gen_range(512..1024); + + let mut expected = BooleanBufferBuilder::new(len); + let mut encoder = RleEncoder::new(1, 1024); + + let mut total_value = 0; + for _ in 0..len { + let bool = rng.gen_bool(0.8); + assert!(encoder.put(bool as u64).unwrap()); + expected.append(bool); + if bool { + total_value += 1; + } + } + assert_eq!(expected.len(), len); + + let encoded = encoder.consume().unwrap(); + let mut decoder = PackedDecoder::new(); + decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); + + let mut skip_value = 0; + let mut read_value = 0; + let mut skip_level = 0; + let mut read_level = 0; + + loop { + let offset = skip_level + read_level; + let remaining_levels = len - offset; + if remaining_levels == 0 { + break; + } + let to_read_or_skip_level = 
rng.gen_range(1..=remaining_levels); + if rng.gen_bool(0.5) { + let (skip_val_num, skip_level_num) = + decoder.skip(to_read_or_skip_level).unwrap(); + skip_value += skip_val_num; + skip_level += skip_level_num + } else { + let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level); + let read_level_num = + decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); + read_level += read_level_num; + for i in 0..read_level_num { + assert!(!decoded.is_empty()); + //check each read bit + let read_bit = decoded.get_bit(i); + if read_bit { + read_value += 1; + } + let expect_bit = expected.get_bit(i + offset); + assert_eq!(read_bit, expect_bit); + } + } + } + assert_eq!(read_level + skip_level, len); + assert_eq!(read_value + skip_value, total_value); + } + #[test] fn test_split_off() { let t = Type::primitive_type_builder("col", PhysicalType::INT32) diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 5879c61804e..b95b24a21c4 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -318,10 +318,41 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl { impl DefinitionLevelDecoder for ColumnLevelDecoderImpl { fn skip_def_levels( &mut self, - _num_levels: usize, - _max_def_level: i16, + num_levels: usize, + max_def_level: i16, ) -> Result<(usize, usize)> { - Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) + let mut level_skip = 0; + let mut value_skip = 0; + match self.decoder.as_mut().unwrap() { + LevelDecoderInner::Packed(reader, bit_width) => { + for _ in 0..num_levels { + // Values are delimited by max_def_level + if max_def_level + == reader + .get_value::<i16>(*bit_width as usize) + .expect("Not enough values in Packed ColumnLevelDecoderImpl.") + { + value_skip += 1; + } + level_skip += 1; + } + } + LevelDecoderInner::Rle(reader) => { + for _ in 0..num_levels { + if let Some(level) = reader + .get::<i16>() + .expect("Not enough values in Rle ColumnLevelDecoderImpl.") + { +
// Values are delimited by max_def_level + if level == max_def_level { + value_skip += 1; + } + } + level_skip += 1; + } + } + } + Ok((value_skip, level_skip)) } }