From 6e6d39cbdb23bf6ebe63420b633d259bccd367f7 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 24 Jul 2022 18:32:37 +0800 Subject: [PATCH] fix and refine support skip definition_level --- .../arrow/record_reader/definition_levels.rs | 97 +++++++++++-------- parquet/src/column/reader/decoder.rs | 41 +++++--- 2 files changed, 84 insertions(+), 54 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index cb6aedf8dcb..2cd63c5b3a1 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,6 +20,7 @@ use std::ops::Range; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; +use arrow::util::bit_chunk_iterator::UnalignedBitChunk; use crate::arrow::buffer::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; @@ -219,23 +220,11 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { num_levels: usize, max_def_level: i16, ) -> Result<(usize, usize)> { - // For now only support max_def_level == 1 - if max_def_level == 1 { - let decoder = match self.data.take() { - Some(data) => self - .packed_decoder - .insert(PackedDecoder::new(self.encoding, data)), - None => self - .packed_decoder - .as_mut() - .expect("consistent null_mask_only"), - }; - let skip = decoder.skip(num_levels).unwrap(); - Ok((skip, skip)) - } else { - Err(nyi_err!( - "For now only support skip when max_def_level == 1 && max_rep_level == 0" - )) + match &mut self.decoder { + MaybePacked::Fallback(decoder) => { + decoder.skip_def_levels(num_levels, max_def_level) + } + MaybePacked::Packed(decoder) => decoder.skip(num_levels), } } } @@ -366,18 +355,29 @@ impl PackedDecoder { Ok(read) } - fn skip(&mut self, len: usize) -> Result<usize> { - let mut skipped = 0; - while skipped != len { + /// Skips `level_num` definition levels + /// + /// Returns the number of values skipped and the number 
of levels skipped + fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> { + let mut skipped_value = 0; + let mut skipped_level = 0; + while skipped_level != level_num { if self.rle_left != 0 { - let to_skip = self.rle_left.min(len - skipped); + let to_skip = self.rle_left.min(level_num - skipped_level); self.rle_left -= to_skip; - skipped += to_skip; + skipped_level += to_skip; + if self.rle_value { + skipped_value += to_skip; + } } else if self.packed_count != self.packed_offset { - let to_skip = (self.packed_count - self.packed_offset).min(len - skipped); + let to_skip = (self.packed_count - self.packed_offset) + .min(level_num - skipped_level); + let offset = self.data_offset * 8 + self.packed_offset; + let bit_chunk = + UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip); + skipped_value += bit_chunk.count_ones(); self.packed_offset += to_skip; - skipped += to_skip; - + skipped_level += to_skip; if self.packed_offset == self.packed_count { self.data_offset += self.packed_count / 8; } @@ -387,7 +387,7 @@ impl PackedDecoder { self.next_rle_block()? 
} } - Ok(skipped) + Ok((skipped_value, skipped_level)) } } @@ -438,45 +438,62 @@ mod tests { #[test] fn test_packed_decoder_skip() { let mut rng = thread_rng(); - let len: usize = rng.gen_range(512..1024) * 8; + let len: usize = rng.gen_range(512..1024); let mut expected = BooleanBufferBuilder::new(len); - let mut encoder = RleEncoder::new(1, 1024 * 8); + let mut encoder = RleEncoder::new(1, 1024); + let mut total_value = 0; for _ in 0..len { let bool = rng.gen_bool(0.8); assert!(encoder.put(bool as u64).unwrap()); expected.append(bool); + if bool { + total_value += 1; + } } assert_eq!(expected.len(), len); let encoded = encoder.consume().unwrap(); - let mut decoder = PackedDecoder::new(Encoding::RLE, ByteBufferPtr::new(encoded)); + let mut decoder = PackedDecoder::new(); + decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); let mut skip_value = 0; let mut read_value = 0; + let mut skip_level = 0; + let mut read_level = 0; loop { - let offset = read_value + skip_value; - let remaining = len - offset; - if remaining == 0 { + let offset = skip_level + read_level; + let remaining_levels = len - offset; + if remaining_levels == 0 { break; } - let to_read = (rng.gen_range(1..=remaining)) / 8 * 8; + let to_read_or_skip_level = rng.gen_range(1..=remaining_levels); if rng.gen_bool(0.5) { - skip_value += decoder.skip(to_read).unwrap(); - } else if to_read > 0 { - let mut decoded = BooleanBufferBuilder::new(to_read); - read_value += decoder.read(&mut decoded, to_read).unwrap(); - for i in 0..to_read { - //check each bit + let (skip_val_num, skip_level_num) = + decoder.skip(to_read_or_skip_level).unwrap(); + skip_value += skip_val_num; + skip_level += skip_level_num + } else { + let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level); + let read_level_num = + decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); + read_level += read_level_num; + for i in 0..read_level_num { + assert!(decoded.len() > 0); + //check each read bit let read_bit = 
decoded.get_bit(i); + if read_bit { + read_value += 1; + } let expect_bit = expected.get_bit(i + offset); assert_eq!(read_bit, expect_bit); } } } - assert_eq!(read_value + skip_value, len); + assert_eq!(read_level + skip_level, len); + assert_eq!(read_value + skip_value, total_value); } #[test] diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 57678d27e91..b95b24a21c4 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -321,25 +321,38 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl { num_levels: usize, max_def_level: i16, ) -> Result<(usize, usize)> { - // For now only support max_def_level == 1 - if max_def_level == 1 { - let mut skip = num_levels; - match &mut self.inner { - LevelDecoderInner::Packed(reader, _bit_width) => { - while !reader.skip_value(skip) { - skip /= 2; + let mut level_skip = 0; + let mut value_skip = 0; + match self.decoder.as_mut().unwrap() { + LevelDecoderInner::Packed(reader, bit_width) => { + for _ in 0..num_levels { + // Values are delimited by max_def_level + if max_def_level + == reader + .get_value::<i16>(*bit_width as usize) + .expect("Not enough values in Packed ColumnLevelDecoderImpl.") + { + value_skip += 1; } + level_skip += 1; } - LevelDecoderInner::Rle(reader) => { - skip = reader.skip(skip).unwrap(); + } + LevelDecoderInner::Rle(reader) => { + for _ in 0..num_levels { + if let Some(level) = reader + .get::<i16>() + .expect("Not enough values in Rle ColumnLevelDecoderImpl.") + { + // Values are delimited by max_def_level + if level == max_def_level { + value_skip += 1; + } + } + level_skip += 1; } } - Ok((skip, skip)) - } else { - Err(nyi_err!( - "For now only support skip when max_def_level == 1 && max_rep_level == 0" - )) } + Ok((value_skip, level_skip)) } }