Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change empty projection to not add an extra column #7114

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/tests/sqllogictests/test_files/json.slt
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ EXPLAIN SELECT count(*) from json_test
----
logical_plan
Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
--TableScan: json_test projection=[a]
--TableScan: json_test projection=[]
physical_plan
AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]
--CoalescePartitionsExec
----AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]
------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}, projection=[a]
--------JsonExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/2.json]]}

query error DataFusion error: Schema error: No field named mycol\.
SELECT mycol FROM single_nan
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/tests/sqllogictests/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ logical_plan
Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b
--SubqueryAlias: __scalar_sq_1
----Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
------TableScan: t1 projection=[t1_id]
------TableScan: t1 projection=[]

#simple_uncorrelated_scalar_subquery2
query TT
Expand All @@ -699,10 +699,10 @@ Projection: __scalar_sq_1.COUNT(UInt8(1)) AS b, __scalar_sq_2.COUNT(Int64(1)) AS
--Left Join:
----SubqueryAlias: __scalar_sq_1
------Aggregate: groupBy=[[]], aggr=[[COUNT(UInt8(1))]]
--------TableScan: t1 projection=[t1_id]
--------TableScan: t1 projection=[]
----SubqueryAlias: __scalar_sq_2
------Aggregate: groupBy=[[]], aggr=[[COUNT(Int64(1))]]
--------TableScan: t2 projection=[t2_id]
--------TableScan: t2 projection=[]

query II
select (select count(*) from t1) as b, (select count(1) from t2)
Expand Down
54 changes: 15 additions & 39 deletions datafusion/optimizer/src/push_down_projection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,12 @@ impl OptimizerRule for PushDownProjection {
// like: TableScan: t1 projection=[bool_col, int_col], full_filters=[t1.id = Int32(1)]
// projection=[bool_col, int_col] don't contain `t1.id`.
exprlist_to_columns(&scan.filters, &mut used_columns)?;
if projection_is_empty {
used_columns
.insert(scan.projected_schema.fields()[0].qualified_column());
push_down_scan(&used_columns, scan, true)?
} else {
for expr in projection.expr.iter() {
expr_to_columns(expr, &mut used_columns)?;
}
let new_scan = push_down_scan(&used_columns, scan, true)?;

plan.with_new_inputs(&[new_scan])?
for expr in projection.expr.iter() {
expr_to_columns(expr, &mut used_columns)?;
}
}
LogicalPlan::Values(values) if projection_is_empty => {
let first_col =
Expr::Column(values.schema.fields()[0].qualified_column());
LogicalPlan::Projection(Projection::try_new(
vec![first_col],
Arc::new(child_plan.clone()),
)?)
let new_scan = push_down_scan(&used_columns, scan, true)?;

plan.with_new_inputs(&[new_scan])?
}
LogicalPlan::Union(union) => {
let mut required_columns = HashSet::new();
Expand All @@ -179,8 +165,7 @@ impl OptimizerRule for PushDownProjection {
// Because if push empty down, children may output different columns.
if required_columns.is_empty() {
required_columns.insert(union.schema.fields()[0].qualified_column());
}
// we don't push down projection expr, we just prune columns, so we just push column
} // we don't push down projection expr, we just prune columns, so we just push column
// because push expr may cause more cost.
let projection_column_exprs = get_expr(&required_columns, &union.schema)?;
let mut inputs = Vec::with_capacity(union.inputs.len());
Expand Down Expand Up @@ -495,23 +480,14 @@ fn push_down_scan(
.filter_map(ArrowResult::ok)
.collect();

if projection.is_empty() {
if has_projection && !schema.fields().is_empty() {
// Ensure that we are reading at least one column from the table in case the query
// does not reference any columns directly such as "SELECT COUNT(1) FROM table",
// except when the table is empty (no column)
projection.insert(0);
} else {
// for table scan without projection, we default to return all columns
projection = scan
.source
.schema()
.fields()
.iter()
.enumerate()
.map(|(i, _)| i)
.collect::<BTreeSet<usize>>();
}
if projection.is_empty() && !has_projection {
// for table scan without projection, we default to return all columns
projection = schema
.fields()
.iter()
.enumerate()
.map(|(i, _)| i)
.collect::<BTreeSet<usize>>();
}

// Building new projection from BTreeSet
Expand Down Expand Up @@ -974,7 +950,7 @@ mod tests {

let expected = "\
Projection: Int32(1) AS a\
\n TableScan: test projection=[a]";
\n TableScan: test projection=[]";

assert_optimized_plan_eq(&plan, expected)
}
Expand Down