Support skip_values in ByteArrayColumnValueDecoder #2076

Merged · 3 commits · Jul 16, 2022
Changes from 1 commit
189 changes: 187 additions & 2 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -215,8 +215,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
decoder.read(out, range.end - range.start, self.dict.as_ref())
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
let decoder = self
.decoder
.as_mut()
.ok_or_else(|| general_err!("no decoder set"))?;

decoder.skip(num_values, self.dict.as_ref())
}
}

@@ -284,6 +289,25 @@ impl ByteArrayDecoder {
ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}

/// Skip `len` values
pub fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
len: usize,
dict: Option<&OffsetBuffer<I>>,
) -> Result<usize> {
match self {
ByteArrayDecoder::Plain(d) => d.skip(len),
ByteArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.skip(dict, len)
}
ByteArrayDecoder::DeltaLength(d) => d.skip(len),
ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
}
}
}

/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
@@ -363,6 +387,29 @@ impl ByteArrayDecoderPlain {
}
Ok(to_read)
}

pub fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
if self.max_remaining_values < to_skip {
return Err(general_err!("skip in ByteArrayDecoderPlain out of bound."));
}
let mut skip = 0;
let buf = self.buf.as_ref();

while self.offset < self.buf.len() && skip != to_skip {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}
let len_bytes: [u8; 4] =
buf[self.offset..self.offset + 4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes) as usize;
skip += 1;
self.offset = self.offset + 4 + len;
}
Ok(skip)
}
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
@@ -431,6 +478,23 @@ impl ByteArrayDecoderDeltaLength {
}
Ok(to_read)
}

fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
let remain_values = self.lengths.len() - self.length_offset;
if remain_values < to_skip {
return Err(general_err!("skip in ByteArrayDecoderDeltaLength out of bound."));
}

let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

self.data_offset += total_bytes;
self.length_offset += to_skip;
Ok(to_skip)
}
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
@@ -521,6 +585,40 @@ impl ByteArrayDecoderDelta {
}
Ok(to_read)
}

fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
let remain_values = self.prefix_lengths.len() - self.length_offset;
if remain_values < to_skip {
return Err(general_err!("skip in ByteArrayDecoderDelta out of bound."));
}

let length_range = self.length_offset..self.length_offset + to_skip;
let iter = self.prefix_lengths[length_range.clone()]
.iter()
.zip(&self.suffix_lengths[length_range]);

let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + suffix_length],
);
self.data_offset += suffix_length;
}
self.length_offset += to_skip;
Ok(to_skip)
}
}

/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
@@ -589,6 +687,42 @@ impl ByteArrayDecoderDictionary {
}
Ok(values_read)
}

fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
) -> Result<usize> {
if self.max_remaining_values < to_skip {
return Err(general_err!("skip in ByteArrayDecoderDictionary out of bound."));
}

// All data must be NULL
if dict.is_empty() {
self.max_remaining_values -= to_skip;
return Ok(to_skip);
}

let mut values_skip = 0;
while values_skip < to_skip {
if self.index_offset == self.index_buf_len {
let read = self.decoder.get_batch(self.index_buf.as_mut())?;
[Review comment — Contributor]: Possibly something for a follow-up PR, but it would be nice if we could avoid decoding values only to dump them on the floor. (An illustrative sketch of this idea appears after this impl block.)
[Ted-Jiang (author)]: Agree! Filed issue #2088.
if read == 0 {
break;
}
self.index_buf_len = read;
self.index_offset = 0;
}

let skip = (to_skip - values_skip)
.min(self.index_buf_len - self.index_offset);

self.index_offset += skip;
self.max_remaining_values -= skip;
values_skip += skip;
}
Ok(values_skip)
}
}
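
As a follow-up to the review note above (issue #2088): one way to skip dictionary keys without materializing them is to walk the RLE/bit-packed run headers and only decrement run counters. The sketch below is a minimal, self-contained illustration of that general technique; the Run enum, SketchIndexDecoder, and its skip method are hypothetical names for illustration only and are not the actual arrow-rs RleDecoder API.

/// Hypothetical, simplified run representation (illustration only). A real
/// RLE/bit-packed hybrid decoder parses run headers lazily from bytes; here
/// we pretend the runs are already parsed.
enum Run {
    /// `count` repetitions of a single dictionary key.
    Rle { count: usize },
    /// A bit-packed group holding `count` individually encoded keys.
    BitPacked { count: usize },
}

struct SketchIndexDecoder {
    runs: Vec<Run>,
    /// Index of the current run and how many of its values were consumed.
    run_idx: usize,
    consumed_in_run: usize,
}

impl SketchIndexDecoder {
    /// Skip up to `to_skip` keys by advancing through run headers and
    /// decrementing counters, never decoding the packed key values.
    fn skip(&mut self, to_skip: usize) -> usize {
        let mut skipped = 0;
        while skipped < to_skip && self.run_idx < self.runs.len() {
            let run_len = match &self.runs[self.run_idx] {
                Run::Rle { count } | Run::BitPacked { count } => *count,
            };
            let remaining = run_len - self.consumed_in_run;
            let take = remaining.min(to_skip - skipped);
            self.consumed_in_run += take;
            skipped += take;
            if self.consumed_in_run == run_len {
                self.run_idx += 1;
                self.consumed_in_run = 0;
            }
        }
        skipped
    }
}

fn main() {
    let mut d = SketchIndexDecoder {
        runs: vec![Run::Rle { count: 3 }, Run::BitPacked { count: 5 }],
        run_idx: 0,
        consumed_in_run: 0,
    };
    assert_eq!(d.skip(4), 4); // drains the RLE run plus one bit-packed key
    assert_eq!(d.skip(10), 4); // only four keys remain in total
}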

#[cfg(test)]
@@ -653,6 +787,57 @@ mod tests {
}
}

#[test]
fn test_byte_array_decoder_skip() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
[Review comment — tustvold (Contributor)]: Suggested change:
-            assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);
+            assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
The decoder doesn't actually care, as it keeps track of the number of read values, but this is technically more correct.

[Ted-Jiang (author), Jul 16, 2022]: You are right! 👍 In the end it only needs the length. Why not change

    fn read(&mut self, out: &mut Self::Slice, range: Range<usize>) -> Result<usize>;

to

    fn read(&mut self, out: &mut Self::Slice, length: usize) -> Result<usize>;

Is there a reason? 🤔

[tustvold (Contributor), Jul 16, 2022]: Because for backwards compatibility we need to support decoding to a slice, as opposed to a container that tracks its length. See ColumnValueDecoderImpl. (An illustrative sketch of this distinction appears after this test.)
assert_eq!(output.values.as_slice(), "hellob".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

let valid = vec![false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("b"),
None,
None,
]
);
}
}
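
To illustrate tustvold's backwards-compatibility point above: a decoder that fills a caller-provided fixed slice needs a Range (both where to write and how much), while a growable container only needs a count, since it tracks its own length. The trait and types below are hypothetical, simplified stand-ins chosen for illustration and do not match the real ColumnValueDecoder / ColumnValueDecoderImpl definitions in the parquet crate.

use std::ops::Range;

/// Hypothetical trait contrasting the two decode targets; names are
/// illustrative and do not exist in the parquet crate.
trait SliceDecoder {
    /// Decode into `out[range]`. A plain slice cannot remember how many
    /// values it already holds, so the caller must say where to write.
    fn read_into_slice(&mut self, out: &mut [i32], range: Range<usize>) -> usize;

    /// Decode `len` values by appending to a growable buffer; the buffer
    /// tracks its own length, so a count is enough.
    fn read_into_vec(&mut self, out: &mut Vec<i32>, len: usize) -> usize;
}

/// Emits 0, 1, 2, ... so the examples are easy to follow.
struct CountingDecoder {
    next: i32,
}

impl SliceDecoder for CountingDecoder {
    fn read_into_slice(&mut self, out: &mut [i32], range: Range<usize>) -> usize {
        let mut written = 0;
        for slot in &mut out[range] {
            *slot = self.next;
            self.next += 1;
            written += 1;
        }
        written
    }

    fn read_into_vec(&mut self, out: &mut Vec<i32>, len: usize) -> usize {
        for _ in 0..len {
            out.push(self.next);
            self.next += 1;
        }
        len
    }
}

fn main() {
    let mut d = CountingDecoder { next: 0 };

    let mut fixed = [0i32; 4];
    // With a fixed slice, the range carries both position and length.
    d.read_into_slice(&mut fixed, 0..2);
    d.read_into_slice(&mut fixed, 2..4);
    assert_eq!(fixed, [0, 1, 2, 3]);

    let mut growable = Vec::new();
    // With a Vec, only the count matters; it appends past its current length.
    d.read_into_vec(&mut growable, 3);
    assert_eq!(growable, vec![4, 5, 6]);
}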

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());