Skip to content

Commit

Permalink
Support skip_values in ByteArrayColumnValueDecoder (#2076)
Browse files Browse the repository at this point in the history
* Support skip_values in ByteArrayColumnValueDecoder

* add test with nulls and api align with read

* Update parquet/src/arrow/array_reader/byte_array.rs

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
  • Loading branch information
Ted-Jiang and tustvold committed Jul 16, 2022
1 parent 32867e3 commit 72dada6
Showing 1 changed file with 184 additions and 3 deletions.
187 changes: 184 additions & 3 deletions parquet/src/arrow/array_reader/byte_array.rs
Expand Up @@ -215,8 +215,13 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder
decoder.read(out, range.end - range.start, self.dict.as_ref())
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
fn skip_values(&mut self, num_values: usize) -> Result<usize> {
let decoder = self
.decoder
.as_mut()
.ok_or_else(|| general_err!("no decoder set"))?;

decoder.skip(num_values, self.dict.as_ref())
}
}

Expand Down Expand Up @@ -284,6 +289,25 @@ impl ByteArrayDecoder {
ByteArrayDecoder::DeltaByteArray(d) => d.read(out, len),
}
}

/// Skip `len` values
pub fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
len: usize,
dict: Option<&OffsetBuffer<I>>,
) -> Result<usize> {
match self {
ByteArrayDecoder::Plain(d) => d.skip(len),
ByteArrayDecoder::Dictionary(d) => {
let dict = dict
.ok_or_else(|| general_err!("missing dictionary page for column"))?;

d.skip(dict, len)
}
ByteArrayDecoder::DeltaLength(d) => d.skip(len),
ByteArrayDecoder::DeltaByteArray(d) => d.skip(len),
}
}
}

/// Decoder from [`Encoding::PLAIN`] data to [`OffsetBuffer`]
Expand Down Expand Up @@ -363,6 +387,27 @@ impl ByteArrayDecoderPlain {
}
Ok(to_read)
}

pub fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
let to_skip = to_skip.min( self.max_remaining_values);
let mut skip = 0;
let buf = self.buf.as_ref();

while self.offset < self.buf.len() && skip != to_skip {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}
let len_bytes: [u8; 4] =
buf[self.offset..self.offset + 4].try_into().unwrap();
let len = u32::from_le_bytes(len_bytes) as usize;
skip += 1;
self.offset = self.offset + 4 + len;
}
Ok(skip)
}
}

/// Decoder from [`Encoding::DELTA_LENGTH_BYTE_ARRAY`] data to [`OffsetBuffer`]
Expand Down Expand Up @@ -431,6 +476,21 @@ impl ByteArrayDecoderDeltaLength {
}
Ok(to_read)
}

fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
let remain_values = self.lengths.len() - self.length_offset;
let to_skip = remain_values.min(to_skip);

let src_lengths = &self.lengths[self.length_offset..self.length_offset + to_skip];
let total_bytes: usize = src_lengths.iter().map(|x| *x as usize).sum();

self.data_offset += total_bytes;
self.length_offset += to_skip;
Ok(to_skip)
}
}

/// Decoder from [`Encoding::DELTA_BYTE_ARRAY`] to [`OffsetBuffer`]
Expand Down Expand Up @@ -521,6 +581,37 @@ impl ByteArrayDecoderDelta {
}
Ok(to_read)
}

fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);

let length_range = self.length_offset..self.length_offset + to_skip;
let iter = self.prefix_lengths[length_range.clone()]
.iter()
.zip(&self.suffix_lengths[length_range]);

let data = self.data.as_ref();

for (prefix_length, suffix_length) in iter {
let prefix_length = *prefix_length as usize;
let suffix_length = *suffix_length as usize;

if self.data_offset + suffix_length > self.data.len() {
return Err(ParquetError::EOF("eof decoding byte array".into()));
}

self.last_value.truncate(prefix_length);
self.last_value.extend_from_slice(
&data[self.data_offset..self.data_offset + suffix_length],
);
self.data_offset += suffix_length;
}
self.length_offset += to_skip;
Ok(to_skip)
}
}

/// Decoder from [`Encoding::RLE_DICTIONARY`] to [`OffsetBuffer`]
Expand Down Expand Up @@ -589,6 +680,38 @@ impl ByteArrayDecoderDictionary {
}
Ok(values_read)
}

fn skip<I: OffsetSizeTrait + ScalarValue>(
&mut self,
dict: &OffsetBuffer<I>,
to_skip: usize,
) -> Result<usize> {
let to_skip = to_skip.min(self.max_remaining_values);
// All data must be NULL
if dict.is_empty() {
return Ok(0);
}

let mut values_skip = 0;
while values_skip < to_skip {
if self.index_offset == self.index_buf_len {
let read = self.decoder.get_batch(self.index_buf.as_mut())?;
if read == 0 {
break;
}
self.index_buf_len = read;
self.index_offset = 0;
}

let skip = (to_skip - values_skip)
.min(self.index_buf_len - self.index_offset);

self.index_offset += skip;
self.max_remaining_values -= skip;
values_skip += skip;
}
Ok(values_skip)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -653,6 +776,57 @@ mod tests {
}
}

#[test]
fn test_byte_array_decoder_skip() {
let (pages, encoded_dictionary) =
byte_array_all_encodings(vec!["hello", "world", "a", "b"]);

let column_desc = utf8_column();
let mut decoder = ByteArrayColumnValueDecoder::new(&column_desc);

decoder
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, Some(4)).unwrap();

assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1);

assert_eq!(output.values.as_slice(), "hello".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5]);

assert_eq!(decoder.skip_values(1).unwrap(), 1);
assert_eq!(decoder.skip_values(1).unwrap(), 1);

assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1);
assert_eq!(output.values.as_slice(), "hellob".as_bytes());
assert_eq!(output.offsets.as_slice(), &[0, 5, 6]);

assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0);

let valid = vec![false, false, true, true, false, false];
let valid_buffer = Buffer::from_iter(valid.iter().cloned());

output.pad_nulls(0, 2, valid.len(), valid_buffer.as_slice());
let array = output.into_array(Some(valid_buffer), ArrowType::Utf8);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("b"),
None,
None,
]
);
}
}

#[test]
fn test_byte_array_decoder_nulls() {
let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());
Expand All @@ -664,10 +838,17 @@ mod tests {
.set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
.unwrap();

for (encoding, page) in pages {
// test nulls read
for (encoding, page) in pages.clone() {
let mut output = OffsetBuffer::<i32>::default();
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0);
}

// test nulls skip
for (encoding, page) in pages {
decoder.set_data(encoding, page, 4, None).unwrap();
assert_eq!(decoder.skip_values(1024).unwrap(), 0);
}
}
}

0 comments on commit 72dada6

Please sign in to comment.