diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs
index 172aeb96d6d..00d69d05092 100644
--- a/parquet/src/arrow/array_reader/byte_array.rs
+++ b/parquet/src/arrow/array_reader/byte_array.rs
@@ -17,6 +17,7 @@
 
 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -24,10 +25,7 @@ use crate::basic::{ConvertedType, Encoding};
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::data_type::Int32Type;
-use crate::encodings::{
-    decoding::{Decoder, DeltaBitPackDecoder},
-    rle::RleDecoder,
-};
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
@@ -486,45 +484,14 @@ impl ByteArrayDecoderDeltaLength {
 
 /// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDelta {
-    prefix_lengths: Vec<i32>,
-    suffix_lengths: Vec<i32>,
-    data: ByteBufferPtr,
-    length_offset: usize,
-    data_offset: usize,
-    last_value: Vec<u8>,
+    decoder: DeltaByteArrayDecoder,
     validate_utf8: bool,
 }
 
 impl ByteArrayDecoderDelta {
     fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
-        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
-        prefix.set_data(data.all(), 0)?;
-
-        let num_prefix = prefix.values_left();
-        let mut prefix_lengths = vec![0; num_prefix];
-        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
-
-        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
-        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
-
-        let num_suffix = suffix.values_left();
-        let mut suffix_lengths = vec![0; num_suffix];
-        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
-
-        if num_prefix != num_suffix {
-            return Err(general_err!(format!(
-                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
-                num_prefix, num_suffix
-            )));
-        }
-
         Ok(Self {
-            prefix_lengths,
-            suffix_lengths,
-            data,
-            length_offset: 0,
-            data_offset: prefix.get_offset() + suffix.get_offset(),
-            last_value: vec![],
+            decoder: DeltaByteArrayDecoder::new(data)?,
             validate_utf8,
         })
     }
@@ -535,104 +502,32 @@ impl ByteArrayDecoderDelta {
         len: usize,
     ) -> Result<usize> {
         let initial_values_length = output.values.len();
-        assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
-
-        let to_read = len.min(self.prefix_lengths.len() - self.length_offset);
-
-        output.offsets.reserve(to_read);
+        output.offsets.reserve(len.min(self.decoder.remaining()));
 
-        let length_range = self.length_offset..self.length_offset + to_read;
-        let iter = self.prefix_lengths[length_range.clone()]
-            .iter()
-            .zip(&self.suffix_lengths[length_range]);
-
-        let data = self.data.as_ref();
-
-        for (prefix_length, suffix_length) in iter {
-            let prefix_length = *prefix_length as usize;
-            let suffix_length = *suffix_length as usize;
-
-            if self.data_offset + suffix_length > self.data.len() {
-                return Err(ParquetError::EOF("eof decoding byte array".into()));
-            }
-
-            self.last_value.truncate(prefix_length);
-            self.last_value.extend_from_slice(
-                &data[self.data_offset..self.data_offset + suffix_length],
-            );
-            output.try_push(&self.last_value, self.validate_utf8)?;
-
-            self.data_offset += suffix_length;
-        }
-
-        self.length_offset += to_read;
+        let read = self
+            .decoder
+            .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;
 
         if self.validate_utf8 {
             output.check_valid_utf8(initial_values_length)?;
         }
-        Ok(to_read)
+        Ok(read)
     }
 
     fn skip(&mut self, to_skip: usize) -> Result<usize> {
-        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
-
-        let length_range = self.length_offset..self.length_offset + to_skip;
-        let iter = self.prefix_lengths[length_range.clone()]
-            .iter()
-            .zip(&self.suffix_lengths[length_range]);
-
-        let data = self.data.as_ref();
-
-        for (prefix_length, suffix_length) in iter {
-            let prefix_length = *prefix_length as usize;
-            let suffix_length = *suffix_length as usize;
-
-            if self.data_offset + suffix_length > self.data.len() {
-                return Err(ParquetError::EOF("eof decoding byte array".into()));
-            }
-
-            self.last_value.truncate(prefix_length);
-            self.last_value.extend_from_slice(
-                &data[self.data_offset..self.data_offset + suffix_length],
-            );
-            self.data_offset += suffix_length;
-        }
-        self.length_offset += to_skip;
-        Ok(to_skip)
+        self.decoder.skip(to_skip)
     }
 }
 
 /// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDictionary {
-    /// Decoder for the dictionary offsets array
-    decoder: RleDecoder,
-
-    /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
-    /// offsets
-    index_buf: Box<[i32; 1024]>,
-    /// Current length of `index_buf`
-    index_buf_len: usize,
-    /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
-    /// the entire buffer and need to decode another chunk of offsets.
-    index_offset: usize,
-
-    /// This is a maximum as the null count is not always known, e.g. value data from
-    /// a v1 data page
-    max_remaining_values: usize,
+    decoder: DictIndexDecoder,
 }
 
 impl ByteArrayDecoderDictionary {
     fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
-        let bit_width = data[0];
-        let mut decoder = RleDecoder::new(bit_width);
-        decoder.set_data(data.start_from(1));
-
         Self {
-            decoder,
-            index_buf: Box::new([0; 1024]),
-            index_buf_len: 0,
-            index_offset: 0,
-            max_remaining_values: num_values.unwrap_or(num_levels),
+            decoder: DictIndexDecoder::new(data, num_levels, num_values),
         }
     }
@@ -642,38 +537,18 @@ impl ByteArrayDecoderDictionary {
         dict: &OffsetBuffer<I>,
         len: usize,
     ) -> Result<usize> {
+        // All data must be NULL
         if dict.is_empty() {
-            return Ok(0); // All data must be NULL
+            return Ok(0);
         }
 
-        let mut values_read = 0;
-
-        while values_read != len && self.max_remaining_values != 0 {
-            if self.index_offset == self.index_buf_len {
-                // We've consumed the entire index buffer so we need to reload it before proceeding
-                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
-                if read == 0 {
-                    break;
-                }
-                self.index_buf_len = read;
-                self.index_offset = 0;
-            }
-
-            let to_read = (len - values_read)
-                .min(self.index_buf_len - self.index_offset)
-                .min(self.max_remaining_values);
-
+        self.decoder.read(len, |keys| {
             output.extend_from_dictionary(
-                &self.index_buf[self.index_offset..self.index_offset + to_read],
+                keys,
                 dict.offsets.as_slice(),
                 dict.values.as_slice(),
-            )?;
-
-            self.index_offset += to_read;
-            values_read += to_read;
-            self.max_remaining_values -= to_read;
-        }
-        Ok(values_read)
+            )
+        })
     }
 
     fn skip(
@@ -681,35 +556,12 @@ impl ByteArrayDecoderDictionary {
         dict: &OffsetBuffer<I>,
         to_skip: usize,
     ) -> Result<usize> {
-        let to_skip = to_skip.min(self.max_remaining_values);
         // All data must be NULL
         if dict.is_empty() {
             return Ok(0);
         }
 
-        let mut values_skip = 0;
-        while values_skip < to_skip {
-            if self.index_offset == self.index_buf_len {
-                // Instead of reloading the buffer, just skip in the decoder
-                let skip = self.decoder.skip(to_skip - values_skip)?;
-
-                if skip == 0 {
-                    break;
-                }
-
-                self.max_remaining_values -= skip;
-                values_skip += skip;
-            } else {
-                // We still have indices buffered, so skip within the buffer
-                let skip =
-                    (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
-
-                self.index_offset += skip;
-                self.max_remaining_values -= skip;
-                values_skip += skip;
-            }
-        }
-        Ok(values_skip)
+        self.decoder.skip(to_skip)
     }
 }
diff --git a/parquet/src/arrow/decoder/delta_byte_array.rs b/parquet/src/arrow/decoder/delta_byte_array.rs
new file mode 100644
index 00000000000..af73f4f25eb
--- /dev/null
+++ b/parquet/src/arrow/decoder/delta_byte_array.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::data_type::Int32Type;
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
+use crate::errors::{ParquetError, Result};
+use crate::util::memory::ByteBufferPtr;
+
+/// Decoder for `Encoding::DELTA_BYTE_ARRAY`
+pub struct DeltaByteArrayDecoder {
+    prefix_lengths: Vec<i32>,
+    suffix_lengths: Vec<i32>,
+    data: ByteBufferPtr,
+    length_offset: usize,
+    data_offset: usize,
+    last_value: Vec<u8>,
+}
+
+impl DeltaByteArrayDecoder {
+    /// Create a new [`DeltaByteArrayDecoder`] with the provided data page
+    pub fn new(data: ByteBufferPtr) -> Result<Self> {
+        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
+        prefix.set_data(data.all(), 0)?;
+
+        let num_prefix = prefix.values_left();
+        let mut prefix_lengths = vec![0; num_prefix];
+        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
+
+        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
+        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
+
+        let num_suffix = suffix.values_left();
+        let mut suffix_lengths = vec![0; num_suffix];
+        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
+
+        if num_prefix != num_suffix {
+            return Err(general_err!(format!(
+                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
+                num_prefix, num_suffix
+            )));
+        }
+
+        assert_eq!(prefix_lengths.len(), suffix_lengths.len());
+
+        Ok(Self {
+            prefix_lengths,
+            suffix_lengths,
+            data,
+            length_offset: 0,
+            data_offset: prefix.get_offset() + suffix.get_offset(),
+            last_value: vec![],
+        })
+    }
+
+    /// Returns the number of values remaining
+    pub fn remaining(&self) -> usize {
+        self.prefix_lengths.len() - self.length_offset
+    }
+
+    /// Read up to `len` values, returning the number of values read
+    /// and calling `f` with each decoded byte slice
+    ///
+    /// Will short-circuit and return on error
+    pub fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize>
+    where
+        F: FnMut(&[u8]) -> Result<()>,
+    {
+        let to_read = len.min(self.remaining());
+
+        let length_range = self.length_offset..self.length_offset + to_read;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            f(&self.last_value)?;
+
+            self.data_offset += suffix_length;
+        }
+
+        self.length_offset += to_read;
+        Ok(to_read)
+    }
+
+    /// Skip up to `to_skip` values, returning the number of values skipped
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
+
+        let length_range = self.length_offset..self.length_offset + to_skip;
+        let iter = self.prefix_lengths[length_range.clone()]
+            .iter()
+            .zip(&self.suffix_lengths[length_range]);
+
+        let data = self.data.as_ref();
+
+        for (prefix_length, suffix_length) in iter {
+            let prefix_length = *prefix_length as usize;
+            let suffix_length = *suffix_length as usize;
+
+            if self.data_offset + suffix_length > self.data.len() {
+                return Err(ParquetError::EOF("eof decoding byte array".into()));
+            }
+
+            self.last_value.truncate(prefix_length);
+            self.last_value.extend_from_slice(
+                &data[self.data_offset..self.data_offset + suffix_length],
+            );
+            self.data_offset += suffix_length;
+        }
+        self.length_offset += to_skip;
+        Ok(to_skip)
+    }
+}
diff --git a/parquet/src/arrow/decoder/dictionary_index.rs b/parquet/src/arrow/decoder/dictionary_index.rs
new file mode 100644
index 00000000000..3d258309dd3
--- /dev/null
+++ b/parquet/src/arrow/decoder/dictionary_index.rs
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::encodings::rle::RleDecoder;
+use crate::errors::Result;
+use crate::util::memory::ByteBufferPtr;
+
+/// Decoder for `Encoding::RLE_DICTIONARY` indices
+pub struct DictIndexDecoder {
+    /// Decoder for the dictionary offsets array
+    decoder: RleDecoder,
+
+    /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
+    /// offsets
+    index_buf: Box<[i32; 1024]>,
+    /// Current length of `index_buf`
+    index_buf_len: usize,
+    /// Current offset into `index_buf`. If `index_buf_offset` == `index_buf_len` then we've consumed
+    /// the entire buffer and need to decode another chunk of offsets.
+    index_offset: usize,
+
+    /// This is a maximum as the null count is not always known, e.g. value data from
+    /// a v1 data page
+    max_remaining_values: usize,
+}
+
+impl DictIndexDecoder {
+    /// Create a new [`DictIndexDecoder`] with the provided data page, the number of levels
+    /// associated with this data page, and the number of non-null values (if known)
+    pub fn new(
+        data: ByteBufferPtr,
+        num_levels: usize,
+        num_values: Option<usize>,
+    ) -> Self {
+        let bit_width = data[0];
+        let mut decoder = RleDecoder::new(bit_width);
+        decoder.set_data(data.start_from(1));
+
+        Self {
+            decoder,
+            index_buf: Box::new([0; 1024]),
+            index_buf_len: 0,
+            index_offset: 0,
+            max_remaining_values: num_values.unwrap_or(num_levels),
+        }
+    }
+
+    /// Read up to `len` values, returning the number of values read
+    /// and calling `f` with each decoded dictionary index
+    ///
+    /// Will short-circuit and return on error
+    pub fn read<F: FnMut(&[i32]) -> Result<()>>(
+        &mut self,
+        len: usize,
+        mut f: F,
+    ) -> Result<usize> {
+        let mut values_read = 0;
+
+        while values_read != len && self.max_remaining_values != 0 {
+            if self.index_offset == self.index_buf_len {
+                // We've consumed the entire index buffer so we need to reload it before proceeding
+                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
+                if read == 0 {
+                    break;
+                }
+                self.index_buf_len = read;
+                self.index_offset = 0;
+            }
+
+            let to_read = (len - values_read)
+                .min(self.index_buf_len - self.index_offset)
+                .min(self.max_remaining_values);
+
+            f(&self.index_buf[self.index_offset..self.index_offset + to_read])?;
+
+            self.index_offset += to_read;
+            values_read += to_read;
+            self.max_remaining_values -= to_read;
+        }
+        Ok(values_read)
+    }
+
+    /// Skip up to `to_skip` values, returning the number of values skipped
+    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
+        let to_skip = to_skip.min(self.max_remaining_values);
+
+        let mut values_skip = 0;
+        while values_skip < to_skip {
+            if self.index_offset == self.index_buf_len {
+                // Instead of reloading the buffer, just skip in the decoder
+                let skip = self.decoder.skip(to_skip - values_skip)?;
+
+                if skip == 0 {
+                    break;
+                }
+
+                self.max_remaining_values -= skip;
+                values_skip += skip;
+            } else {
+                // We still have indices buffered, so skip within the buffer
+                let skip =
+                    (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
+
+                self.index_offset += skip;
+                self.max_remaining_values -= skip;
+                values_skip += skip;
+            }
+        }
+        Ok(values_skip)
+    }
+}
diff --git a/parquet/src/arrow/decoder/mod.rs b/parquet/src/arrow/decoder/mod.rs
new file mode 100644
index 00000000000..dc1000ffd15
--- /dev/null
+++ b/parquet/src/arrow/decoder/mod.rs
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Specialized decoders optimized for decoding to arrow format
+
+mod delta_byte_array;
+mod dictionary_index;
+
+pub use delta_byte_array::DeltaByteArrayDecoder;
+pub use dictionary_index::DictIndexDecoder;
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index dda6025898c..c0de656bf9c 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -123,6 +123,7 @@ experimental!(mod array_reader);
 pub mod arrow_reader;
 pub mod arrow_writer;
 mod buffer;
+mod decoder;
 
 #[cfg(feature = "async")]
 pub mod async_reader;
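
Note for reviewers: the sketch below is not part of the patch. It illustrates the callback contract shared by the two extracted decoders, using `DictIndexDecoder` as the example; `DeltaByteArrayDecoder::read` has the same shape, yielding `&[u8]` value slices instead of `&[i32]` index chunks. `collect_keys` is a hypothetical helper, and since `mod decoder` is crate-private this would only compile inside the `parquet` crate.

// Sketch only: a hypothetical crate-internal caller of the new API.
use crate::arrow::decoder::DictIndexDecoder;
use crate::errors::Result;
use crate::util::memory::ByteBufferPtr;

/// Hypothetical helper: materialize up to `len` dictionary keys into a Vec.
fn collect_keys(data: ByteBufferPtr, num_levels: usize, len: usize) -> Result<Vec<i32>> {
    // `data` is an RLE/bit-packed dictionary index page: a one-byte bit
    // width followed by the encoded indices, exactly what `new` expects.
    let mut decoder = DictIndexDecoder::new(data, num_levels, None);
    let mut keys = Vec::with_capacity(len);

    // `read` invokes the closure once per decoded chunk (at most 1024
    // indices at a time) and short-circuits if the closure returns an error.
    let read = decoder.read(len, |chunk| {
        keys.extend_from_slice(chunk);
        Ok(())
    })?;

    // `read` returns the number of indices decoded, which may be fewer
    // than `len` if the page is exhausted.
    assert_eq!(read, keys.len());
    Ok(keys)
}

The callback design lets the byte_array.rs call sites keep ownership of their output buffers (`OffsetBuffer::extend_from_dictionary`, `try_push`) while the chunking, buffering, and remaining-value accounting live in one place.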