Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support RowFilter within ParquetRecordBatchReader (#2431) #2452

Merged
merged 3 commits into from Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
142 changes: 126 additions & 16 deletions parquet/src/arrow/arrow_reader/mod.rs
Expand Up @@ -400,22 +400,46 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
Self::new_builder(SyncReader(reader), metadata, options)
}

/// Build a [`ParquetRecordBatchReader`]
///
/// Note: this will eagerly evaluate any [`RowFilter`] before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
let reader =
FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);

let mut filter = self.filter;
let mut selection = self.selection;

if let Some(filter) = filter.as_mut() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the mut here -- it seems off to me because I wouldn't expect that the filter / predicate contains state that could be modified. Maybe I misunderstand something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I purposefully opted to make ArrowPredicate take &mut self, this can be exploited to allow for tracking of the current position - something this PR in fact makes use of.

for predicate in filter.predicates.iter_mut() {
if !selects_any(selection.as_ref()) {
break;
}

let projection = predicate.projection().clone();
let array_reader =
build_array_reader(Arc::clone(&self.schema), projection, &reader)?;

selection = Some(evaluate_predicate(
self.batch_size,
array_reader,
selection,
predicate.as_mut(),
)?);
}
}

let array_reader = build_array_reader(self.schema, self.projection, &reader)?;

if self.filter.is_some() {
// TODO: Support RowFilter within sync interface (#2431)
return Err(nyi_err!(
"RowFilter is currently not supported within the sync interface"
));
// If selection is empty, truncate
if !selects_any(selection.as_ref()) {
selection = Some(RowSelection::from(vec![]));
}

Ok(ParquetRecordBatchReader::new(
self.batch_size,
array_reader,
self.selection,
selection,
))
}
}
Expand Down Expand Up @@ -541,12 +565,16 @@ impl ParquetRecordBatchReader {
}
}

/// Returns `true` if `selection` is `None` or selects some rows
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
selection.map(|x| x.selects_any()).unwrap_or(true)
}

/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
///
/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
/// returned [`RowSelection`] will be the conjunction of this and
/// the rows selected by `predicate`
#[allow(unused)]
pub(crate) fn evaluate_predicate(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
Expand Down Expand Up @@ -576,6 +604,7 @@ mod tests {
use bytes::Bytes;
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt::Formatter;
use std::fs::File;
use std::io::Seek;
use std::path::PathBuf;
Expand All @@ -591,8 +620,8 @@ mod tests {
use arrow::record_batch::{RecordBatch, RecordBatchReader};

use crate::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
RowSelection, RowSelector,
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
use crate::arrow::buffer::converter::{
Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
Expand Down Expand Up @@ -1021,7 +1050,7 @@ mod tests {
}

/// Parameters for single_column_reader_test
#[derive(Debug, Clone)]
#[derive(Clone)]
struct TestOptions {
/// Number of row group to write to parquet (row group size =
/// num_row_groups / num_rows)
Expand All @@ -1047,8 +1076,29 @@ mod tests {
enabled_statistics: EnabledStatistics,
/// Encoding
encoding: Encoding,
//row selections and total selected row count
/// row selections and total selected row count
row_selections: Option<(RowSelection, usize)>,
/// row filter
row_filter: Option<Vec<bool>>,
}

impl std::fmt::Debug for TestOptions {
tustvold marked this conversation as resolved.
Show resolved Hide resolved
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestOptions")
.field("num_row_groups", &self.num_row_groups)
.field("num_rows", &self.num_rows)
.field("record_batch_size", &self.record_batch_size)
.field("null_percent", &self.null_percent)
.field("write_batch_size", &self.write_batch_size)
.field("max_data_page_size", &self.max_data_page_size)
.field("max_dict_page_size", &self.max_dict_page_size)
.field("writer_version", &self.writer_version)
.field("enabled_statistics", &self.enabled_statistics)
.field("encoding", &self.encoding)
.field("row_selections", &self.row_selections.is_some())
.field("row_filter", &self.row_filter.is_some())
.finish()
}
}

impl Default for TestOptions {
Expand All @@ -1065,6 +1115,7 @@ mod tests {
enabled_statistics: EnabledStatistics::Page,
encoding: Encoding::PLAIN,
row_selections: None,
row_filter: None,
}
}
}
Expand Down Expand Up @@ -1108,6 +1159,8 @@ mod tests {
}

fn with_row_selections(self) -> Self {
assert!(self.row_filter.is_none(), "Must set row selection first");

let mut rng = thread_rng();
let step = rng.gen_range(self.record_batch_size..self.num_rows);
let row_selections = create_test_selection(
Expand All @@ -1121,6 +1174,19 @@ mod tests {
}
}

fn with_row_filter(self) -> Self {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty chuffed with this fuzz test harness, in particular how easy it is to add new test cases. It has definitely been worth the investment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL "chuffed": https://www.merriam-webster.com/dictionary/chuffed (not common in American usage 😆 )

let row_count = match &self.row_selections {
Some((_, count)) => *count,
None => self.num_row_groups * self.num_rows,
};

let mut rng = thread_rng();
Self {
row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
..self
}
}

fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
Expand Down Expand Up @@ -1158,7 +1224,7 @@ mod tests {
G: RandGen<T>,
F: Fn(&[Option<T::T>]) -> ArrayRef,
{
let mut all_options = vec![
let all_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15),
Expand Down Expand Up @@ -1187,9 +1253,8 @@ mod tests {
TestOptions::new(2, 256, 91)
.with_null_percent(25)
.with_enabled_statistics(EnabledStatistics::None),
];
// Test skip

let skip_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15).with_row_selections(),
Expand Down Expand Up @@ -1218,10 +1283,25 @@ mod tests {
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections(),
// Test filter

// Test with row filter
TestOptions::new(4, 100, 25).with_row_filter(),
// Test with row selection and row filter
TestOptions::new(4, 100, 25)
.with_row_selections()
.with_row_filter(),
// Test with nulls and row filter
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_filter(),
// Test with nulls and row filter
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections()
.with_row_filter(),
];

all_options.extend(skip_options);

all_options.into_iter().for_each(|opts| {
for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0]
{
Expand Down Expand Up @@ -1365,6 +1445,36 @@ mod tests {
}
};

let expected_data = match opts.row_filter {
Some(filter) => {
let expected_data = expected_data
.into_iter()
.zip(filter.iter())
.filter_map(|(d, f)| f.then(|| d))
.collect();

let mut filter_offset = 0;
let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
ProjectionMask::all(),
move |b| {
let array = BooleanArray::from_iter(
filter
.iter()
.skip(filter_offset)
.take(b.num_rows())
.map(|x| Some(*x)),
);
filter_offset += b.num_rows();
Ok(array)
},
))]);

builder = builder.with_row_filter(filter);
expected_data
}
None => expected_data,
};

let mut record_reader = builder
.with_batch_size(opts.record_batch_size)
.build()
Expand Down
7 changes: 2 additions & 5 deletions parquet/src/arrow/async_reader.rs
Expand Up @@ -96,8 +96,8 @@ use arrow::record_batch::RecordBatch;

use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
evaluate_predicate, ArrowReaderBuilder, ParquetRecordBatchReader, RowFilter,
RowSelection,
evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
Expand Down Expand Up @@ -283,9 +283,6 @@ where
batch_size: usize,
) -> ReadResult<T> {
// TODO: calling build_array multiple times is wasteful
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this TODO still valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I intend to fix in a follow up

let selects_any = |selection: Option<&RowSelection>| {
selection.map(|x| x.selects_any()).unwrap_or(true)
};

let meta = self.metadata.row_group(row_group_idx);
let mut row_group = InMemoryRowGroup {
Expand Down