Skip to content

Commit

Permalink
Support RowFilter within ParquetRecordBatchReader (apache#2431)
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Aug 15, 2022
1 parent 76cfe83 commit dd4a2df
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 21 deletions.
142 changes: 126 additions & 16 deletions parquet/src/arrow/arrow_reader/mod.rs
Expand Up @@ -400,22 +400,46 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
Self::new_builder(SyncReader(reader), metadata, options)
}

/// Build a [`ParquetRecordBatchReader`]
///
/// Note: this will eagerly evaluate any [`RowFilter`] before returning
pub fn build(self) -> Result<ParquetRecordBatchReader> {
let reader =
FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);

let mut filter = self.filter;
let mut selection = self.selection;

if let Some(filter) = filter.as_mut() {
for predicate in filter.predicates.iter_mut() {
if !selects_any(selection.as_ref()) {
break;
}

let projection = predicate.projection().clone();
let array_reader =
build_array_reader(Arc::clone(&self.schema), projection, &reader)?;

selection = Some(evaluate_predicate(
self.batch_size,
array_reader,
selection,
predicate.as_mut(),
)?);
}
}

let array_reader = build_array_reader(self.schema, self.projection, &reader)?;

if self.filter.is_some() {
// TODO: Support RowFilter within sync interface (#2431)
return Err(nyi_err!(
"RowFilter is currently not supported within the sync interface"
));
// If selection is empty, truncate
if !selects_any(selection.as_ref()) {
selection = Some(RowSelection::from(vec![]));
}

Ok(ParquetRecordBatchReader::new(
self.batch_size,
array_reader,
self.selection,
selection,
))
}
}
Expand Down Expand Up @@ -541,12 +565,16 @@ impl ParquetRecordBatchReader {
}
}

/// Returns `true` if `selection` is `None` or selects some rows
pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
    // An absent selection means "read everything", which trivially selects rows
    match selection {
        Some(s) => s.selects_any(),
        None => true,
    }
}

/// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
///
/// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
/// returned [`RowSelection`] will be the conjunction of this and
/// the rows selected by `predicate`
#[allow(unused)]
pub(crate) fn evaluate_predicate(
batch_size: usize,
array_reader: Box<dyn ArrayReader>,
Expand Down Expand Up @@ -576,6 +604,7 @@ mod tests {
use bytes::Bytes;
use std::cmp::min;
use std::collections::VecDeque;
use std::fmt::Formatter;
use std::fs::File;
use std::io::Seek;
use std::path::PathBuf;
Expand All @@ -591,8 +620,8 @@ mod tests {
use arrow::record_batch::{RecordBatch, RecordBatchReader};

use crate::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
RowSelection, RowSelector,
ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
};
use crate::arrow::buffer::converter::{
Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
Expand Down Expand Up @@ -1021,7 +1050,7 @@ mod tests {
}

/// Parameters for single_column_reader_test
#[derive(Debug, Clone)]
#[derive(Clone)]
struct TestOptions {
/// Number of row group to write to parquet (row group size =
/// num_row_groups / num_rows)
Expand All @@ -1047,8 +1076,29 @@ mod tests {
enabled_statistics: EnabledStatistics,
/// Encoding
encoding: Encoding,
//row selections and total selected row count
/// row selections and total selected row count
row_selections: Option<(RowSelection, usize)>,
/// row filter
row_filter: Option<Vec<bool>>,
}

// Hand-written Debug rather than #[derive(Debug)]: the row_selections and
// row_filter fields can hold large selections/masks, so only their presence
// (is_some) is printed to keep test-failure output readable.
impl std::fmt::Debug for TestOptions {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("TestOptions")
            .field("num_row_groups", &self.num_row_groups)
            .field("num_rows", &self.num_rows)
            .field("record_batch_size", &self.record_batch_size)
            .field("null_percent", &self.null_percent)
            .field("write_batch_size", &self.write_batch_size)
            .field("max_data_page_size", &self.max_data_page_size)
            .field("max_dict_page_size", &self.max_dict_page_size)
            .field("writer_version", &self.writer_version)
            .field("enabled_statistics", &self.enabled_statistics)
            .field("encoding", &self.encoding)
            // summarized as booleans - see note above the impl
            .field("row_selections", &self.row_selections.is_some())
            .field("row_filter", &self.row_filter.is_some())
            .finish()
    }
}

impl Default for TestOptions {
Expand All @@ -1065,6 +1115,7 @@ mod tests {
enabled_statistics: EnabledStatistics::Page,
encoding: Encoding::PLAIN,
row_selections: None,
row_filter: None,
}
}
}
Expand Down Expand Up @@ -1108,6 +1159,8 @@ mod tests {
}

fn with_row_selections(self) -> Self {
assert!(self.row_filter.is_none(), "Must set row selection first");

let mut rng = thread_rng();
let step = rng.gen_range(self.record_batch_size..self.num_rows);
let row_selections = create_test_selection(
Expand All @@ -1121,6 +1174,19 @@ mod tests {
}
}

/// Adds a random boolean row filter (roughly 90% of rows kept) to the options.
///
/// The mask length must match the number of rows the reader will actually
/// yield: the selected row count when a row selection is present, otherwise
/// every row across all row groups.
fn with_row_filter(self) -> Self {
    let row_count = self
        .row_selections
        .as_ref()
        .map_or(self.num_row_groups * self.num_rows, |(_, count)| *count);

    let mut rng = thread_rng();
    let mask: Vec<bool> = (0..row_count).map(|_| rng.gen_bool(0.9)).collect();

    Self {
        row_filter: Some(mask),
        ..self
    }
}

fn writer_props(&self) -> WriterProperties {
let builder = WriterProperties::builder()
.set_data_pagesize_limit(self.max_data_page_size)
Expand Down Expand Up @@ -1158,7 +1224,7 @@ mod tests {
G: RandGen<T>,
F: Fn(&[Option<T::T>]) -> ArrayRef,
{
let mut all_options = vec![
let all_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15),
Expand Down Expand Up @@ -1187,9 +1253,8 @@ mod tests {
TestOptions::new(2, 256, 91)
.with_null_percent(25)
.with_enabled_statistics(EnabledStatistics::None),
];
// Test skip

let skip_options = vec![
// choose record_batch_batch (15) so batches cross row
// group boundaries (50 rows in 2 row groups) cases.
TestOptions::new(2, 100, 15).with_row_selections(),
Expand Down Expand Up @@ -1218,10 +1283,25 @@ mod tests {
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections(),
// Test filter

// Test with row filter
TestOptions::new(4, 100, 25).with_row_filter(),
// Test with row selection and row filter
TestOptions::new(4, 100, 25)
.with_row_selections()
.with_row_filter(),
// Test with nulls and row filter
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_filter(),
// Test with nulls and row filter
TestOptions::new(2, 256, 93)
.with_null_percent(25)
.with_row_selections()
.with_row_filter(),
];

all_options.extend(skip_options);

all_options.into_iter().for_each(|opts| {
for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0]
{
Expand Down Expand Up @@ -1365,6 +1445,36 @@ mod tests {
}
};

let expected_data = match opts.row_filter {
Some(filter) => {
let expected_data = expected_data
.into_iter()
.zip(filter.iter())
.filter_map(|(d, f)| f.then(|| d))
.collect();

let mut filter_offset = 0;
let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
ProjectionMask::all(),
move |b| {
let array = BooleanArray::from_iter(
filter
.iter()
.skip(filter_offset)
.take(b.num_rows())
.map(|x| Some(*x)),
);
filter_offset += b.num_rows();
Ok(array)
},
))]);

builder = builder.with_row_filter(filter);
expected_data
}
None => expected_data,
};

let mut record_reader = builder
.with_batch_size(opts.record_batch_size)
.build()
Expand Down
7 changes: 2 additions & 5 deletions parquet/src/arrow/async_reader.rs
Expand Up @@ -96,8 +96,8 @@ use arrow::record_batch::RecordBatch;

use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
evaluate_predicate, ArrowReaderBuilder, ParquetRecordBatchReader, RowFilter,
RowSelection,
evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;
use crate::basic::Compression;
Expand Down Expand Up @@ -283,9 +283,6 @@ where
batch_size: usize,
) -> ReadResult<T> {
// TODO: calling build_array multiple times is wasteful
let selects_any = |selection: Option<&RowSelection>| {
selection.map(|x| x.selects_any()).unwrap_or(true)
};

let meta = self.metadata.row_group(row_group_idx);
let mut row_group = InMemoryRowGroup {
Expand Down

0 comments on commit dd4a2df

Please sign in to comment.