diff --git a/datafusion/core/src/datasource/view.rs b/datafusion/core/src/datasource/view.rs index 34a427030ae..357bf8505fc 100644 --- a/datafusion/core/src/datasource/view.rs +++ b/datafusion/core/src/datasource/view.rs @@ -474,8 +474,7 @@ mod tests { let formatted = arrow::util::pretty::pretty_format_batches(&plan) .unwrap() .to_string(); - // TODO: limit_push_down support SubqueryAlias - assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10")); + assert!(formatted.contains("ParquetExec: limit=Some(10)")); Ok(()) } diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs index 28b868fd643..40461647b60 100644 --- a/datafusion/optimizer/src/limit_push_down.rs +++ b/datafusion/optimizer/src/limit_push_down.rs @@ -20,9 +20,7 @@ use crate::{utils, OptimizerConfig, OptimizerRule}; use datafusion_common::Result; use datafusion_expr::{ - logical_plan::{ - Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union, - }, + logical_plan::{Join, JoinType, Limit, LogicalPlan, Sort, TableScan, Union}, CrossJoin, }; use std::sync::Arc; @@ -119,7 +117,8 @@ impl OptimizerRule for LimitPushDown { }; let skip = limit.skip; - let plan = match &*limit.input { + let child_plan = &*limit.input; + let plan = match child_plan { LogicalPlan::TableScan(scan) => { let limit = if fetch != 0 { fetch + skip } else { 0 }; let new_input = LogicalPlan::TableScan(TableScan { @@ -132,21 +131,6 @@ impl OptimizerRule for LimitPushDown { }); plan.with_new_inputs(&[new_input])? } - - LogicalPlan::Projection(projection) => { - let new_input = LogicalPlan::Limit(Limit { - skip, - fetch: Some(fetch), - input: Arc::new((*projection.input).clone()), - }); - // Push down limit directly (projection doesn't change number of rows) - LogicalPlan::Projection(Projection::try_new_with_schema( - projection.expr.clone(), - Arc::new(new_input), - projection.schema.clone(), - )?) - } - LogicalPlan::Union(union) => { let new_inputs = union .inputs @@ -208,6 +192,14 @@ impl OptimizerRule for LimitPushDown { }); plan.with_new_inputs(&[new_sort])? } + LogicalPlan::Projection(_) | LogicalPlan::SubqueryAlias(_) => { + // commute + let new_limit = + plan.with_new_inputs(&[ + (*(child_plan.inputs().get(0).unwrap())).clone() + ])?; + child_plan.with_new_inputs(&[new_limit])? + } _ => plan.clone(), }; @@ -773,4 +765,21 @@ mod test { assert_optimized_plan_eq(&plan, expected) } + + #[test] + fn push_down_subquery_alias() -> Result<()> { + let scan = test_table_scan()?; + + let plan = LogicalPlanBuilder::from(scan) + .alias("a")? + .limit(0, Some(1))? + .limit(1000, None)? + .build()?; + + let expected = "SubqueryAlias: a\ + \n Limit: skip=1000, fetch=0\ + \n TableScan: test, fetch=0"; + + assert_optimized_plan_eq(&plan, expected) + } }