Split out byte array decoders (#2318) #2527

Merged 1 commit on Aug 20, 2022
186 changes: 19 additions & 167 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -17,17 +17,15 @@

 use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::buffer::offset_buffer::OffsetBuffer;
+use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::GenericRecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
 use crate::basic::{ConvertedType, Encoding};
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
 use crate::data_type::Int32Type;
-use crate::encodings::{
-    decoding::{Decoder, DeltaBitPackDecoder},
-    rle::RleDecoder,
-};
+use crate::encodings::decoding::{Decoder, DeltaBitPackDecoder};
 use crate::errors::{ParquetError, Result};
 use crate::schema::types::ColumnDescPtr;
 use crate::util::memory::ByteBufferPtr;
@@ -486,45 +484,14 @@ impl ByteArrayDecoderDeltaLength

 /// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDelta {
-    prefix_lengths: Vec<i32>,
-    suffix_lengths: Vec<i32>,
-    data: ByteBufferPtr,
-    length_offset: usize,
-    data_offset: usize,
-    last_value: Vec<u8>,
+    decoder: DeltaByteArrayDecoder,
     validate_utf8: bool,
 }

 impl ByteArrayDecoderDelta {
     fn new(data: ByteBufferPtr, validate_utf8: bool) -> Result<Self> {
-        let mut prefix = DeltaBitPackDecoder::<Int32Type>::new();
-        prefix.set_data(data.all(), 0)?;
-
-        let num_prefix = prefix.values_left();
-        let mut prefix_lengths = vec![0; num_prefix];
-        assert_eq!(prefix.get(&mut prefix_lengths)?, num_prefix);
-
-        let mut suffix = DeltaBitPackDecoder::<Int32Type>::new();
-        suffix.set_data(data.start_from(prefix.get_offset()), 0)?;
-
-        let num_suffix = suffix.values_left();
-        let mut suffix_lengths = vec![0; num_suffix];
-        assert_eq!(suffix.get(&mut suffix_lengths)?, num_suffix);
-
-        if num_prefix != num_suffix {
-            return Err(general_err!(format!(
-                "inconsistent DELTA_BYTE_ARRAY lengths, prefixes: {}, suffixes: {}",
-                num_prefix, num_suffix
-            )));
-        }
-
         Ok(Self {
-            prefix_lengths,
-            suffix_lengths,
-            data,
-            length_offset: 0,
-            data_offset: prefix.get_offset() + suffix.get_offset(),
-            last_value: vec![],
+            decoder: DeltaByteArrayDecoder::new(data)?,
             validate_utf8,
         })
     }
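The removed constructor above also documents the `DELTA_BYTE_ARRAY` page layout that `DeltaByteArrayDecoder` now owns: a `DELTA_BINARY_PACKED` run of prefix lengths, a second run of suffix lengths, then the concatenated suffix bytes. A minimal sketch of the reconstruction step, assuming the two length runs and the suffix bytes have already been decoded (plain `Vec`s stand in for the crate's buffers; the function name is illustrative, not a crate API):

```rust
/// Sketch only: rebuild DELTA_BYTE_ARRAY values from decoded prefix/suffix
/// lengths and the concatenated suffix bytes.
fn reconstruct_delta_byte_array(
    prefix_lengths: &[usize],
    suffix_lengths: &[usize],
    suffix_data: &[u8],
) -> Vec<Vec<u8>> {
    let mut last_value: Vec<u8> = vec![];
    let mut data_offset = 0;
    let mut out = Vec::with_capacity(prefix_lengths.len());

    for (&prefix_len, &suffix_len) in prefix_lengths.iter().zip(suffix_lengths) {
        // Keep `prefix_len` bytes of the previous value, then append the next
        // `suffix_len` bytes from the shared data section.
        last_value.truncate(prefix_len);
        last_value.extend_from_slice(&suffix_data[data_offset..data_offset + suffix_len]);
        data_offset += suffix_len;
        out.push(last_value.clone());
    }
    out
}

fn main() {
    // "hello", "help", "helpful" share prefixes of 0, 3 and 4 bytes, so only
    // "hello", "p" and "ful" are stored.
    let values = reconstruct_delta_byte_array(&[0, 3, 4], &[5, 1, 3], b"hellopful");
    assert_eq!(
        values,
        vec![b"hello".to_vec(), b"help".to_vec(), b"helpful".to_vec()]
    );
}
```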
@@ -535,104 +502,32 @@ impl ByteArrayDecoderDelta
         len: usize,
     ) -> Result<usize> {
         let initial_values_length = output.values.len();
-        assert_eq!(self.prefix_lengths.len(), self.suffix_lengths.len());
-
-        let to_read = len.min(self.prefix_lengths.len() - self.length_offset);

-        output.offsets.reserve(to_read);
+        output.offsets.reserve(len.min(self.decoder.remaining()));

-        let length_range = self.length_offset..self.length_offset + to_read;
-        let iter = self.prefix_lengths[length_range.clone()]
-            .iter()
-            .zip(&self.suffix_lengths[length_range]);
-
-        let data = self.data.as_ref();
-
-        for (prefix_length, suffix_length) in iter {
-            let prefix_length = *prefix_length as usize;
-            let suffix_length = *suffix_length as usize;
-
-            if self.data_offset + suffix_length > self.data.len() {
-                return Err(ParquetError::EOF("eof decoding byte array".into()));
-            }
-
-            self.last_value.truncate(prefix_length);
-            self.last_value.extend_from_slice(
-                &data[self.data_offset..self.data_offset + suffix_length],
-            );
-            output.try_push(&self.last_value, self.validate_utf8)?;
-
-            self.data_offset += suffix_length;
-        }
-
-        self.length_offset += to_read;
+        let read = self
+            .decoder
+            .read(len, |bytes| output.try_push(bytes, self.validate_utf8))?;

         if self.validate_utf8 {
             output.check_valid_utf8(initial_values_length)?;
         }
-        Ok(to_read)
+        Ok(read)
     }

     fn skip(&mut self, to_skip: usize) -> Result<usize> {
-        let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);
-
-        let length_range = self.length_offset..self.length_offset + to_skip;
-        let iter = self.prefix_lengths[length_range.clone()]
-            .iter()
-            .zip(&self.suffix_lengths[length_range]);
-
-        let data = self.data.as_ref();
-
-        for (prefix_length, suffix_length) in iter {
-            let prefix_length = *prefix_length as usize;
-            let suffix_length = *suffix_length as usize;
-
-            if self.data_offset + suffix_length > self.data.len() {
-                return Err(ParquetError::EOF("eof decoding byte array".into()));
-            }
-
-            self.last_value.truncate(prefix_length);
-            self.last_value.extend_from_slice(
-                &data[self.data_offset..self.data_offset + suffix_length],
-            );
-            self.data_offset += suffix_length;
-        }
-        self.length_offset += to_skip;
-        Ok(to_skip)
+        self.decoder.skip(to_skip)
     }
 }
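After the change, both methods funnel through the shared decoder's three-call surface: `remaining()`, `read(len, callback)` and `skip(n)`. A rough sketch of that shape, inferred from the call sites above rather than taken from `crate::arrow::decoder` (the struct name and `String` error type are placeholders):

```rust
/// Placeholder decoder: `values` stands in for state decoded from a page.
struct SketchDeltaDecoder {
    values: Vec<Vec<u8>>,
    offset: usize,
}

impl SketchDeltaDecoder {
    /// Values not yet handed to the caller.
    fn remaining(&self) -> usize {
        self.values.len() - self.offset
    }

    /// Decode up to `len` values, passing each byte slice to `f`; the caller
    /// owns the output buffer, so the decoder never sees output types.
    fn read<F>(&mut self, len: usize, mut f: F) -> Result<usize, String>
    where
        F: FnMut(&[u8]) -> Result<(), String>,
    {
        let to_read = len.min(self.remaining());
        for value in &self.values[self.offset..self.offset + to_read] {
            f(value)?;
        }
        self.offset += to_read;
        Ok(to_read)
    }

    /// Advance past `to_skip` values without materializing them.
    fn skip(&mut self, to_skip: usize) -> Result<usize, String> {
        let skipped = to_skip.min(self.remaining());
        self.offset += skipped;
        Ok(skipped)
    }
}
```

Keeping all cursor state behind this interface means callers only supply an output callback, which is what lets the same decoding logic back more than one reader.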

 /// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
 pub struct ByteArrayDecoderDictionary {
-    /// Decoder for the dictionary offsets array
-    decoder: RleDecoder,
-
-    /// We want to decode the offsets in chunks so we will maintain an internal buffer of decoded
-    /// offsets
-    index_buf: Box<[i32; 1024]>,
-    /// Current length of `index_buf`
-    index_buf_len: usize,
-    /// Current offset into `index_buf`. If `index_offset` == `index_buf_len` then we've consumed
-    /// the entire buffer and need to decode another chunk of offsets.
-    index_offset: usize,
-
-    /// This is a maximum, as the null count is not always known, e.g. for value data from
-    /// a v1 data page
-    max_remaining_values: usize,
+    decoder: DictIndexDecoder,
 }

 impl ByteArrayDecoderDictionary {
     fn new(data: ByteBufferPtr, num_levels: usize, num_values: Option<usize>) -> Self {
-        let bit_width = data[0];
-        let mut decoder = RleDecoder::new(bit_width);
-        decoder.set_data(data.start_from(1));
-
         Self {
-            decoder,
-            index_buf: Box::new([0; 1024]),
-            index_buf_len: 0,
-            index_offset: 0,
-            max_remaining_values: num_values.unwrap_or(num_levels),
+            decoder: DictIndexDecoder::new(data, num_levels, num_values),
         }
     }
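The removed fields and comments describe the buffering scheme that `DictIndexDecoder` now encapsulates: the first byte of the page gives the key bit width, and the RLE/bit-packed keys are decoded 1024 at a time into a scratch buffer that is consumed slice by slice, capped by `max_remaining_values`. A sketch of that read loop under those assumptions, with a mock standing in for the crate's `RleDecoder`:

```rust
/// Mock RLE decoder: yields preset keys instead of decoding runs.
struct MockRleDecoder {
    keys: Vec<i32>,
    pos: usize,
}

impl MockRleDecoder {
    fn get_batch(&mut self, out: &mut [i32]) -> usize {
        let n = out.len().min(self.keys.len() - self.pos);
        out[..n].copy_from_slice(&self.keys[self.pos..self.pos + n]);
        self.pos += n;
        n
    }

    fn skip(&mut self, n: usize) -> usize {
        let skipped = n.min(self.keys.len() - self.pos);
        self.pos += skipped;
        skipped
    }
}

/// Sketch of the chunked key decoding the removed fields implemented.
struct SketchDictIndexDecoder {
    decoder: MockRleDecoder,
    /// Scratch buffer of decoded keys, refilled whenever it is drained.
    index_buf: Box<[i32; 1024]>,
    index_buf_len: usize,
    index_offset: usize,
    /// Upper bound on values left; the true count may be lower (v1 pages).
    max_remaining_values: usize,
}

impl SketchDictIndexDecoder {
    /// Decode up to `len` keys, handing each full slice to `f`.
    fn read(&mut self, len: usize, mut f: impl FnMut(&[i32])) -> usize {
        let mut values_read = 0;
        while values_read != len && self.max_remaining_values != 0 {
            if self.index_offset == self.index_buf_len {
                // Scratch buffer drained: decode the next chunk of keys.
                let read = self.decoder.get_batch(self.index_buf.as_mut());
                if read == 0 {
                    break;
                }
                self.index_buf_len = read;
                self.index_offset = 0;
            }
            let to_read = (len - values_read)
                .min(self.index_buf_len - self.index_offset)
                .min(self.max_remaining_values);
            f(&self.index_buf[self.index_offset..self.index_offset + to_read]);
            self.index_offset += to_read;
            values_read += to_read;
            self.max_remaining_values -= to_read;
        }
        values_read
    }
}
```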

@@ -642,74 +537,31 @@ impl ByteArrayDecoderDictionary
         dict: &OffsetBuffer<I>,
         len: usize,
     ) -> Result<usize> {
+        // All data must be NULL
         if dict.is_empty() {
-            return Ok(0); // All data must be NULL
+            return Ok(0);
         }

-        let mut values_read = 0;
-
-        while values_read != len && self.max_remaining_values != 0 {
-            if self.index_offset == self.index_buf_len {
-                // We've consumed the entire index buffer so we need to reload it before proceeding
-                let read = self.decoder.get_batch(self.index_buf.as_mut())?;
-                if read == 0 {
-                    break;
-                }
-                self.index_buf_len = read;
-                self.index_offset = 0;
-            }
-
-            let to_read = (len - values_read)
-                .min(self.index_buf_len - self.index_offset)
-                .min(self.max_remaining_values);
-
+        self.decoder.read(len, |keys| {
             output.extend_from_dictionary(
-                &self.index_buf[self.index_offset..self.index_offset + to_read],
+                keys,
                 dict.offsets.as_slice(),
                 dict.values.as_slice(),
-            )?;
-
-            self.index_offset += to_read;
-            values_read += to_read;
-            self.max_remaining_values -= to_read;
-        }
-        Ok(values_read)
+            )
+        })
     }

     fn skip<I: OffsetSizeTrait + ScalarValue>(
         &mut self,
         dict: &OffsetBuffer<I>,
         to_skip: usize,
     ) -> Result<usize> {
-        let to_skip = to_skip.min(self.max_remaining_values);
+        // All data must be NULL
         if dict.is_empty() {
             return Ok(0);
         }

-        let mut values_skip = 0;
-        while values_skip < to_skip {
-            if self.index_offset == self.index_buf_len {
-                // Instead of reloading the buffer, just skip in the decoder
-                let skip = self.decoder.skip(to_skip - values_skip)?;
-
-                if skip == 0 {
-                    break;
-                }
-
-                self.max_remaining_values -= skip;
-                values_skip += skip;
-            } else {
-                // We still have indices buffered, so skip within the buffer
-                let skip =
-                    (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
-
-                self.index_offset += skip;
-                self.max_remaining_values -= skip;
-                values_skip += skip;
-            }
-        }
-        Ok(values_skip)
+        self.decoder.skip(to_skip)
     }
 }
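The removed skip loop shows why `DictIndexDecoder::skip` is worth having as a dedicated path: keys already decoded into the scratch buffer are discarded in place, and only once the buffer is drained does it skip whole runs inside the RLE decoder, so no keys are decoded just to be thrown away. Continuing the sketch from above, under the same assumptions:

```rust
impl SketchDictIndexDecoder {
    /// Skip up to `to_skip` keys, preferring buffered keys over decoder work.
    fn skip(&mut self, to_skip: usize) -> usize {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut values_skipped = 0;
        while values_skipped < to_skip {
            if self.index_offset == self.index_buf_len {
                // Nothing buffered: skip runs inside the underlying decoder.
                let skipped = self.decoder.skip(to_skip - values_skipped);
                if skipped == 0 {
                    break;
                }
                self.max_remaining_values -= skipped;
                values_skipped += skipped;
            } else {
                // Discard keys still sitting in the scratch buffer.
                let skipped = (to_skip - values_skipped)
                    .min(self.index_buf_len - self.index_offset);
                self.index_offset += skipped;
                self.max_remaining_values -= skipped;
                values_skipped += skipped;
            }
        }
        values_skipped
    }
}
```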
