Relax join keys constraint from Column to any physical expression for physical join operators #8991

Merged · 14 commits · Jan 29, 2024

Conversation

@viirya (Member) commented Jan 25, 2024

Which issue does this PR close?

Closes #.

Rationale for this change

Currently the join keys of join operators like SortMergeJoin are restricted to Column. But it is common to use expressions (e.g., l_col + 1 = r_col + 2) rather than plain columns as join keys. Judging from the query plan, DataFusion adds an additional Projection under the join operator which projects the key expressions into columns, so the join operators above only ever see columns as join keys.

However, in other query engines, e.g., Spark, the query plan doesn't have the additional projection; the join operators directly take general expressions as join keys. (Note that adding an additional projection before the join in Spark means more data to be shuffled/sorted, which can be bad for performance.)

That means we cannot delegate such join operators to DataFusion physical join operators, which require join keys to be columns.

This patch relaxes the join key constraint of the physical join operators, so we can construct a DataFusion physical join operator using general expressions as join keys.

This patch doesn't change how DataFusion plans the join operators; i.e., DataFusion still plans a join operation with non-column join keys as a projection + join operator on columns. (We can probably remove this additional projection later if it also adds cost in DataFusion. Currently I'm not sure if/how DataFusion plans partitioning for the join operators.)
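
For illustration, a minimal sketch (not code from this PR; the schemas left_schema/right_schema and the error handling are assumed) of the kind of join key pair the relaxed constraint allows us to build:

use std::sync::Arc;
use datafusion_expr::Operator;
use datafusion_physical_expr::{
    expressions::{col, lit, BinaryExpr},
    PhysicalExprRef,
};

// Build `l_col + 1` and `r_col + 2` as physical expressions and pair them
// up as a join key, something the previous (Column, Column) type forbade.
let left_key: PhysicalExprRef = Arc::new(BinaryExpr::new(
    col("l_col", &left_schema).unwrap(),
    Operator::Plus,
    lit(1),
));
let right_key: PhysicalExprRef = Arc::new(BinaryExpr::new(
    col("r_col", &right_schema).unwrap(),
    Operator::Plus,
    lit(2),
));
let on: Vec<(PhysicalExprRef, PhysicalExprRef)> = vec![(left_key, right_key)];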

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@github-actions bot added the physical-expr (Physical Expressions) and core (Core datafusion crate) labels on Jan 25, 2024
@viirya changed the title from "Relax join keys constraint from Column to any physical expression" to "Relax join keys constraint from Column to any physical expression for physical join operators" on Jan 25, 2024
@alamb (Contributor) left a comment:

Thank you @viirya -- I went through this PR carefully and I agree it makes sense. In many ways I think it is cleaner now, as the joins no longer handle Columns specially and instead just use PhysicalExprs.

Given this is step one of likely several steps needed to support using DataFusion for Spark joins, I wonder if we should create an EPIC-style ticket that lists known steps (e.g. "investigate removing projection before join inputs").

@jackwener and @liukun4515 and @mustafasrepo perhaps you have some time to review this PR as well.

Also, @korowa is another of our resident join experts; perhaps he has some time to weigh in.

BTW, for context, I believe this feature is in support of the native Spark execution engine apache/datafusion-comet#1, which is in the process of being donated to Apache.

I am running some basic benchmarks now to confirm this change doesn't impact performance (I don't expect that it does)

@@ -278,7 +277,7 @@ pub struct HashJoinExec {
/// right (probe) side which are filtered by the hash table
pub right: Arc<dyn ExecutionPlan>,
/// Set of equijoin columns from the relations: `(left_col, right_col)`
-    pub on: Vec<(Column, Column)>,
+    pub on: Vec<(PhysicalExprRef, PhysicalExprRef)>,
A contributor commented:

👍

})
.unzip();
let (left_expr, right_expr) =
    on.iter().map(|(l, r)| (l.clone(), r.clone())).unzip();
A contributor commented:

In many ways this change makes the code cleaner, as it doesn't have to special-case Column so much.

@alamb (Contributor) left a comment:

Here are my benchmark results (no change)

--------------------
Benchmark tpch_mem.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃ main_base ┃ relex_sort_merge_join_keys ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ QQuery 1     │  206.84ms │                   213.29ms │    no change │
│ QQuery 2     │   45.97ms │                    45.34ms │    no change │
│ QQuery 3     │   76.92ms │                    74.94ms │    no change │
│ QQuery 4     │   73.96ms │                    71.63ms │    no change │
│ QQuery 5     │  122.80ms │                   121.84ms │    no change │
│ QQuery 6     │   16.43ms │                    16.32ms │    no change │
│ QQuery 7     │  322.01ms │                   315.56ms │    no change │
│ QQuery 8     │   80.29ms │                    80.99ms │    no change │
│ QQuery 9     │  125.44ms │                   125.68ms │    no change │
│ QQuery 10    │  154.89ms │                   152.95ms │    no change │
│ QQuery 11    │   35.15ms │                    34.23ms │    no change │
│ QQuery 12    │   70.34ms │                    69.77ms │    no change │
│ QQuery 13    │   87.38ms │                    85.58ms │    no change │
│ QQuery 14    │   27.04ms │                    26.31ms │    no change │
│ QQuery 15    │   61.48ms │                    60.55ms │    no change │
│ QQuery 16    │   45.86ms │                    45.40ms │    no change │
│ QQuery 17    │  149.32ms │                   145.65ms │    no change │
│ QQuery 18    │  448.61ms │                   456.14ms │    no change │
│ QQuery 19    │   61.49ms │                    64.97ms │ 1.06x slower │
│ QQuery 20    │  115.97ms │                   115.43ms │    no change │
│ QQuery 21    │  355.94ms │                   360.97ms │    no change │
│ QQuery 22    │   29.36ms │                    29.63ms │    no change │
└──────────────┴───────────┴────────────────────────────┴──────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ Benchmark Summary                         ┃           ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ Total Time (main_base)                    │ 2713.49ms │
│ Total Time (relex_sort_merge_join_keys)   │ 2713.16ms │
│ Average Time (main_base)                  │  123.34ms │
│ Average Time (relex_sort_merge_join_keys) │  123.33ms │
│ Queries Faster                            │         0 │
│ Queries Slower                            │         1 │
│ Queries with No Change                    │        21 │
└───────────────────────────────────────────┴───────────┘

@viirya (Member, Author) commented Jan 26, 2024

Thank you @alamb

Note that "investigate removing projection before join input" is not a necessary step for join support in Comet, because Comet doesn't depend on DataFusion's query planning. But definitely we have other steps to work on DataFusion Join.

BTW, for context, I believe this feature is in support of the native Spark execution engine apache/datafusion-comet#1, which is in the process of being donated to Apache.

Yes. As mentioned in the PR description, Spark join operators take general expressions as join keys instead of only columns. We need to relax the physical operators' join key constraints to make more query cases possible.

@alamb (Contributor) commented Jan 27, 2024

Note that "investigate removing projection before join input" is not a necessary step for join support in Comet, because Comet doesn't depend on DataFusion's query planning. But definitely we have other steps to work on DataFusion Join.

That makes sense. I think having DataFusion-planned joins and Spark-planned joins go through different codepaths not only potentially duplicates code, it makes it more likely there are discrepancies that may lead to bugs (e.g. it may be hard to test the Spark path as part of the DataFusion tests).

For that reason I suggest we make a ticket describing the overall project, where we can collect other follow-on steps as we come upon them (not that you or the Comet folks will necessarily do all the work; I suspect other contributors are likely to be able to help too).

@viirya (Member, Author) commented Jan 28, 2024

Regarding the creation of an EPIC-style ticket, I am hesitant about it because I don't yet know all the steps to do. I may know the next step of work, as I saw it while experimenting with the Spark/DataFusion integration in Comet on SortMergeJoin. But this is basically a new attempt and I am not sure what issues could be encountered after that. Currently I'm working in an incremental style, solving each issue as it appears during the integration.

@alamb (Contributor) commented Jan 28, 2024

Regarding the creation of an EPIC-style ticket, I am hesitant about it because I don't yet know all the steps to do

This makes sense, and in fact just figuring out all the steps to do is typically a large undertaking itself.

I have had reasonable luck with partially formed Epics that we incrementally and collaboratively fill out (e.g. #8916 and #3148).

If you don't object, I will file such a ticket and we can see how useful / not useful it is.

@alamb (Contributor) commented Jan 28, 2024

BTW, the reason I am pushing on the epic ticket is twofold:

  1. I want to support Comet, as I think it will drive DataFusion significantly forward, and
  2. I want to be better about making things that are important to committers more visible, as I think there is demand across the larger community to help when there are well-understood tasks.

@viirya (Member, Author) commented Jan 28, 2024

This makes sense, and in fact just figuring out all the steps to do is typically a large undertaking itself.

I have had reasonable luck with partially formed Epics that we incrementally and collaboratively fill out (e.g. #8916 and #3148).

If you don't object, I will file such a ticket and we can see how useful / not useful it is.

No, I don't object to it at all; I just feel I don't have too many steps to fill in there. 😄 Hope it can be useful. 👍

@metesynnada (Contributor) commented:

I will review this as soon as possible, particularly the effects on SHJ.

Comment on lines 416 to 425
let mut columns = vec![];
left.apply(&mut |expr| {
    Ok({
        if let Some(column) = expr.as_any().downcast_ref::<Column>() {
            columns.push(column.clone());
        }
        VisitRecursion::Continue
    })
})
.unwrap();
A contributor commented:

For this section, I think you can use the collect_columns util.
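
For reference, a hedged sketch of that suggestion (assuming the util lives at datafusion_physical_expr::utils::collect_columns and returns the set of Columns referenced by an expression; note the returned HashSet is unordered):

use datafusion_physical_expr::utils::collect_columns;

// One call replaces the manual apply/VisitRecursion traversal above;
// `left` is the left join key expression from the surrounding code.
let columns: Vec<Column> = collect_columns(&left).into_iter().collect();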

Comment on lines 436 to 447
let mut columns = vec![];
right
    .apply(&mut |expr| {
        Ok({
            if let Some(column) = expr.as_any().downcast_ref::<Column>() {
                columns.push(column.clone());
            }
            VisitRecursion::Continue
        })
    })
    .unwrap();
columns
A contributor commented:

Similar to the comment above.

@@ -1886,8 +1887,8 @@ pub(crate) mod tests {

// Join on (a == b1)
let join_on = vec![(
Column::new_with_schema("a", &schema()).unwrap(),
Column::new_with_schema("b1", &right.schema()).unwrap(),
Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _,
A contributor commented:

I think we can use the col function (which is in the file datafusion/physical_expr/src/expressions/column.rs) here,
as col("a", &schema()).unwrap() instead of Arc::new(Column::new_with_schema("a", &schema()).unwrap()) as _. However, there are lots of changes like this, and it is purely stylistic. This change is not important.

@mustafasrepo (Contributor) left a comment:

Thanks @viirya for this PR. I left some minor comments; however, they are not essential. LGTM!

@alamb (Contributor) commented Jan 29, 2024

I merged this PR up from main to pick up the clippy fix

@alamb alamb merged commit d594e62 into apache:main Jan 29, 2024
22 checks passed
@alamb (Contributor) commented Jan 29, 2024

Thanks again @viirya

@viirya (Member, Author) commented Jan 29, 2024

Thank you @alamb @mustafasrepo

@alamb (Contributor) commented Jan 29, 2024

I wrote up #9056 (comment)

While doing so, I wonder if Comet could use a custom PhysicalOptimizer pass to add a ProjectionExec 🤔

@viirya (Member, Author) commented Jan 29, 2024

While doing so, I wonder if Comet could use a custom PhysicalOptimizer pass to add a ProjectionExec 🤔

As I mentioned in the description, adding an additional Projection under a Join (e.g., SortMergeJoin) doesn't make a lot of sense for Spark due to its distributed nature. An additional Projection means we materialize extra columns (the join key expressions) earlier, which could lead to additional data in the shuffle/sort.

So what this PR does is relax the join key constraints in DataFusion; for example, the physical SortMergeJoin can now take expressions other than Column as join keys. On the Spark/Comet side, we can now translate a Spark SortMergeJoin directly to a DataFusion SortMergeJoin, without changing the Spark physical query plan structure.

@alamb (Contributor) commented Jan 29, 2024

As I mentioned in the description, adding an additional Projection under a Join (e.g., SortMergeJoin) doesn't make a lot of sense for Spark due to its distributed nature.

🤔

(Note that adding an additional projection before the join in Spark means more data to be shuffled/sorted, which can be bad for performance.)

I don't understand this

If the join is on lcol_1 + lcol_2 = rcol_1 + rcol_2, the plan will have to materialize four columns (lcol_1, lcol_2, rcol_1, and rcol_2) and feed all four of those columns to the join.

Wouldn't it actually make more sense to compute the expressions prior to the networked shuffle, so only two columns of data (lcol_1 + lcol_2 and rcol_1 + rcol_2) need to be sent rather than the four original columns? 🤔

@viirya (Member, Author) commented Jan 29, 2024

Wouldn't it actually make more sense to compute the expressions prior to the networked shuffle, so only two columns of data (lcol_1 + lcol_2 and rcol_1 + rcol_2) need to be sent rather than the four original columns? 🤔

Hmm, besides the join keys, you can still list other columns (e.g., the original four columns) in the selection list? So they can't always be removed from the shuffle, I think?

For example,

EXPLAIN SELECT test1.a, test1.b, test2.a, test2.b FROM test1 JOIN test2 ON test1.a + test1.b = test2.a + test2.b;
----
logical_plan
Inner Join: test1.a + test1.b = test2.a + test2.b
--TableScan: test1 projection=[a, b]
--TableScan: test2 projection=[a, b]
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, a@3 as a, b@4 as b]
--CoalesceBatchesExec: target_batch_size=8192
----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(test1.a + test1.b@2, test2.a + test2.b@2)]
------ProjectionExec: expr=[a@0 as a, b@1 as b, a@0 + b@1 as test1.a + test1.b]
--------MemoryExec: partitions=1, partition_sizes=[1]
------ProjectionExec: expr=[a@0 as a, b@1 as b, a@0 + b@1 as test2.a + test2.b]
--------MemoryExec: partitions=1, partition_sizes=[1]

@alamb (Contributor) commented Jan 29, 2024

Hmm, besides the join keys, you can still list other columns (e.g., the original four columns) in the selection list? So they can't always be removed from the shuffle, I think?

Sure, if the columns are used elsewhere in the plan, they can't be removed. I was thinking of the case when they aren't used anywhere else

However, I am not sure how often that happens in the real world

Maybe a range join 🤔

SELECT ...
FROM stocks JOIN splits ON (
  stocks.symbol = splits.symbol AND 
  stocks.ts < splits.max_time AND stocks.ts > splits.min_time
)

But I suppose the expressions aren't equijoin predicates anyway 🤔

@viirya (Member, Author) commented Jan 29, 2024

So in Spark the physical query plan doesn't have an additional Projection under the Join operator for the join key expressions, and in general we would like to keep it as-is, without changing the query plan structure, to avoid introducing unknown issues or performance regressions. As this PR relaxes the join key constraints for DataFusion join operators, we don't need to add such a Projection when translating the Spark query plan in Comet.

@alamb (Contributor) commented Jan 29, 2024

As this PR relaxes the join key constraints for DataFusion join operators, we don't need to add such a Projection when translating the Spark query plan in Comet.

Yeah, but presumably you will have to update the Join operators in DataFusion to take Expressions (rather than Columns) and evaluate them on the inputs

@viirya (Member, Author) commented Jan 29, 2024

Yeah, but presumably you will have to update the Join operators in DataFusion to take Expressions (rather than Columns) and evaluate them on the inputs

I think this is already done by this PR. The join operators in DataFusion already evaluate the join key expressions (previously these were Columns). They are not simply taking column values from the input arrays; they invoke the physical expression evaluate API. So once we relax the join key types to general expressions, it does what we need, i.e., it evaluates the provided join key expressions. That saves me some work updating the operators as you suggested.
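
To make that concrete, a hedged sketch of the evaluation path being described (key_expr and batch are assumed to be in scope; exact signatures, e.g. whether into_array returns a Result, vary between DataFusion versions):

use arrow::array::ArrayRef;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExprRef;

// The same call chain works whether `key_expr` is a bare Column or a
// computed expression: evaluate against the batch, then materialize the
// result as a single array of join key values.
fn join_key_values(key_expr: &PhysicalExprRef, batch: &RecordBatch) -> Result<ArrayRef> {
    key_expr.evaluate(batch)?.into_array(batch.num_rows())
}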

@jackwener (Member) commented Jan 30, 2024

I reviewed this PR and read the comments above; using an expr instead of a column in the condition is OK for me.

Some databases materialize expressions into columns in projections mainly to facilitate calculating data distributions. So it should be noted that we need to make sure we can correctly calculate the data distribution of expressions in the join condition.
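
On that note, a hedged sketch (the import path and the left_key expression from the earlier example are assumptions): DataFusion's physical Partitioning already carries arbitrary physical expressions, so a hash distribution over a computed join key is at least representable; what remains is verifying the optimizer reasons about such distributions correctly.

use datafusion_physical_expr::Partitioning;

// Hash-partition into 8 partitions on the computed key expression
// rather than on a bare column.
let partitioning = Partitioning::Hash(vec![left_key.clone()], 8);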

@viirya mentioned this pull request on Feb 12, 2024