Pushdown RowFilter in ParquetExec (#3380)
* Initial implementation

* Remove debugging cruft

* Add license header

* PR comments

* no need to prep null mask

* PR comments

* unit tests

* Fix comment

* fix doc string

* Update datafusion/core/src/physical_plan/file_format/row_filter.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/expr/src/expr_fn.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/src/physical_plan/file_format/parquet.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* PR comments

* Fix compiler error after rebase

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
thinkharderdev and alamb committed Sep 13, 2022
1 parent af0d50a commit 9fbee1a
Showing 5 changed files with 652 additions and 24 deletions.
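
For reference, a minimal sketch of how a caller could opt in to the new behavior using the `ParquetScanOptions` builder introduced by this commit. It is not taken from the commit itself; the `use` path is an assumption (the diff does not show a public re-export of `ParquetScanOptions`), but the builder methods match the ones added in datafusion/core/src/physical_plan/file_format/parquet.rs below.

// Sketch only: enable row-filter pushdown on an existing ParquetExec.
// The import path is assumed; in this commit the types are defined in
// datafusion/core/src/physical_plan/file_format/parquet.rs.
use datafusion::physical_plan::file_format::{ParquetExec, ParquetScanOptions};

fn enable_filter_pushdown(exec: ParquetExec) -> ParquetExec {
    let scan_options = ParquetScanOptions::default()
        // convert any available pruning predicate into a RowFilter evaluated at the decoder
        .with_pushdown_filters(true)
        // allow the generated RowFilter to reorder predicate exprs to reduce evaluation cost
        .with_reorder_predicates(true);
    exec.with_scan_options(scan_options)
}

The same pair of options is exercised by the new filter-pushdown tests in parquet.rs below via `with_scan_options`.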
1 change: 1 addition & 0 deletions datafusion/core/src/physical_optimizer/pruning.rs
@@ -44,6 +44,7 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};

use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
1 change: 1 addition & 0 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -25,6 +25,7 @@ mod delimited_stream;
mod file_stream;
mod json;
mod parquet;
mod row_filter;

pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
190 changes: 166 additions & 24 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -29,6 +29,7 @@ use crate::datasource::listing::FileRange;
use crate::physical_plan::file_format::file_stream::{
FileOpenFuture, FileOpener, FileStream,
};
use crate::physical_plan::file_format::row_filter::build_row_filter;
use crate::physical_plan::file_format::FileMeta;
use crate::{
error::{DataFusionError, Result},
@@ -67,6 +68,30 @@ use parquet::file::{
};
use parquet::schema::types::ColumnDescriptor;

#[derive(Debug, Clone, Default)]
/// Specify options for the parquet scan
pub struct ParquetScanOptions {
/// If true, any available `pruning_predicate` will be converted to a `RowFilter`
/// and pushed down to the `ParquetRecordBatchStream`. This enables row-level
/// filtering at the decoder level. Defaults to `false`.
pushdown_filters: bool,
/// If true, the generated `RowFilter` may reorder the predicate `Expr`s to try to minimize
/// the cost of filter evaluation. Defaults to `false`.
reorder_predicates: bool,
}

impl ParquetScanOptions {
pub fn with_pushdown_filters(mut self, pushdown_filters: bool) -> Self {
self.pushdown_filters = pushdown_filters;
self
}

pub fn with_reorder_predicates(mut self, reorder_predicates: bool) -> Self {
self.reorder_predicates = reorder_predicates;
self
}
}

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
pub struct ParquetExec {
@@ -81,6 +106,8 @@ pub struct ParquetExec {
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
parquet_file_reader_factory: Option<Arc<dyn ParquetFileReaderFactory>>,
/// Options to specify behavior of parquet scan
scan_options: ParquetScanOptions,
}

impl ParquetExec {
@@ -121,6 +148,7 @@ impl ParquetExec {
pruning_predicate,
metadata_size_hint,
parquet_file_reader_factory: None,
scan_options: ParquetScanOptions::default(),
}
}

@@ -148,6 +176,12 @@ impl ParquetExec {
self.parquet_file_reader_factory = Some(parquet_file_reader_factory);
self
}

/// Configure `ParquetScanOptions`
pub fn with_scan_options(mut self, scan_options: ParquetScanOptions) -> Self {
self.scan_options = scan_options;
self
}
}

/// Stores metrics about the parquet execution for a particular parquet file.
@@ -258,6 +292,7 @@ impl ExecutionPlan for ParquetExec {
metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
parquet_file_reader_factory,
scan_options: self.scan_options.clone(),
};

let stream = FileStream::new(
@@ -319,6 +354,7 @@ struct ParquetOpener {
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
parquet_file_reader_factory: Arc<dyn ParquetFileReaderFactory>,
scan_options: ParquetScanOptions,
}

impl FileOpener for ParquetOpener {
@@ -347,9 +383,12 @@ impl FileOpener for ParquetOpener {
let batch_size = self.batch_size;
let projection = self.projection.clone();
let pruning_predicate = self.pruning_predicate.clone();
let table_schema = self.table_schema.clone();
let reorder_predicates = self.scan_options.reorder_predicates;
let pushdown_filters = self.scan_options.pushdown_filters;

Ok(Box::pin(async move {
let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let mut builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
let adapted_projections =
schema_adapter.map_projections(builder.schema(), &projection)?;

@@ -358,6 +397,21 @@
adapted_projections.iter().cloned(),
);

if let Some(predicate) = pushdown_filters
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
if let Ok(Some(filter)) = build_row_filter(
predicate.clone(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
reorder_predicates,
) {
builder = builder.with_row_filter(filter);
}
};

let groups = builder.metadata().row_groups();
let row_groups =
prune_row_groups(groups, file_range, pruning_predicate, &metrics);
@@ -839,6 +893,7 @@ mod tests {
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
let file_schema = match schema {
Some(schema) => schema,
@@ -851,7 +906,7 @@
let file_groups = meta.into_iter().map(Into::into).collect();

// prepare the scan
let parquet_exec = ParquetExec::new(
let mut parquet_exec = ParquetExec::new(
FileScanConfig {
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![file_groups],
@@ -865,6 +920,14 @@
None,
);

if pushdown_predicate {
parquet_exec = parquet_exec.with_scan_options(
ParquetScanOptions::default()
.with_pushdown_filters(true)
.with_reorder_predicates(true),
);
}

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
collect(Arc::new(parquet_exec), task_ctx).await
@@ -912,9 +975,10 @@ mod tests {
let batch3 = add_to_batch(&batch1, "c3", c3);

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None)
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2, batch3], None, None, None, false)
.await
.unwrap();
let expected = vec![
"+-----+----+----+",
"| c1 | c2 | c3 |",
@@ -953,7 +1017,7 @@
let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1)]);

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None)
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
.await
.unwrap();
let expected = vec![
@@ -987,7 +1051,7 @@
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None)
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, None, false)
.await
.unwrap();
let expected = vec![
@@ -1020,24 +1084,60 @@
// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

let filter = col("c2").eq(lit(0_i64));
let filter = col("c2").eq(lit(2_i64));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
.await
.unwrap();
let expected = vec![
"+-----+----+----+",
"| c1 | c3 | c2 |",
"+-----+----+----+",
"| Foo | 10 | |",
"| | | |",
"| | 10 | 1 |",
"| | 20 | |",
"| | 20 | 2 |",
"| Foo | 10 | |",
"| bar | | |",
"+-----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_intersection_filter_with_filter_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

let c3: ArrayRef = Arc::new(Int8Array::from(vec![Some(10), Some(20), None]));

// batch1: c1(string), c3(int8)
let batch1 = create_batch(vec![("c1", c1), ("c3", c3.clone())]);

// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);

let filter = col("c2").eq(lit(2_i64));

// read/write them files:
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
.await
.unwrap();
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
"+----+----+----+",
"| | 20 | 2 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_projection() {
let c1: ArrayRef =
@@ -1061,10 +1161,15 @@
let batch2 = create_batch(vec![("c3", c3), ("c2", c2), ("c1", c1), ("c4", c4)]);

// read/write them files:
let read =
round_trip_to_parquet(vec![batch1, batch2], Some(vec![0, 3]), None, None)
.await
.unwrap();
let read = round_trip_to_parquet(
vec![batch1, batch2],
Some(vec![0, 3]),
None,
None,
false,
)
.await
.unwrap();
let expected = vec![
"+-----+-----+",
"| c1 | c4 |",
@@ -1102,9 +1207,10 @@ mod tests {
let filter = col("c3").eq(lit(0_i8));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
.await
.unwrap();

// Predicate should prune all row groups
assert_eq!(read.len(), 0);
@@ -1123,12 +1229,13 @@
// batch2: c2(int64)
let batch2 = create_batch(vec![("c2", c2)]);

let filter = col("c2").eq(lit(0_i64));
let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let read = round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter))
.await
.unwrap();
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), false)
.await
.unwrap();

// This does not look correct since the "c2" values in the result do not in fact match the predicate `c2 == 1`
// but parquet pruning is not exact. If the min/max values are not defined (which they are not in this case since it is
@@ -1139,14 +1246,48 @@
"+-----+----+",
"| c1 | c2 |",
"+-----+----+",
"| Foo | |",
"| | |",
"| | |",
"| | 1 |",
"| | 2 |",
"| Foo | |",
"| bar | |",
"+-----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));

// batch1: c1(string)
let batch1 = create_batch(vec![("c1", c1.clone())]);

// batch2: c2(int64)
let batch2 = create_batch(vec![("c2", c2)]);

let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
.await
.unwrap();

let expected = vec![
"+----+----+",
"| c1 | c2 |",
"+----+----+",
"| | 1 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
}

#[tokio::test]
async fn evolved_schema_incompatible_types() {
let c1: ArrayRef =
@@ -1181,6 +1322,7 @@
None,
Some(Arc::new(schema)),
None,
false,
)
.await;
assert_contains!(read.unwrap_err().to_string(),
