Skip to content

Commit

Permalink
Stub out Skip Records API (#1792) (#1998)
Browse files Browse the repository at this point in the history
* Stub API for parquet record skipping

* Update parquet/src/arrow/record_reader/mod.rs

Co-authored-by: Yang Jiang <jiangyang381@163.com>

* Remove empty google.protobuf.rs

* Replace todo with nyi_err

* Update doc comment

Co-authored-by: Yang Jiang <jiangyang381@163.com>
  • Loading branch information
tustvold and Ted-Jiang committed Jul 7, 2022
1 parent da3879e commit e59b023
Show file tree
Hide file tree
Showing 20 changed files with 391 additions and 17 deletions.
8 changes: 8 additions & 0 deletions parquet/src/arrow/array_reader/byte_array.rs
Expand Up @@ -122,6 +122,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
Ok(buffer.into_array(null_buffer, self.data_type.clone()))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
Expand Down Expand Up @@ -210,6 +214,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder

decoder.read(out, range.end - range.start, self.dict.as_ref())
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}
}

/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
Expand Down
8 changes: 8 additions & 0 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Expand Up @@ -184,6 +184,10 @@ where
Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
Expand Down Expand Up @@ -371,6 +375,10 @@ where
}
}
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}
}

#[cfg(test)]
Expand Down
7 changes: 7 additions & 0 deletions parquet/src/arrow/array_reader/complex_object_array.rs
Expand Up @@ -163,6 +163,13 @@ where
Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
match self.column_reader.as_mut() {
Some(reader) => reader.skip_records(num_records),
None => Ok(0),
}
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_deref()
}
Expand Down
6 changes: 6 additions & 0 deletions parquet/src/arrow/array_reader/empty_array.rs
Expand Up @@ -65,6 +65,12 @@ impl ArrayReader for EmptyArrayReader {
Ok(Arc::new(StructArray::from(data)))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let skipped = self.remaining_rows.min(num_records);
self.remaining_rows -= skipped;
Ok(skipped)
}

fn get_def_levels(&self) -> Option<&[i16]> {
None
}
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/arrow/array_reader/list_array.rs
Expand Up @@ -231,6 +231,10 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
Ok(Arc::new(result_array))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.item_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.item_reader.get_def_levels()
}
Expand Down
13 changes: 13 additions & 0 deletions parquet/src/arrow/array_reader/map_array.rs
Expand Up @@ -149,6 +149,19 @@ impl ArrayReader for MapArrayReader {
Ok(Arc::new(MapArray::from(array_data)))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let key_skipped = self.key_reader.skip_records(num_records)?;
let value_skipped = self.value_reader.skip_records(num_records)?;
if key_skipped != value_skipped {
return Err(general_err!(
"MapArrayReader out of sync, skipped {} keys and {} values",
key_skipped,
value_skipped
));
}
Ok(key_skipped)
}

fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same parent structure,
// so return key_reader only
Expand Down
3 changes: 3 additions & 0 deletions parquet/src/arrow/array_reader/mod.rs
Expand Up @@ -64,6 +64,9 @@ pub trait ArrayReader: Send {
/// Reads at most `batch_size` records into an arrow array and return it.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;

/// Skips over `num_records` records, returning the number of rows skipped
fn skip_records(&mut self, num_records: usize) -> Result<usize>;

/// If this array has a non-zero definition level, i.e. has a nullable parent
/// array, returns the definition levels of data from the last call of `next_batch`
///
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/arrow/array_reader/null_array.rs
Expand Up @@ -96,6 +96,10 @@ where
Ok(Arc::new(array))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
Expand Down
4 changes: 4 additions & 0 deletions parquet/src/arrow/array_reader/primitive_array.rs
Expand Up @@ -233,6 +233,10 @@ where
Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
Expand Down
20 changes: 20 additions & 0 deletions parquet/src/arrow/array_reader/struct_array.rs
Expand Up @@ -157,6 +157,26 @@ impl ArrayReader for StructArrayReader {
Ok(Arc::new(StructArray::from(array_data)))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let mut skipped = None;
for child in self.children.iter_mut() {
let child_skipped = child.skip_records(num_records)?;
match skipped {
Some(expected) => {
if expected != child_skipped {
return Err(general_err!(
"StructArrayReader out of sync, expected {} skipped, got {}",
expected,
child_skipped
));
}
}
None => skipped = Some(child_skipped),
}
}
Ok(skipped.unwrap_or(0))
}

fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same
// parent structure, so return first child's
Expand Down
5 changes: 5 additions & 0 deletions parquet/src/arrow/array_reader/test_util.rs
Expand Up @@ -170,6 +170,11 @@ impl ArrayReader for InMemoryArrayReader {
Ok(self.array.slice(self.last_idx, read))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let array = self.next_batch(num_records)?;
Ok(array.len())
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels
.as_ref()
Expand Down
108 changes: 103 additions & 5 deletions parquet/src/arrow/arrow_reader.rs
Expand Up @@ -17,6 +17,7 @@

//! Contains reader which reads parquet data into arrow [`RecordBatch`]

use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::Array;
Expand All @@ -29,7 +30,7 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
use crate::arrow::ProjectionMask;
use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::schema::types::SchemaDescriptor;
Expand Down Expand Up @@ -70,9 +71,39 @@ pub trait ArrowReader {
) -> Result<Self::RecordReader>;
}

/// [`RowSelection`] allows selecting or skipping a provided number of rows
/// when scanning the parquet file
#[derive(Debug, Clone, Copy)]
pub(crate) struct RowSelection {
/// The number of rows
pub row_count: usize,

/// If true, skip `row_count` rows
pub skip: bool,
}

impl RowSelection {
/// Select `row_count` rows
pub fn select(row_count: usize) -> Self {
Self {
row_count,
skip: false,
}
}

/// Skip `row_count` rows
pub fn skip(row_count: usize) -> Self {
Self {
row_count,
skip: true,
}
}
}

#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
selection: Option<Vec<RowSelection>>,
}

impl ArrowReaderOptions {
Expand All @@ -90,6 +121,20 @@ impl ArrowReaderOptions {
pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
Self {
skip_arrow_metadata,
..self
}
}

/// Scan rows from the parquet file according to the provided `selection`
///
/// TODO: Make public once row selection fully implemented (#1792)
pub(crate) fn with_row_selection(
self,
selection: impl Into<Vec<RowSelection>>,
) -> Self {
Self {
selection: Some(selection.into()),
..self
}
}
}
Expand Down Expand Up @@ -139,7 +184,12 @@ impl ArrowReader for ParquetFileArrowReader {
Box::new(self.file_reader.clone()),
)?;

ParquetRecordBatchReader::try_new(batch_size, array_reader)
let selection = self.options.selection.clone().map(Into::into);
Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
selection,
))
}
}

Expand Down Expand Up @@ -221,13 +271,47 @@ pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
selection: Option<VecDeque<RowSelection>>,
}

impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
match self.array_reader.next_batch(self.batch_size) {
let to_read = match self.selection.as_mut() {
Some(selection) => loop {
let front = selection.pop_front()?;
if front.skip {
let skipped = match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),
};

if skipped != front.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
)
.into()));
}
continue;
}

let to_read = match front.row_count.checked_sub(self.batch_size) {
Some(remaining) => {
selection.push_front(RowSelection::skip(remaining));
self.batch_size
}
None => front.row_count,
};

break to_read;
},
None => self.batch_size,
};

match self.array_reader.next_batch(to_read) {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array =
Expand Down Expand Up @@ -257,16 +341,30 @@ impl ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
Ok(Self::new(batch_size, array_reader, None))
}

/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`
/// all rows will be returned
///
/// TODO: Make public once row selection fully implemented (#1792)
pub(crate) fn new(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
selection: Option<VecDeque<RowSelection>>,
) -> Self {
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
};

Ok(Self {
Self {
batch_size,
array_reader,
schema: Arc::new(schema),
})
selection,
}
}
}

Expand Down
10 changes: 9 additions & 1 deletion parquet/src/arrow/async_reader.rs
Expand Up @@ -97,7 +97,7 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
use crate::column::page::{Page, PageIterator, PageReader};
use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
Expand Down Expand Up @@ -551,6 +551,14 @@ impl PageReader for InMemoryColumnChunkReader {
// We are at the end of this column chunk and no more page left. Return None.
Ok(None)
}

fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}

fn skip_next_page(&mut self) -> Result<()> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}
}

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]
Expand Down

0 comments on commit e59b023

Please sign in to comment.