From 8f4f8b54a20b6a7d0414e4d471dfbdf1cad21a25 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 20 Jul 2022 15:39:45 +0800 Subject: [PATCH 1/4] =?UTF-8?q?Support=20skip=5Fdef=5Flevels=20=EF=BC=88on?= =?UTF-8?q?ly=20max=5Fdef=5Flevels=3D1=EF=BC=89=20for=20ColumnLevelDecoder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../arrow/record_reader/definition_levels.rs | 100 +++++++++++++++++- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index a12772af024..de6846c2e00 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -216,10 +216,27 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { fn skip_def_levels( &mut self, - _num_levels: usize, - _max_def_level: i16, + num_levels: usize, + max_def_level: i16, ) -> Result<(usize, usize)> { - Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) + // For now only support max_def_level == 1 + if max_def_level == 1 { + let decoder = match self.data.take() { + Some(data) => self + .packed_decoder + .insert(PackedDecoder::new(self.encoding, data)), + None => self + .packed_decoder + .as_mut() + .expect("consistent null_mask_only"), + }; + let skip = decoder.skip(num_levels).unwrap(); + Ok((skip, skip)) + } else { + Err(nyi_err!( + "For now only support skip when max_def_level == 1 && max_rep_level == 0" + )) + } } } @@ -248,7 +265,9 @@ struct PackedDecoder { impl PackedDecoder { fn next_rle_block(&mut self) -> Result<()> { - let indicator_value = self.decode_header()?; + let indicator_value = self + .decode_header() + .expect("decode_header fail in PackedDecoder"); if indicator_value & 1 == 1 { let len = (indicator_value >> 1) as usize; self.packed_count = len * 8; @@ -346,6 +365,30 @@ impl PackedDecoder { } Ok(read) } + + fn skip(&mut self, len: usize) -> Result { + let mut skipped = 0; + while skipped != len { + if self.rle_left != 0 { + let to_skip = self.rle_left.min(len - skipped); + self.rle_left -= to_skip; + skipped += to_skip; + } else if self.packed_count != self.packed_offset { + let to_skip = (self.packed_count - self.packed_offset).min(len - skipped); + self.packed_offset += to_skip; + skipped += to_skip; + + if self.packed_offset == self.packed_count { + self.data_offset += self.packed_count / 8; + } + } else if self.data_offset == self.data.len() { + break; + } else { + self.next_rle_block()? + } + } + Ok(skipped) + } } #[cfg(test)] @@ -392,6 +435,55 @@ mod tests { assert_eq!(decoded.as_slice(), expected.as_slice()); } + #[test] + fn test_packed_decoder_skip() { + let mut rng = thread_rng(); + let len: usize = rng.gen_range(512..1024) * 8; + + let mut expected = BooleanBufferBuilder::new(len); + let mut encoder = RleEncoder::new(1, 1024 * 8); + + for _ in 0..len { + let bool = rng.gen_bool(0.8); + assert!(encoder.put(bool as u64).unwrap()); + expected.append(bool); + } + assert_eq!(expected.len(), len); + + let encoded = encoder.consume().unwrap(); + let mut decoder = PackedDecoder::new(Encoding::RLE, ByteBufferPtr::new(encoded)); + + let mut skip_value = 0; + let mut read_value = 0; + let mut read_data = vec![]; + + loop { + let remaining = len - read_value - skip_value; + if remaining == 0 { + break; + } + let to_read = (rng.gen_range(1..=remaining)) / 8 * 8; + if rng.gen_bool(0.5) { + skip_value += decoder.skip(to_read).unwrap(); + } else if to_read > 0 { + let mut decoded = BooleanBufferBuilder::new(to_read); + read_value += decoder.read(&mut decoded, to_read).unwrap(); + read_data.push(decoded.as_slice().to_vec()); + } + } + + assert_eq!(read_value + skip_value, len); + + let expected = expected.as_slice(); + for data in read_data.iter().enumerate() { + assert!(find_subsequence(expected, data.1).is_some()); + } + } + + fn find_subsequence(u1: &[u8], u2: &Vec) -> Option { + u1.windows(u2.len()).position(|window| window == u2) + } + #[test] fn test_split_off() { let t = Type::primitive_type_builder("col", PhysicalType::INT32) From 9b2d7e70a63d78d0c1fbdb1b93802bb466a576be Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 20 Jul 2022 19:43:05 +0800 Subject: [PATCH 2/4] =?UTF-8?q?Support=20skip=5Fdef=5Flevels=20=EF=BC=88on?= =?UTF-8?q?ly=20max=5Fdef=5Flevels=3D1=EF=BC=89=20for=20ColumnLevelDecoder?= =?UTF-8?q?Impl?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- parquet/src/column/reader/decoder.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 5879c61804e..57678d27e91 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -318,10 +318,28 @@ impl ColumnLevelDecoder for ColumnLevelDecoderImpl { impl DefinitionLevelDecoder for ColumnLevelDecoderImpl { fn skip_def_levels( &mut self, - _num_levels: usize, - _max_def_level: i16, + num_levels: usize, + max_def_level: i16, ) -> Result<(usize, usize)> { - Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) + // For now only support max_def_level == 1 + if max_def_level == 1 { + let mut skip = num_levels; + match &mut self.inner { + LevelDecoderInner::Packed(reader, _bit_width) => { + while !reader.skip_value(skip) { + skip /= 2; + } + } + LevelDecoderInner::Rle(reader) => { + skip = reader.skip(skip).unwrap(); + } + } + Ok((skip, skip)) + } else { + Err(nyi_err!( + "For now only support skip when max_def_level == 1 && max_rep_level == 0" + )) + } } } From 881cc9b046f4ee03f1afd3f9cae0d7e6bde95e58 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 21 Jul 2022 15:03:41 +0800 Subject: [PATCH 3/4] fix ut --- .../arrow/record_reader/definition_levels.rs | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index de6846c2e00..cb6aedf8dcb 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -455,10 +455,10 @@ mod tests { let mut skip_value = 0; let mut read_value = 0; - let mut read_data = vec![]; loop { - let remaining = len - read_value - skip_value; + let offset = read_value + skip_value; + let remaining = len - offset; if remaining == 0 { break; } @@ -468,20 +468,15 @@ mod tests { } else if to_read > 0 { let mut decoded = BooleanBufferBuilder::new(to_read); read_value += decoder.read(&mut decoded, to_read).unwrap(); - read_data.push(decoded.as_slice().to_vec()); + for i in 0..to_read { + //check each bit + let read_bit = decoded.get_bit(i); + let expect_bit = expected.get_bit(i + offset); + assert_eq!(read_bit, expect_bit); + } } } - assert_eq!(read_value + skip_value, len); - - let expected = expected.as_slice(); - for data in read_data.iter().enumerate() { - assert!(find_subsequence(expected, data.1).is_some()); - } - } - - fn find_subsequence(u1: &[u8], u2: &Vec) -> Option { - u1.windows(u2.len()).position(|window| window == u2) } #[test] From 6e6d39cbdb23bf6ebe63420b633d259bccd367f7 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 24 Jul 2022 18:32:37 +0800 Subject: [PATCH 4/4] fix and refine support skip definition_level --- .../arrow/record_reader/definition_levels.rs | 97 +++++++++++-------- parquet/src/column/reader/decoder.rs | 41 +++++--- 2 files changed, 84 insertions(+), 54 deletions(-) diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index cb6aedf8dcb..2cd63c5b3a1 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -20,6 +20,7 @@ use std::ops::Range; use arrow::array::BooleanBufferBuilder; use arrow::bitmap::Bitmap; use arrow::buffer::Buffer; +use arrow::util::bit_chunk_iterator::UnalignedBitChunk; use crate::arrow::buffer::bit_util::count_set_bits; use crate::arrow::record_reader::buffer::BufferQueue; @@ -219,23 +220,11 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { num_levels: usize, max_def_level: i16, ) -> Result<(usize, usize)> { - // For now only support max_def_level == 1 - if max_def_level == 1 { - let decoder = match self.data.take() { - Some(data) => self - .packed_decoder - .insert(PackedDecoder::new(self.encoding, data)), - None => self - .packed_decoder - .as_mut() - .expect("consistent null_mask_only"), - }; - let skip = decoder.skip(num_levels).unwrap(); - Ok((skip, skip)) - } else { - Err(nyi_err!( - "For now only support skip when max_def_level == 1 && max_rep_level == 0" - )) + match &mut self.decoder { + MaybePacked::Fallback(decoder) => { + decoder.skip_def_levels(num_levels, max_def_level) + } + MaybePacked::Packed(decoder) => decoder.skip(num_levels), } } } @@ -366,18 +355,29 @@ impl PackedDecoder { Ok(read) } - fn skip(&mut self, len: usize) -> Result { - let mut skipped = 0; - while skipped != len { + /// Skips `level_num` definition levels + /// + /// Returns the number of values skipped and the number of levels skipped + fn skip(&mut self, level_num: usize) -> Result<(usize, usize)> { + let mut skipped_value = 0; + let mut skipped_level = 0; + while skipped_level != level_num { if self.rle_left != 0 { - let to_skip = self.rle_left.min(len - skipped); + let to_skip = self.rle_left.min(level_num - skipped_level); self.rle_left -= to_skip; - skipped += to_skip; + skipped_level += to_skip; + if self.rle_value { + skipped_value += to_skip; + } } else if self.packed_count != self.packed_offset { - let to_skip = (self.packed_count - self.packed_offset).min(len - skipped); + let to_skip = (self.packed_count - self.packed_offset) + .min(level_num - skipped_level); + let offset = self.data_offset * 8 + self.packed_offset; + let bit_chunk = + UnalignedBitChunk::new(self.data.as_ref(), offset, to_skip); + skipped_value += bit_chunk.count_ones(); self.packed_offset += to_skip; - skipped += to_skip; - + skipped_level += to_skip; if self.packed_offset == self.packed_count { self.data_offset += self.packed_count / 8; } @@ -387,7 +387,7 @@ impl PackedDecoder { self.next_rle_block()? } } - Ok(skipped) + Ok((skipped_value, skipped_level)) } } @@ -438,45 +438,62 @@ mod tests { #[test] fn test_packed_decoder_skip() { let mut rng = thread_rng(); - let len: usize = rng.gen_range(512..1024) * 8; + let len: usize = rng.gen_range(512..1024); let mut expected = BooleanBufferBuilder::new(len); - let mut encoder = RleEncoder::new(1, 1024 * 8); + let mut encoder = RleEncoder::new(1, 1024); + let mut total_value = 0; for _ in 0..len { let bool = rng.gen_bool(0.8); assert!(encoder.put(bool as u64).unwrap()); expected.append(bool); + if bool { + total_value += 1; + } } assert_eq!(expected.len(), len); let encoded = encoder.consume().unwrap(); - let mut decoder = PackedDecoder::new(Encoding::RLE, ByteBufferPtr::new(encoded)); + let mut decoder = PackedDecoder::new(); + decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); let mut skip_value = 0; let mut read_value = 0; + let mut skip_level = 0; + let mut read_level = 0; loop { - let offset = read_value + skip_value; - let remaining = len - offset; - if remaining == 0 { + let offset = skip_level + read_level; + let remaining_levels = len - offset; + if remaining_levels == 0 { break; } - let to_read = (rng.gen_range(1..=remaining)) / 8 * 8; + let to_read_or_skip_level = rng.gen_range(1..=remaining_levels); if rng.gen_bool(0.5) { - skip_value += decoder.skip(to_read).unwrap(); - } else if to_read > 0 { - let mut decoded = BooleanBufferBuilder::new(to_read); - read_value += decoder.read(&mut decoded, to_read).unwrap(); - for i in 0..to_read { - //check each bit + let (skip_val_num, skip_level_num) = + decoder.skip(to_read_or_skip_level).unwrap(); + skip_value += skip_val_num; + skip_level += skip_level_num + } else { + let mut decoded = BooleanBufferBuilder::new(to_read_or_skip_level); + let read_level_num = + decoder.read(&mut decoded, to_read_or_skip_level).unwrap(); + read_level += read_level_num; + for i in 0..read_level_num { + assert!(decoded.len() > 0); + //check each read bit let read_bit = decoded.get_bit(i); + if read_bit { + read_value += 1; + } let expect_bit = expected.get_bit(i + offset); assert_eq!(read_bit, expect_bit); } } } - assert_eq!(read_value + skip_value, len); + assert_eq!(read_level + skip_level, len); + assert_eq!(read_value + skip_value, total_value); } #[test] diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index 57678d27e91..b95b24a21c4 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -321,25 +321,38 @@ impl DefinitionLevelDecoder for ColumnLevelDecoderImpl { num_levels: usize, max_def_level: i16, ) -> Result<(usize, usize)> { - // For now only support max_def_level == 1 - if max_def_level == 1 { - let mut skip = num_levels; - match &mut self.inner { - LevelDecoderInner::Packed(reader, _bit_width) => { - while !reader.skip_value(skip) { - skip /= 2; + let mut level_skip = 0; + let mut value_skip = 0; + match self.decoder.as_mut().unwrap() { + LevelDecoderInner::Packed(reader, bit_width) => { + for _ in 0..num_levels { + // Values are delimited by max_def_level + if max_def_level + == reader + .get_value::(*bit_width as usize) + .expect("Not enough values in Packed ColumnLevelDecoderImpl.") + { + value_skip += 1; } + level_skip += 1; } - LevelDecoderInner::Rle(reader) => { - skip = reader.skip(skip).unwrap(); + } + LevelDecoderInner::Rle(reader) => { + for _ in 0..num_levels { + if let Some(level) = reader + .get::() + .expect("Not enough values in Rle ColumnLevelDecoderImpl.") + { + // Values are delimited by max_def_level + if level == max_def_level { + value_skip += 1; + } + } + level_skip += 1; } } - Ok((skip, skip)) - } else { - Err(nyi_err!( - "For now only support skip when max_def_level == 1 && max_rep_level == 0" - )) } + Ok((value_skip, level_skip)) } }