Stub out Skip Records API (#1792) #1998

Merged: 5 commits, Jul 7, 2022
8 changes: 8 additions & 0 deletions parquet/src/arrow/array_reader/byte_array.rs
@@ -122,6 +122,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
Ok(buffer.into_array(null_buffer, self.data_type.clone()))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
@@ -210,6 +214,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ColumnValueDecoder

decoder.read(out, range.end - range.start, self.dict.as_ref())
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}
}

/// A generic decoder from uncompressed parquet value data to [`OffsetBuffer`]
8 changes: 8 additions & 0 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
@@ -184,6 +184,10 @@ where
Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
@@ -371,6 +375,10 @@ where
}
}
}

fn skip_values(&mut self, _num_values: usize) -> Result<usize> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}
}

#[cfg(test)]
7 changes: 7 additions & 0 deletions parquet/src/arrow/array_reader/complex_object_array.rs
@@ -163,6 +163,13 @@ where
Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
match self.column_reader.as_mut() {
Some(reader) => reader.skip_records(num_records),
None => Ok(0),
}
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_deref()
}
6 changes: 6 additions & 0 deletions parquet/src/arrow/array_reader/empty_array.rs
@@ -65,6 +65,12 @@ impl ArrayReader for EmptyArrayReader {
Ok(Arc::new(StructArray::from(data)))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let skipped = self.remaining_rows.min(num_records);
self.remaining_rows -= skipped;
Ok(skipped)
}

fn get_def_levels(&self) -> Option<&[i16]> {
None
}
4 changes: 4 additions & 0 deletions parquet/src/arrow/array_reader/list_array.rs
@@ -231,6 +231,10 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
Ok(Arc::new(result_array))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.item_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.item_reader.get_def_levels()
}
13 changes: 13 additions & 0 deletions parquet/src/arrow/array_reader/map_array.rs
@@ -149,6 +149,19 @@ impl ArrayReader for MapArrayReader {
Ok(Arc::new(MapArray::from(array_data)))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let key_skipped = self.key_reader.skip_records(num_records)?;
let value_skipped = self.value_reader.skip_records(num_records)?;
if key_skipped != value_skipped {
return Err(general_err!(
"MapArrayReader out of sync, skipped {} keys and {} values",
key_skipped,
value_skipped
));
}
Ok(key_skipped)
}

fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same parent structure,
// so return key_reader only
3 changes: 3 additions & 0 deletions parquet/src/arrow/array_reader/mod.rs
@@ -64,6 +64,9 @@ pub trait ArrayReader: Send {
/// Reads at most `batch_size` records into an arrow array and returns it.
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;

/// Skips over `num_records` records, returning the number of rows skipped
fn skip_records(&mut self, num_records: usize) -> Result<usize>;

/// If this array has a non-zero definition level, i.e. has a nullable parent
/// array, returns the definition levels of data from the last call of `next_batch`
///
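Taken together, skip_records and next_batch let a caller seek within a column. A minimal caller-side sketch (read_window is a hypothetical helper, not part of this PR, and it assumes the underlying reader actually implements skipping; several decoders in this PR still return nyi_err!):

    /// Hypothetical helper: skip `offset` rows, then read up to `limit` rows.
    fn read_window(
        reader: &mut dyn ArrayReader,
        offset: usize,
        limit: usize,
    ) -> Result<ArrayRef> {
        let skipped = reader.skip_records(offset)?;
        if skipped != offset {
            return Err(general_err!(
                "expected to skip {} rows, only skipped {}",
                offset,
                skipped
            ));
        }
        reader.next_batch(limit)
    }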
4 changes: 4 additions & 0 deletions parquet/src/arrow/array_reader/null_array.rs
@@ -96,6 +96,10 @@ where
Ok(Arc::new(array))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
4 changes: 4 additions & 0 deletions parquet/src/arrow/array_reader/primitive_array.rs
@@ -233,6 +233,10 @@ where
Ok(array)
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
self.record_reader.skip_records(num_records)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
20 changes: 20 additions & 0 deletions parquet/src/arrow/array_reader/struct_array.rs
@@ -157,6 +157,26 @@ impl ArrayReader for StructArrayReader {
Ok(Arc::new(StructArray::from(array_data)))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let mut skipped = None;
for child in self.children.iter_mut() {
let child_skipped = child.skip_records(num_records)?;
match skipped {
Some(expected) => {
if expected != child_skipped {
return Err(general_err!(
"StructArrayReader out of sync, expected {} skipped, got {}",
expected,
child_skipped
));
}
}
None => skipped = Some(child_skipped),
}
}
Ok(skipped.unwrap_or(0))
}

fn get_def_levels(&self) -> Option<&[i16]> {
// Children definition levels should describe the same
// parent structure, so return first child's
5 changes: 5 additions & 0 deletions parquet/src/arrow/array_reader/test_util.rs
@@ -170,6 +170,11 @@ impl ArrayReader for InMemoryArrayReader {
Ok(self.array.slice(self.last_idx, read))
}

fn skip_records(&mut self, num_records: usize) -> Result<usize> {
let array = self.next_batch(num_records)?;
Ok(array.len())
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels
.as_ref()
108 changes: 103 additions & 5 deletions parquet/src/arrow/arrow_reader.rs
@@ -17,6 +17,7 @@

//! Contains a reader that reads parquet data into an arrow [`RecordBatch`]

use std::collections::VecDeque;
use std::sync::Arc;

use arrow::array::Array;
@@ -29,7 +30,7 @@ use crate::arrow::array_reader::{build_array_reader, ArrayReader};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::schema::parquet_to_arrow_schema_by_columns;
use crate::arrow::ProjectionMask;
use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{KeyValue, ParquetMetaData};
use crate::file::reader::{ChunkReader, FileReader, SerializedFileReader};
use crate::schema::types::SchemaDescriptor;
@@ -70,9 +71,39 @@ pub trait ArrowReader {
) -> Result<Self::RecordReader>;
}

/// [`RowSelection`] allows selecting or skipping a provided number of rows
/// when scanning the parquet file
#[derive(Debug, Clone, Copy)]
pub(crate) struct RowSelection {
/// The number of rows
pub row_count: usize,

/// If true, skip `row_count` rows
pub skip: bool,
}

Review comment (Contributor): You probably already have thought about this, but I would expect that in certain scenarios non-contiguous rows/skips would be desired, like "fetch the first 100 rows, skip the next 200, and then fetch the remaining". Would this interface handle that case?

Reply (tustvold, Contributor and author, Jul 6, 2022): See with_row_selection, which takes a Vec to allow for this use case.

impl RowSelection {
/// Select `row_count` rows
pub fn select(row_count: usize) -> Self {
Self {
row_count,
skip: false,
}
}

/// Skip `row_count` rows
pub fn skip(row_count: usize) -> Self {
Self {
row_count,
skip: true,
}
}
}
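This answers the review question above: non-contiguous reads are expressed by composing selections in order. A sketch, assuming a file of exactly 1_000 rows (the row count is an assumption for illustration):

    // "Fetch the first 100 rows, skip the next 200, and then fetch the
    // remaining": for a 1_000-row file the remainder is 700 rows.
    let selection = vec![
        RowSelection::select(100),
        RowSelection::skip(200),
        RowSelection::select(700),
    ];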

#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
selection: Option<Vec<RowSelection>>,
}

impl ArrowReaderOptions {
@@ -90,6 +121,20 @@ impl ArrowReaderOptions {
pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
Self {
skip_arrow_metadata,
..self
}
}

/// Scan rows from the parquet file according to the provided `selection`
///
/// TODO: Make public once row selection fully implemented (#1792)
pub(crate) fn with_row_selection(
self,
selection: impl Into<Vec<RowSelection>>,
) -> Self {
Self {
selection: Some(selection.into()),
..self
}
}
}

Review comment (Ted-Jiang, Member, Jul 4, 2022): Could we add a total_row_count to check that this selection is valid (e.g. that it is contiguous)?

Reply (tustvold, author): Is it actually an issue if it isn't, e.g. if I only want the first 100 rows?

Reply (Ted-Jiang): Yes, got it; it should be checked on the user side.
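While with_row_selection stays pub(crate) until #1792 is complete, a call site inside the crate could look like this sketch (reading rows 100..300 and stopping once the selection queue is exhausted):

    // Skip the first 100 rows, then read the next 200; rows after 300
    // are never read because the selection runs out.
    let options = ArrowReaderOptions::default().with_row_selection(vec![
        RowSelection::skip(100),
        RowSelection::select(200),
    ]);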
@@ -139,7 +184,12 @@ impl ArrowReader for ParquetFileArrowReader {
Box::new(self.file_reader.clone()),
)?;

ParquetRecordBatchReader::try_new(batch_size, array_reader)
let selection = self.options.selection.clone().map(Into::into);
Ok(ParquetRecordBatchReader::new(
batch_size,
array_reader,
selection,
))
}
}

@@ -221,13 +271,47 @@ pub struct ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
schema: SchemaRef,
selection: Option<VecDeque<RowSelection>>,
}

impl Iterator for ParquetRecordBatchReader {
type Item = ArrowResult<RecordBatch>;

fn next(&mut self) -> Option<Self::Item> {
match self.array_reader.next_batch(self.batch_size) {
let to_read = match self.selection.as_mut() {
Review comment (Ted-Jiang, Member, Jul 4, 2022): 👍 passing the mask here rather than to each column is more reasonable 😂

Some(selection) => loop {
let front = selection.pop_front()?;
if front.skip {
let skipped = match self.array_reader.skip_records(front.row_count) {
Ok(skipped) => skipped,
Err(e) => return Some(Err(e.into())),
};

if skipped != front.row_count {
return Some(Err(general_err!(
"failed to skip rows, expected {}, got {}",
front.row_count,
skipped
)
.into()));
}
continue;
}

let to_read = match front.row_count.checked_sub(self.batch_size) {
Some(remaining) if remaining != 0 => {
// The unread remainder of this selection is still selected,
// so push it back as a select rather than a skip
selection.push_front(RowSelection::select(remaining));
self.batch_size
}
_ => front.row_count,
};

break to_read;
},
None => self.batch_size,
};

match self.array_reader.next_batch(to_read) {
Err(error) => Some(Err(error.into())),
Ok(array) => {
let struct_array =
@@ -257,16 +341,30 @@ impl ParquetRecordBatchReader {
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
) -> Result<Self> {
Ok(Self::new(batch_size, array_reader, None))
}

/// Create a new [`ParquetRecordBatchReader`] that will read at most `batch_size` rows at
/// a time from [`ArrayReader`] based on the configured `selection`. If `selection` is `None`,
/// all rows will be returned.
///
/// TODO: Make public once row selection fully implemented (#1792)
pub(crate) fn new(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
selection: Option<VecDeque<RowSelection>>,
) -> Self {
let schema = match array_reader.get_data_type() {
ArrowType::Struct(ref fields) => Schema::new(fields.clone()),
_ => unreachable!("Struct array reader's data type is not struct!"),
};

Ok(Self {
Self {
batch_size,
array_reader,
schema: Arc::new(schema),
})
selection,
}
}
}
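To make the interplay between batch_size and the selection queue concrete, a sketch of driving the reader directly (crate-internal for now; array_reader is assumed to be a Box<dyn ArrayReader> over an 8-row column):

    let selection: VecDeque<RowSelection> =
        vec![RowSelection::skip(2), RowSelection::select(6)].into();
    let reader = ParquetRecordBatchReader::new(4, array_reader, Some(selection));

    // The select(6) is split across calls to `next`: the first call skips
    // 2 rows and reads a batch of 4, the second reads the remaining 2,
    // and the iterator then terminates.
    for batch in reader {
        assert!(batch.unwrap().num_rows() <= 4);
    }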

10 changes: 9 additions & 1 deletion parquet/src/arrow/async_reader.rs
@@ -97,7 +97,7 @@ use crate::arrow::arrow_reader::ParquetRecordBatchReader;
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
use crate::column::page::{Page, PageIterator, PageReader};
use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
@@ -551,6 +551,14 @@ impl PageReader for InMemoryColumnChunkReader {
// We are at the end of this column chunk and no more page left. Return None.
Ok(None)
}

fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}

fn skip_next_page(&mut self) -> Result<()> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}
}
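These two stubs exist so that a selection-aware column reader can eventually skip whole pages without decompressing them. A hedged sketch of the intended caller pattern (the num_rows field on PageMetadata is an assumption for illustration, not taken from this diff):

    // Skip whole pages while they fall entirely inside the skip range,
    // returning the rows still left to skip within the next page.
    fn skip_pages(reader: &mut dyn PageReader, mut rows_to_skip: usize) -> Result<usize> {
        while let Some(meta) = reader.peek_next_page()? {
            if meta.num_rows > rows_to_skip {
                break;
            }
            rows_to_skip -= meta.num_rows;
            reader.skip_next_page()?;
        }
        Ok(rows_to_skip)
    }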

/// Implements [`PageIterator`] for a single column chunk, yielding a single [`PageReader`]