diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index e96b5d8fae7..052ef40ee84 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -400,22 +400,46 @@ impl<T: ChunkReader + 'static> ArrowReaderBuilder<SyncReader<T>> {
         Self::new_builder(SyncReader(reader), metadata, options)
     }
 
+    /// Build a [`ParquetRecordBatchReader`]
+    ///
+    /// Note: this will eagerly evaluate any `RowFilter` before returning
     pub fn build(self) -> Result<ParquetRecordBatchReader> {
         let reader =
             FileReaderRowGroupCollection::new(Arc::new(self.input.0), self.row_groups);
+
+        let mut filter = self.filter;
+        let mut selection = self.selection;
+
+        if let Some(filter) = filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    break;
+                }
+
+                let projection = predicate.projection().clone();
+                let array_reader =
+                    build_array_reader(Arc::clone(&self.schema), projection, &reader)?;
+
+                selection = Some(evaluate_predicate(
+                    self.batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
         let array_reader = build_array_reader(self.schema, self.projection, &reader)?;
 
-        if self.filter.is_some() {
-            // TODO: Support RowFilter within sync interface (#2431)
-            return Err(nyi_err!(
-                "RowFilter is currently not supported within the sync interface"
-            ));
+        // If selection is empty, truncate
+        if !selects_any(selection.as_ref()) {
+            selection = Some(RowSelection::from(vec![]));
         }
 
         Ok(ParquetRecordBatchReader::new(
             self.batch_size,
             array_reader,
-            self.selection,
+            selection,
         ))
     }
 }
@@ -541,12 +565,16 @@ impl ParquetRecordBatchReader {
     }
 }
 
+/// Returns `true` if `selection` is `None` or selects some rows
+pub(crate) fn selects_any(selection: Option<&RowSelection>) -> bool {
+    selection.map(|x| x.selects_any()).unwrap_or(true)
+}
+
 /// Evaluates an [`ArrowPredicate`] returning the [`RowSelection`]
 ///
 /// If this [`ParquetRecordBatchReader`] has a [`RowSelection`], the
 /// returned [`RowSelection`] will be the conjunction of this and
 /// the rows selected by `predicate`
-#[allow(unused)]
 pub(crate) fn evaluate_predicate(
     batch_size: usize,
     array_reader: Box<dyn ArrayReader>,
@@ -576,6 +604,7 @@ mod tests {
     use bytes::Bytes;
     use std::cmp::min;
     use std::collections::VecDeque;
+    use std::fmt::Formatter;
     use std::fs::File;
     use std::io::Seek;
     use std::path::PathBuf;
@@ -591,8 +620,8 @@ mod tests {
     use arrow::record_batch::{RecordBatch, RecordBatchReader};
 
     use crate::arrow::arrow_reader::{
-        ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
-        RowSelection, RowSelector,
+        ArrowPredicateFn, ArrowReaderOptions, ParquetRecordBatchReader,
+        ParquetRecordBatchReaderBuilder, RowFilter, RowSelection, RowSelector,
     };
     use crate::arrow::buffer::converter::{
         Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter,
@@ -1021,7 +1050,7 @@ mod tests {
     }
 
     /// Parameters for single_column_reader_test
-    #[derive(Debug, Clone)]
+    #[derive(Clone)]
     struct TestOptions {
         /// Number of row group to write to parquet (row group size =
         /// num_row_groups / num_rows)
@@ -1047,8 +1076,30 @@ mod tests {
         enabled_statistics: EnabledStatistics,
         /// Encoding
         encoding: Encoding,
-        //row selections and total selected row count
+        /// row selections and total selected row count
        row_selections: Option<(RowSelection, usize)>,
+        /// row filter
+        row_filter: Option<Vec<bool>>,
+    }
+
+    /// Manually implement this to avoid printing entire contents of row_selections and row_filter
+    impl std::fmt::Debug for TestOptions {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            f.debug_struct("TestOptions")
+                .field("num_row_groups", &self.num_row_groups)
+                .field("num_rows", &self.num_rows)
+                .field("record_batch_size", &self.record_batch_size)
+                .field("null_percent", &self.null_percent)
+                .field("write_batch_size", &self.write_batch_size)
+                .field("max_data_page_size", &self.max_data_page_size)
+                .field("max_dict_page_size", &self.max_dict_page_size)
+                .field("writer_version", &self.writer_version)
+                .field("enabled_statistics", &self.enabled_statistics)
+                .field("encoding", &self.encoding)
+                .field("row_selections", &self.row_selections.is_some())
+                .field("row_filter", &self.row_filter.is_some())
+                .finish()
+        }
     }
 
     impl Default for TestOptions {
@@ -1065,6 +1116,7 @@ mod tests {
                 enabled_statistics: EnabledStatistics::Page,
                 encoding: Encoding::PLAIN,
                 row_selections: None,
+                row_filter: None,
             }
         }
     }
@@ -1108,6 +1160,8 @@ mod tests {
         }
 
         fn with_row_selections(self) -> Self {
+            assert!(self.row_filter.is_none(), "Must set row selection first");
+
             let mut rng = thread_rng();
             let step = rng.gen_range(self.record_batch_size..self.num_rows);
             let row_selections = create_test_selection(
@@ -1121,6 +1175,19 @@ mod tests {
             }
         }
 
+        fn with_row_filter(self) -> Self {
+            let row_count = match &self.row_selections {
+                Some((_, count)) => *count,
+                None => self.num_row_groups * self.num_rows,
+            };
+
+            let mut rng = thread_rng();
+            Self {
+                row_filter: Some((0..row_count).map(|_| rng.gen_bool(0.9)).collect()),
+                ..self
+            }
+        }
+
         fn writer_props(&self) -> WriterProperties {
             let builder = WriterProperties::builder()
                 .set_data_pagesize_limit(self.max_data_page_size)
@@ -1158,7 +1225,7 @@ mod tests {
         G: RandGen<T>,
         F: Fn(&[Option<T::T>]) -> ArrayRef,
     {
-        let mut all_options = vec![
+        let all_options = vec![
            // choose record_batch_batch (15) so batches cross row
            // group boundaries (50 rows in 2 row groups) cases.
            TestOptions::new(2, 100, 15),
@@ -1187,9 +1254,8 @@ mod tests {
             TestOptions::new(2, 256, 91)
                 .with_null_percent(25)
                 .with_enabled_statistics(EnabledStatistics::None),
-        ];
+            // Test skip
 
-        let skip_options = vec![
             // choose record_batch_batch (15) so batches cross row
             // group boundaries (50 rows in 2 row groups) cases.
             TestOptions::new(2, 100, 15).with_row_selections(),
@@ -1218,10 +1284,25 @@ mod tests {
             TestOptions::new(2, 256, 93)
                 .with_null_percent(25)
                 .with_row_selections(),
+            // Test filter
+
+            // Test with row filter
+            TestOptions::new(4, 100, 25).with_row_filter(),
+            // Test with row selection and row filter
+            TestOptions::new(4, 100, 25)
+                .with_row_selections()
+                .with_row_filter(),
+            // Test with nulls and row filter
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_filter(),
+            // Test with nulls and row filter
+            TestOptions::new(2, 256, 93)
+                .with_null_percent(25)
+                .with_row_selections()
+                .with_row_filter(),
         ];
 
-        all_options.extend(skip_options);
-
         all_options.into_iter().for_each(|opts| {
             for writer_version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0]
             {
@@ -1365,6 +1446,36 @@ mod tests {
             }
         };
 
+        let expected_data = match opts.row_filter {
+            Some(filter) => {
+                let expected_data = expected_data
+                    .into_iter()
+                    .zip(filter.iter())
+                    .filter_map(|(d, f)| f.then(|| d))
+                    .collect();
+
+                let mut filter_offset = 0;
+                let filter = RowFilter::new(vec![Box::new(ArrowPredicateFn::new(
+                    ProjectionMask::all(),
+                    move |b| {
+                        let array = BooleanArray::from_iter(
+                            filter
+                                .iter()
+                                .skip(filter_offset)
+                                .take(b.num_rows())
+                                .map(|x| Some(*x)),
+                        );
+                        filter_offset += b.num_rows();
+                        Ok(array)
+                    },
+                ))]);
+
+                builder = builder.with_row_filter(filter);
+                expected_data
+            }
+            None => expected_data,
+        };
+
         let mut record_reader = builder
             .with_batch_size(opts.record_batch_size)
             .build()
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index abe34cf1e88..6c449bef49e 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -96,8 +96,8 @@ use arrow::record_batch::RecordBatch;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, ArrowReaderBuilder, ParquetRecordBatchReader, RowFilter,
-    RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
+    RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
 use crate::basic::Compression;
@@ -283,9 +283,6 @@ where
         batch_size: usize,
     ) -> ReadResult<T> {
         // TODO: calling build_array multiple times is wasteful
-        let selects_any = |selection: Option<&RowSelection>| {
-            selection.map(|x| x.selects_any()).unwrap_or(true)
-        };
 
         let meta = self.metadata.row_group(row_group_idx);
         let mut row_group = InMemoryRowGroup {
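Usage sketch (not part of the patch): after this change the synchronous builder evaluates any `RowFilter` eagerly inside `build()`, so a filtered read no longer needs the async reader. The file name "filtered.parquet" and the non-null predicate below are illustrative assumptions, not code from this diff; only public APIs shown in the patch and its tests (ParquetRecordBatchReaderBuilder, ArrowPredicateFn, RowFilter, ProjectionMask) are used.

use std::error::Error;
use std::fs::File;

use arrow::compute::is_not_null;
use parquet::arrow::arrow_reader::{
    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn Error>> {
    // Assumed local file; any readable parquet file works here.
    let file = File::open("filtered.parquet")?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // The predicate sees batches containing only the columns selected by its
    // ProjectionMask; build() evaluates it eagerly and folds the resulting
    // boolean arrays into a RowSelection that the returned reader applies.
    let predicate = ArrowPredicateFn::new(ProjectionMask::all(), |batch| {
        // Keep rows whose first projected column is non-null (illustrative choice).
        is_not_null(batch.column(0).as_ref())
    });

    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .with_batch_size(1024)
        .build()?;

    for batch in reader {
        println!("read {} filtered rows", batch?.num_rows());
    }
    Ok(())
}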