From 550b5c863caa1ddd9766b7c5bb99b153e3cfc8f6 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 15 Jul 2022 17:38:45 +0800 Subject: [PATCH 1/3] Support skip_values in ByteArrayColumnValueDecoder --- parquet/src/arrow/array_reader/byte_array.rs | 189 ++++++++++++++++++- 1 file changed, 187 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index b762236c4be..f8c0e95c610 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -215,8 +215,13 @@ impl ColumnValueDecoder decoder.read(out, range.end - range.start, self.dict.as_ref()) } - fn skip_values(&mut self, _num_values: usize) -> Result { - Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792")) + fn skip_values(&mut self, num_values: usize) -> Result { + let decoder = self + .decoder + .as_mut() + .ok_or_else(|| general_err!("no decoder set"))?; + + decoder.skip(num_values, self.dict.as_ref()) } } @@ -284,6 +289,25 @@ impl ByteArrayDecoder { ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len), } } + + /// Skip `len` values + pub fn skip( + &mut self, + len: usize, + dict: Option<&OffsetBuffer>, + ) -> Result { + match self { + ByteArrayDecoder::Plain(d) => d.skip(len), + ByteArrayDecoder::Dictionary(d) => { + let dict = dict + .ok_or_else(|| general_err!("missing dictionary page for column"))?; + + d.skip(dict, len) + } + ByteArrayDecoder::DeltaLength(d) => d.skip(len), + ByteArrayDecoder::DeltaByteArray(d) => d.skip(len), + } + } } /// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`] @@ -363,6 +387,29 @@ impl ByteArrayDecoderPlain { } Ok(to_read) } + + pub fn skip( + &mut self, + to_skip: usize, + ) -> Result { + if self.max_remaining_values < to_skip { + return Err(general_err!("skip in ByteArrayDecoderPlain out of bound.")); + } + let mut skip = 0; + let buf = self.buf.as_ref(); + + while self.offset < self.buf.len() && skip != to_skip { + if self.offset + 4 > buf.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + let len_bytes: [u8; 4] = + buf[self.offset..self.offset + 4].try_into().unwrap(); + let len = u32::from_le_bytes(len_bytes) as usize; + skip += 1; + self.offset = self.offset + 4 + len; + } + Ok(skip) + } } /// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`] @@ -431,6 +478,23 @@ impl ByteArrayDecoderDeltaLength { } Ok(to_read) } + + fn skip( + &mut self, + to_skip: usize, + ) -> Result { + let remain_values = self.lengths.len() - self.length_offset; + if remain_values < to_skip { + return Err(general_err!("skip in ByteArrayDecoderDeltaLength out of bound.")); + } + + let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; + let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); + + self.data_offset += total_bytes; + self.length_offset += to_skip; + Ok(to_skip) + } } /// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`] @@ -521,6 +585,40 @@ impl ByteArrayDecoderDelta { } Ok(to_read) } + + fn skip( + &mut self, + to_skip: usize, + ) -> Result { + let remain_values = self.prefix_lengths.len() - self.length_offset; + if remain_values < to_skip { + return Err(general_err!("skip in ByteArrayDecoderDelta out of bound.")); + } + + let length_range = self.length_offset..self.length_offset + to_skip; + let iter = self.prefix_lengths[length_range.clone()] + .iter() + .zip(&self.suffix_lengths[length_range]); + + let data = self.data.as_ref(); + + for (prefix_length, suffix_length) in iter { + let prefix_length = *prefix_length as usize; + let suffix_length = *suffix_length as usize; + + if self.data_offset + suffix_length > self.data.len() { + return Err(ParquetError::EOF("eof decoding byte array".into())); + } + + self.last_value.truncate(prefix_length); + self.last_value.extend_from_slice( + &data[self.data_offset..self.data_offset + suffix_length], + ); + self.data_offset += suffix_length; + } + self.length_offset += to_skip; + Ok(to_skip) + } } /// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`] @@ -589,6 +687,42 @@ impl ByteArrayDecoderDictionary { } Ok(values_read) } + + fn skip( + &mut self, + dict: &OffsetBuffer, + to_skip: usize, + ) -> Result { + if self.max_remaining_values < to_skip { + return Err(general_err!("skip in ByteArrayDecoderDictionary out of bound.")); + } + + // All data must be NULL + if dict.is_empty() { + self.max_remaining_values -= to_skip; + return Ok(to_skip); + } + + let mut values_skip = 0; + while values_skip < to_skip { + if self.index_offset == self.index_buf_len { + let read = self.decoder.get_batch(self.index_buf.as_mut())?; + if read == 0 { + break; + } + self.index_buf_len = read; + self.index_offset = 0; + } + + let skip = (to_skip - values_skip) + .min(self.index_buf_len - self.index_offset); + + self.index_offset += skip; + self.max_remaining_values -= skip; + values_skip += skip; + } + Ok(values_skip) + } } #[cfg(test)] @@ -653,6 +787,57 @@ mod tests { } } + #[test] + fn test_byte_array_decoder_skip() { + let (pages, encoded_dictionary) = + byte_array_all_encodings(vec!["hello", "world", "a", "b"]); + + let column_desc = utf8_column(); + let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc); + + decoder + .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) + .unwrap(); + + for (encoding, page) in pages { + let mut output = OffsetBuffer::::default(); + decoder.set_data(encoding, page, 4, Some(4)).unwrap(); + + assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1); + + assert_eq!(output.values.as_slice(), "hello".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5]); + + assert_eq!(decoder.skip_values(1).unwrap(), 1); + assert_eq!(decoder.skip_values(1).unwrap(), 1); + + assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1); + assert_eq!(output.values.as_slice(), "hellob".as_bytes()); + assert_eq!(output.offsets.as_slice(), &[0, 5, 6]); + + assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0); + + let valid = vec![false, false, true, true, false, false]; + let valid_buffer = Buffer::from_iter(valid.iter().cloned()); + + output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice()); + let array = output.into_array(Some(valid_buffer), ArrowType::Utf8); + let strings = array.as_any().downcast_ref::().unwrap(); + + assert_eq!( + strings.iter().collect::>(), + vec![ + None, + None, + Some("hello"), + Some("b"), + None, + None, + ] + ); + } + } + #[test] fn test_byte_array_decoder_nulls() { let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new()); From 96823a13ec4571dad54a8bcfd8f17d37a713df48 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 15 Jul 2022 21:46:03 +0800 Subject: [PATCH 2/3] add test with nulls and api align with read --- parquet/src/arrow/array_reader/byte_array.rs | 30 +++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index f8c0e95c610..dc5d7369b8b 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -392,9 +392,7 @@ impl ByteArrayDecoderPlain { &mut self, to_skip: usize, ) -> Result { - if self.max_remaining_values < to_skip { - return Err(general_err!("skip in ByteArrayDecoderPlain out of bound.")); - } + let to_skip = to_skip.min( self.max_remaining_values); let mut skip = 0; let buf = self.buf.as_ref(); @@ -484,9 +482,7 @@ impl ByteArrayDecoderDeltaLength { to_skip: usize, ) -> Result { let remain_values = self.lengths.len() - self.length_offset; - if remain_values < to_skip { - return Err(general_err!("skip in ByteArrayDecoderDeltaLength out of bound.")); - } + let to_skip = remain_values.min(to_skip); let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip]; let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum(); @@ -590,10 +586,7 @@ impl ByteArrayDecoderDelta { &mut self, to_skip: usize, ) -> Result { - let remain_values = self.prefix_lengths.len() - self.length_offset; - if remain_values < to_skip { - return Err(general_err!("skip in ByteArrayDecoderDelta out of bound.")); - } + let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset); let length_range = self.length_offset..self.length_offset + to_skip; let iter = self.prefix_lengths[length_range.clone()] @@ -693,14 +686,10 @@ impl ByteArrayDecoderDictionary { dict: &OffsetBuffer, to_skip: usize, ) -> Result { - if self.max_remaining_values < to_skip { - return Err(general_err!("skip in ByteArrayDecoderDictionary out of bound.")); - } - + let to_skip = to_skip.min(self.max_remaining_values); // All data must be NULL if dict.is_empty() { - self.max_remaining_values -= to_skip; - return Ok(to_skip); + return Ok(0); } let mut values_skip = 0; @@ -849,10 +838,17 @@ mod tests { .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false) .unwrap(); - for (encoding, page) in pages { + // test nulls read + for (encoding, page) in pages.clone() { let mut output = OffsetBuffer::::default(); decoder.set_data(encoding, page, 4, None).unwrap(); assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0); } + + // test nulls skip + for (encoding, page) in pages { + decoder.set_data(encoding, page, 4, None).unwrap(); + assert_eq!(decoder.skip_values(1024).unwrap(), 0); + } } } From 087ea278652787d4fb6d77408af9a04937fae78f Mon Sep 17 00:00:00 2001 From: Yang Jiang Date: Sat, 16 Jul 2022 15:42:49 +0800 Subject: [PATCH 3/3] Update parquet/src/arrow/array_reader/byte_array.rs Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> --- parquet/src/arrow/array_reader/byte_array.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index dc5d7369b8b..853bc2b1898 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -800,7 +800,7 @@ mod tests { assert_eq!(decoder.skip_values(1).unwrap(), 1); assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1); + assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1); assert_eq!(output.values.as_slice(), "hellob".as_bytes()); assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);