
Optimize where exists sub-queries into aggregate and join #2813

Closed
avantgardnerio wants to merge 40 commits

Conversation

avantgardnerio
Contributor

Which issue does this PR close?

Closes #160.

Rationale for this change

In order to evaluate DataFusion as a candidate query engine, users need to be able to run industry standard benchmarks like TPC-H. Query 4 is a good initial candidate, because it is being blocked only by a relatively simple optimization rule to turn exists subqueries into joins.

This PR includes the minimum necessary changes to get Query 4 passing, but I believe this is a generalizable approach that will work for the remaining queries in the TPC-H suite being blocked by subquery-related issues.

I wanted to PR early to start the conversation, but I intend to either submit subsequent PRs generalizing this approach, or extend this PR until we have all the TPC-H subquery cases covered.

What changes are included in this PR?

An optimization rule for decorrelating a narrowly defined set of queries. Queries not explicitly covered remain unaltered.
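To make the intent of the rule concrete, here is a minimal standalone sketch (plain Rust, not DataFusion code; all names are illustrative) of the semantics being exploited: a correlated `WHERE EXISTS` filter over the outer rows is equivalent to joining the outer relation against the de-duplicated correlated keys of the subquery.

```rust
use std::collections::HashSet;

// Naive correlated-exists evaluation: re-scan the inner relation
// for every outer row (O(n * m)).
fn exists_naive(outer: &[(i64, &str)], inner: &[i64]) -> Vec<i64> {
    outer
        .iter()
        .filter(|(key, _)| inner.iter().any(|k| k == key))
        .map(|(key, _)| *key)
        .collect()
}

// Decorrelated form: aggregate (here: de-duplicate) the subquery
// side once, then perform a single hash join against it.
fn exists_as_join(outer: &[(i64, &str)], inner: &[i64]) -> Vec<i64> {
    let keys: HashSet<i64> = inner.iter().copied().collect();
    outer
        .iter()
        .filter(|(key, _)| keys.contains(key))
        .map(|(key, _)| *key)
        .collect()
}

fn main() {
    // Toy stand-ins for TPC-H orders/lineitem keys.
    let orders = [(1, "1-URGENT"), (2, "5-LOW"), (3, "2-HIGH")];
    let lineitem = [1, 1, 3];
    assert_eq!(exists_naive(&orders, &lineitem), exists_as_join(&orders, &lineitem));
    println!("{:?}", exists_as_join(&orders, &lineitem)); // [1, 3]
}
```

The rule in this PR performs the analogous rewrite at the logical-plan level, replacing the exists predicate with an aggregate plus join.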

Are there any user-facing changes?

Any correlated `where exists` subquery that joins on a single column should now work.

@github-actions github-actions bot added core Core datafusion crate optimizer Optimizer rules physical-expr Physical Expressions labels Jun 29, 2022
@@ -37,6 +37,8 @@ use std::{convert::TryFrom, fmt, iter::repeat, sync::Arc};

/// Represents a dynamically typed, nullable single value.
/// This is the single-valued counter-part of arrow’s `Array`.
/// https://arrow.apache.org/docs/python/api/datatypes.html
/// https://github.com/apache/arrow/blob/master/format/Schema.fbs#L354-L375
Contributor Author
Sorry, this was built upon #2797. I'll turn this into a draft until that gets merged.

@@ -483,7 +484,37 @@ fn get_tpch_table_schema(table: &str) -> Schema {
Field::new("n_comment", DataType::Utf8, false),
]),

_ => unimplemented!(),
"supplier" => Schema::new(vec![
Contributor Author

Add missing TPC-H tables to support testing those queries.

register_tpch_csv(&ctx, "orders").await?;
register_tpch_csv(&ctx, "lineitem").await?;

/*
Contributor Author

Annotate plan with variable names from optimizer code for cross-correlation.

// TODO: arbitrary expressions
Expr::Exists { subquery, negated } => {
if *negated {
return Ok(plan.clone());
Contributor Author

In any case of doubt, fall back to skipping optimization, following the "do no harm" rule.


// Only operate if one column is present in the current scope and the other is closed upon from the outer scope
let found: Vec<_> = cols.intersection(&fields).map(|it| (*it).clone()).collect();
let closed_upon: Vec<_> = cols.difference(&fields).map(|it| (*it).clone()).collect();
Contributor Author

We should properly resolve the closed-upon scope here, instead of assuming that a column absent from the present scope must exist elsewhere. Queries will fail either way, but this could make the error messages significantly harder for users to debug.
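For readers following along, the column-splitting logic above can be sketched with plain `std` types (hypothetical names; the real code operates on DataFusion `Column`s and schema fields): the columns referenced by the correlated predicate are partitioned into those resolvable in the subquery's own schema ("found") and those closed upon from the outer query ("closed_upon").

```rust
use std::collections::HashSet;

// Partition predicate columns by whether they resolve in the local schema.
fn split_columns<'a>(
    cols: &HashSet<&'a str>,
    fields: &HashSet<&'a str>,
) -> (Vec<&'a str>, Vec<&'a str>) {
    let mut found: Vec<_> = cols.intersection(fields).copied().collect();
    let mut closed_upon: Vec<_> = cols.difference(fields).copied().collect();
    // Sort for deterministic output; HashSet iteration order is arbitrary.
    found.sort();
    closed_upon.sort();
    (found, closed_upon)
}

fn main() {
    // TPC-H Q4 style: the subquery sees lineitem's schema, but the
    // predicate also references o_orderkey from the outer query.
    let cols: HashSet<_> = ["l_orderkey", "o_orderkey"].into();
    let fields: HashSet<_> = ["l_orderkey", "l_commitdate"].into();
    let (found, closed_upon) = split_columns(&cols, &fields);
    assert_eq!(found, vec!["l_orderkey"]);
    assert_eq!(closed_upon, vec!["o_orderkey"]);
}
```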

@avantgardnerio avantgardnerio marked this pull request as draft June 29, 2022 20:18
Comment on lines 131 to 135
let group_expr = vec![Expr::Column(found.as_str().into())];
let aggr_expr: Vec<Expr> = vec![];
let join_keys = (c_col.clone(), f_col.clone());
let right = LogicalPlanBuilder::from((*filter.input).clone())
.aggregate(group_expr, aggr_expr)?
Member

You could just use distinct rather than creating the aggregate. It is semantically equivalent and results in a simpler logical plan. It will get translated into an aggregate in the physical plan.

Suggested change:

-let group_expr = vec![Expr::Column(found.as_str().into())];
-let aggr_expr: Vec<Expr> = vec![];
 let join_keys = (c_col.clone(), f_col.clone());
 let right = LogicalPlanBuilder::from((*filter.input).clone())
-    .aggregate(group_expr, aggr_expr)?
+    .distinct()?

Member

Actually, I may have misunderstood. I thought this was grouping on all the columns but it looks that is not the case so please disregard this suggestion.

@codecov-commenter

Codecov Report

Merging #2813 (858b284) into master (839a618) will decrease coverage by 0.01%.
The diff coverage is 72.68%.

@@            Coverage Diff             @@
##           master    #2813      +/-   ##
==========================================
- Coverage   85.20%   85.19%   -0.02%     
==========================================
  Files         274      276       +2     
  Lines       48666    48848     +182     
==========================================
+ Hits        41468    41616     +148     
- Misses       7198     7232      +34     
Impacted Files Coverage Δ
datafusion/common/src/scalar.rs 74.94% <ø> (+0.11%) ⬆️
datafusion/core/tests/sql/mod.rs 93.25% <0.00%> (-4.39%) ⬇️
...tafusion/physical-expr/src/expressions/datetime.rs 59.21% <64.40%> (+26.55%) ⬆️
datafusion/optimizer/src/subquery_decorrelate.rs 82.85% <82.85%> (ø)
datafusion/core/tests/sql/subqueries.rs 88.23% <88.23%> (ø)
datafusion/core/src/execution/context.rs 79.02% <100.00%> (+0.02%) ⬆️
datafusion/core/tests/sql/timestamp.rs 100.00% <100.00%> (ø)
datafusion/optimizer/src/simplify_expressions.rs 82.04% <100.00%> (+0.02%) ⬆️
datafusion/core/src/physical_plan/metrics/value.rs 86.93% <0.00%> (-0.51%) ⬇️
... and 3 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

Comment on lines +76 to +80
let filter = if let LogicalPlan::Filter(f) = sub_input {
f
} else {
return Ok(plan.clone());
};
Member

I think it might be more idiomatic to use a match for these patterns.

Suggested change:

-let filter = if let LogicalPlan::Filter(f) = sub_input {
-    f
-} else {
-    return Ok(plan.clone());
-};
+let filter = match sub_input {
+    LogicalPlan::Filter(f) => f,
+    _ => return Ok(plan.clone()),
+};

Comment on lines +87 to +92
let fields: HashSet<_> = sub_input
.schema()
.fields()
.iter()
.map(|f| f.name())
.collect();
Member

You should be able to get a hashset of qualified names like this:

Suggested change:

-let fields: HashSet<_> = sub_input
-    .schema()
-    .fields()
-    .iter()
-    .map(|f| f.name())
-    .collect();
+let fields = HashSet::from_iter(sub_input.schema().field_names());

@andygrove
Member

Thanks @avantgardnerio. This looks good overall and the logic is easy to follow. I will review again when #2797 is merged.

@avantgardnerio
Contributor Author

I double checked with cargo run --release --bin tpch -- benchmark datafusion --iterations 3 --path ./data --format tbl --query 4 --batch-size 4096 and observed that this didn't resolve "the real" query 4, just my stripped-down minimal failing test case.

I made some adjustments, and I was able to run query 4 with the presently committed code:

+-----------------+-------------+
| o_orderpriority | order_count |
+-----------------+-------------+
| 1-URGENT        | 10594       |
| 2-HIGH          | 10476       |
| 3-MEDIUM        | 10410       |
| 4-NOT SPECIFIED | 10556       |
| 5-LOW           | 10487       |
+-----------------+-------------+
Query 4 iteration 2 took 43617.9 ms and returned 5 rows
Query 4 avg time: 45785.94 ms

This is slow, but matches my Postgres results:

+---------------+-----------+
|o_orderpriority|order_count|
+---------------+-----------+
|1-URGENT       |10594      |
|2-HIGH         |10476      |
|3-MEDIUM       |10410      |
|4-NOT SPECIFIED|10556      |
|5-LOW          |10487      |
+---------------+-----------+

@avantgardnerio
Contributor Author

The remaining failing queries seem to fall into two categories:

  1. Ones that fail because we only handle where exists with this PR (not where x < (subquery) expressions)
  2. Ones that fail due to multiple subqueries in the same filter expression (which means we have to run this iteratively or something)
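The "run this iteratively" idea from category 2 can be sketched as a fixpoint loop (a hypothetical standalone sketch, not DataFusion's actual optimizer driver; here the "plan" is just a string and the "rule" rewrites one marker per pass):

```rust
// Apply a rewrite rule repeatedly until it stops changing the plan,
// with a pass cap to guard against rules that never converge.
fn optimize_to_fixpoint<F>(mut plan: String, rule: F, max_passes: usize) -> String
where
    F: Fn(&str) -> String,
{
    for _ in 0..max_passes {
        let next = rule(&plan);
        if next == plan {
            break; // fixpoint reached: no subqueries left to decorrelate
        }
        plan = next;
    }
    plan
}

fn main() {
    // Toy rule: rewrite one `exists` into a `join` per invocation,
    // mimicking a rule that handles a single subquery at a time.
    let rule = |p: &str| p.replacen("exists", "join", 1);
    let plan = "filter(exists and exists)".to_string();
    let optimized = optimize_to_fixpoint(plan, rule, 10);
    assert_eq!(optimized, "filter(join and join)");
}
```

Iterating a single-subquery rule this way would cover filters containing multiple subqueries without the rule itself having to handle them all in one pass.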

@avantgardnerio avantgardnerio mentioned this pull request Jul 1, 2022
@avantgardnerio
Contributor Author

Probably duplicated work with #2421

@avantgardnerio
Contributor Author

Closed in favor of #2885

@avantgardnerio avantgardnerio deleted the bg_tpch_q4 branch July 19, 2022 11:39
Successfully merging this pull request may close these issues.

TPC-H Query 4
3 participants