Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support skip_values in ByteArrayColumnValueDecoder #2076

Merged
merged 3 commits into from Jul 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
187 changes: 184 additions & 3 deletions parquet/src/arrow/array_reader/byte_array.rs
Expand Up @@ -215,8 +215,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
decoder.read(out, range.end - range.start, self.dict.as_ref())
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
/// Skips up to `num_values` values in the current data, returning how many
/// values were actually skipped.
///
/// Delegates to the active decoder's `skip`, passing along the dictionary
/// (if one has been set).
///
/// # Errors
///
/// Returns a general error with the message "no decoder set" when there is
/// no active decoder, and propagates any error from the decoder itself.
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
    match self.decoder.as_mut() {
        Some(active) => active.skip(num_values, self.dict.as_ref()),
        None => Err(general_err!("no decoder set")),
    }
}
}

Expand Down Expand Up @@ -284,6 +289,25 @@ impl ByteArrayDecoder {
ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}

/// Skip `len` values
pub fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
len: usize,
dict: Option<&OffsetBuffer<I>>,
) -> Result<usize> {
match self {
ByteArrayDecoder::Plain(d) => d.skip(len),
ByteArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.skip(dict, len)
}
ByteArrayDecoder::DeltaLength(d) => d.skip(len),
ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
}
}
}

/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
Expand Down Expand Up @@ -363,6 +387,27 @@ impl ByteArrayDecoderPlain {
}
Ok(to_read)
}

/// Skips up to `to_skip` values, returning the number of values skipped.
///
/// Each value is laid out as a 4-byte little-endian length prefix followed
/// by that many bytes, so skipping only reads the prefixes and advances
/// `self.offset` past the payloads.
///
/// The request is capped at `self.max_remaining_values`, and that counter
/// is reduced by the number of values skipped so that later calls see the
/// correct remaining budget.
///
/// # Errors
///
/// Returns [`ParquetError::EOF`] when fewer than four bytes remain where a
/// length prefix is expected.
pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
    let to_skip = to_skip.min(self.max_remaining_values);
    let mut skip = 0;
    let buf = self.buf.as_ref();

    while self.offset < self.buf.len() && skip != to_skip {
        if self.offset + 4 > buf.len() {
            return Err(ParquetError::EOF("eof decoding byte array".into()));
        }
        // The slice is exactly four bytes long, so the conversion cannot fail.
        let len_bytes: [u8; 4] = buf[self.offset..self.offset + 4].try_into().unwrap();
        let len = u32::from_le_bytes(len_bytes) as usize;
        skip += 1;
        self.offset = self.offset + 4 + len;
    }

    // `skip <= to_skip <= max_remaining_values`, so this cannot underflow.
    // Without this bookkeeping the cap computed above would be stale on the
    // next call.
    self.max_remaining_values -= skip;
    Ok(skip)
}
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
Expand Down Expand Up @@ -431,6 +476,21 @@ impl ByteArrayDecoderDeltaLength {
}
Ok(to_read)
}

/// Skips up to `to_skip` values, returning the number of values skipped.
///
/// The count is limited to the lengths not yet consumed
/// (`self.lengths.len() - self.length_offset`). The byte lengths of the
/// skipped values are summed and added to `self.data_offset`, and
/// `self.length_offset` moves past them.
fn skip(&mut self, to_skip: usize) -> Result<usize> {
    let start = self.length_offset;
    let count = to_skip.min(self.lengths.len() - start);
    let end = start + count;

    let mut skipped_bytes: usize = 0;
    for length in &self.lengths[start..end] {
        skipped_bytes += *length as usize;
    }

    self.data_offset += skipped_bytes;
    self.length_offset = end;
    Ok(count)
}
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
Expand Down Expand Up @@ -521,6 +581,37 @@ impl ByteArrayDecoderDelta {
}
Ok(to_read)
}

/// Skips up to `to_skip` values, returning the number of values skipped.
///
/// Every value is expressed as a prefix length (bytes kept from the
/// previous value) plus a suffix read from `self.data`. Skipped values
/// still have to be rebuilt into `self.last_value`, because the value that
/// follows depends on it.
///
/// # Errors
///
/// Returns [`ParquetError::EOF`] when a suffix would extend past the end of
/// `self.data`.
fn skip(&mut self, to_skip: usize) -> Result<usize> {
    let first = self.length_offset;
    let count = to_skip.min(self.prefix_lengths.len() - first);
    let last = first + count;

    let prefixes = &self.prefix_lengths[first..last];
    let suffixes = &self.suffix_lengths[first..last];
    let bytes = self.data.as_ref();

    for (prefix, suffix) in prefixes.iter().zip(suffixes) {
        let keep = *prefix as usize;
        let suffix_end = self.data_offset + *suffix as usize;

        if suffix_end > self.data.len() {
            return Err(ParquetError::EOF("eof decoding byte array".into()));
        }

        // Rebuild the current value: shared prefix of the previous value
        // followed by this value's suffix bytes.
        self.last_value.truncate(keep);
        self.last_value
            .extend_from_slice(&bytes[self.data_offset..suffix_end]);
        self.data_offset = suffix_end;
    }

    self.length_offset = last;
    Ok(count)
}
}

/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
Expand Down Expand Up @@ -589,6 +680,38 @@ impl ByteArrayDecoderDictionary {
}
Ok(values_read)
}

/// Skips up to `to_skip` dictionary-encoded values, returning the number of
/// values actually skipped.
///
/// The request is capped at `self.max_remaining_values`. Indices are pulled
/// from `self.decoder` into `self.index_buf` in batches and then discarded by
/// advancing `self.index_offset`; `self.max_remaining_values` is reduced by
/// the number of values skipped.
///
/// Returns `Ok(0)` without touching any state when `dict` is empty.
///
/// # Errors
///
/// Propagates any error returned by `self.decoder.get_batch`.
fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
) -> Result<usize> {
// Never skip more values than this decoder still has to offer.
let to_skip = to_skip.min(self.max_remaining_values);
// All data must be NULL
// (an empty dictionary cannot be referenced by any index, so there is
// nothing to skip).
if dict.is_empty() {
return Ok(0);
}

let mut values_skip = 0;
while values_skip < to_skip {
// The buffered indices are exhausted: decode the next batch.
if self.index_offset == self.index_buf_len {
let read = self.decoder.get_batch(self.index_buf.as_mut())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly something for a follow up PR, but it would be nice if we could avoid decoding values only to dump them on the floor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree! file an issue #2088

// No more indices could be decoded; stop with what was skipped so far.
if read == 0 {
break;
}
self.index_buf_len = read;
self.index_offset = 0;
}

// Skip whatever is smaller: what is still requested or what is buffered.
let skip = (to_skip - values_skip)
.min(self.index_buf_len - self.index_offset);

self.index_offset += skip;
self.max_remaining_values -= skip;
values_skip += skip;
}
Ok(values_skip)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -653,6 +776,57 @@ mod tests {
}
}

// Checks that `skip_values` interleaves correctly with `read` for every
// (encoding, page) pair produced by `byte_array_all_encodings`: read one
// value, skip two, then read the last one.
#[test]
fn test_byte_array_decoder_skip() {
let (pages, encoded_dictionary) =
Ted-Jiang marked this conversation as resolved.
Show resolved Hide resolved
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

// Read the first value ("hello").
assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

// Skip "world" and "a", one value per call.
assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

// The next value read must be the fourth one ("b").
assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hellob".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

// All four values have been consumed, so nothing more can be read.
assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

// Spread the two decoded values over a six-slot validity mask and check
// the resulting string array.
let valid = vec![false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("b"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
Expand All @@ -664,10 +838,17 @@ mod tests {
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
// test nulls read
for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
}

// test nulls skip
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.skip_values(1024).unwrap(), 0);
}
}
}