Skip to content

Commit

Permalink
API-break: Support SubqueryAlias and remove Alias in Projection (#…
Browse files Browse the repository at this point in the history
…4333)

* subqueryAlias replace alias-projection

* remove alias in projection.

* correct plan in test

* correct tpch plan

* correct plan in UT.

* fix conflict and add doc for `with_alias`
  • Loading branch information
jackwener committed Nov 27, 2022
1 parent da54fa5 commit ad3df7d
Show file tree
Hide file tree
Showing 32 changed files with 607 additions and 591 deletions.
17 changes: 9 additions & 8 deletions benchmarks/expected-plans/q11.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ Sort: value DESC NULLS FIRST
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
SubqueryAlias: __sq_1
Projection: CAST(SUM(partsupp.ps_supplycost * partsupp.ps_availqty) AS Float64) * Float64(0.0001) AS __value
Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
17 changes: 9 additions & 8 deletions benchmarks/expected-plans/q13.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
Sort: custdist DESC NULLS FIRST, c_orders.c_count DESC NULLS FIRST
Projection: c_orders.c_count, COUNT(UInt8(1)) AS custdist
Aggregate: groupBy=[[c_orders.c_count]], aggr=[[COUNT(UInt8(1))]]
Projection: c_orders.COUNT(orders.o_orderkey) AS c_count, alias=c_orders
Projection: COUNT(orders.o_orderkey), alias=c_orders
Projection: COUNT(orders.o_orderkey)
Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]]
Left Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey]
Filter: orders.o_comment NOT LIKE Utf8("%special%requests%")
TableScan: orders projection=[o_orderkey, o_custkey, o_comment]
SubqueryAlias: c_orders
Projection: c_orders.COUNT(orders.o_orderkey) AS c_count
SubqueryAlias: c_orders
Projection: COUNT(orders.o_orderkey)
Aggregate: groupBy=[[customer.c_custkey]], aggr=[[COUNT(orders.o_orderkey)]]
Left Join: customer.c_custkey = orders.o_custkey
TableScan: customer projection=[c_custkey]
Filter: orders.o_comment NOT LIKE Utf8("%special%requests%")
TableScan: orders projection=[o_orderkey, o_custkey, o_comment]
25 changes: 14 additions & 11 deletions benchmarks/expected-plans/q15.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ Sort: supplier.s_suppkey ASC NULLS LAST
Inner Join: revenue0.total_revenue = __sq_1.__value
Inner Join: supplier.s_suppkey = revenue0.supplier_no
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_phone]
Projection: supplier_no, total_revenue, alias=revenue0
Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
Projection: MAX(revenue0.total_revenue) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
Projection: total_revenue, alias=revenue0
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
SubqueryAlias: revenue0
Projection: supplier_no, total_revenue
Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
SubqueryAlias: __sq_1
Projection: MAX(revenue0.total_revenue) AS __value
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
SubqueryAlias: revenue0
Projection: total_revenue
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
EmptyRelation
7 changes: 4 additions & 3 deletions benchmarks/expected-plans/q16.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Sort: supplier_cnt DESC NULLS FIRST, part.p_brand ASC NULLS LAST, part.p_type AS
TableScan: partsupp projection=[ps_partkey, ps_suppkey]
Filter: part.p_brand != Utf8("Brand#45") AND part.p_type NOT LIKE Utf8("MEDIUM POLISHED%") AND part.p_size IN ([Int32(49), Int32(14), Int32(23), Int32(45), Int32(19), Int32(3), Int32(36), Int32(9)])
TableScan: part projection=[p_partkey, p_brand, p_type, p_size]
Projection: supplier.s_suppkey AS s_suppkey, alias=__sq_1
Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
TableScan: supplier projection=[s_suppkey, s_comment]
SubqueryAlias: __sq_1
Projection: supplier.s_suppkey AS s_suppkey
Filter: supplier.s_comment LIKE Utf8("%Customer%Complaints%")
TableScan: supplier projection=[s_suppkey, s_comment]
9 changes: 5 additions & 4 deletions benchmarks/expected-plans/q18.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ Sort: orders.o_totalprice DESC NULLS FIRST, orders.o_orderdate ASC NULLS LAST
TableScan: customer projection=[c_custkey, c_name]
TableScan: orders projection=[o_orderkey, o_custkey, o_totalprice, o_orderdate]
TableScan: lineitem projection=[l_orderkey, l_quantity]
Projection: lineitem.l_orderkey AS l_orderkey, alias=__sq_1
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
TableScan: lineitem projection=[l_orderkey, l_quantity]
SubqueryAlias: __sq_1
Projection: lineitem.l_orderkey AS l_orderkey
Filter: SUM(lineitem.l_quantity) > Decimal128(Some(30000),25,2)
Aggregate: groupBy=[[lineitem.l_orderkey]], aggr=[[SUM(lineitem.l_quantity)]]
TableScan: lineitem projection=[l_orderkey, l_quantity]
21 changes: 11 additions & 10 deletions benchmarks/expected-plans/q2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
Filter: region.r_name = Utf8("EUROPE")
TableScan: region projection=[r_regionkey, r_name]
Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value, alias=__sq_1
Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
Inner Join: nation.n_regionkey = region.r_regionkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
Filter: region.r_name = Utf8("EUROPE")
TableScan: region projection=[r_regionkey, r_name]
SubqueryAlias: __sq_1
Projection: partsupp.ps_partkey, MIN(partsupp.ps_supplycost) AS __value
Aggregate: groupBy=[[partsupp.ps_partkey]], aggr=[[MIN(partsupp.ps_supplycost)]]
Inner Join: nation.n_regionkey = region.r_regionkey
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
Filter: region.r_name = Utf8("EUROPE")
TableScan: region projection=[r_regionkey, r_name]
27 changes: 15 additions & 12 deletions benchmarks/expected-plans/q20.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ Sort: supplier.s_name ASC NULLS LAST
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey]
Filter: nation.n_name = Utf8("CANADA")
TableScan: nation projection=[n_nationkey, n_name]
Projection: partsupp.ps_suppkey AS ps_suppkey, alias=__sq_2
Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
Projection: part.p_partkey AS p_partkey, alias=__sq_1
Filter: part.p_name LIKE Utf8("forest%")
TableScan: part projection=[p_partkey, p_name]
Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value, alias=__sq_3
Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131")
TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate]
SubqueryAlias: __sq_2
Projection: partsupp.ps_suppkey AS ps_suppkey
Filter: CAST(partsupp.ps_availqty AS Float64) > __sq_3.__value
Inner Join: partsupp.ps_partkey = __sq_3.l_partkey, partsupp.ps_suppkey = __sq_3.l_suppkey
LeftSemi Join: partsupp.ps_partkey = __sq_1.p_partkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty]
SubqueryAlias: __sq_1
Projection: part.p_partkey AS p_partkey
Filter: part.p_name LIKE Utf8("forest%")
TableScan: part projection=[p_partkey, p_name]
SubqueryAlias: __sq_3
Projection: lineitem.l_partkey, lineitem.l_suppkey, Float64(0.5) * CAST(SUM(lineitem.l_quantity) AS Float64) AS __value
Aggregate: groupBy=[[lineitem.l_partkey, lineitem.l_suppkey]], aggr=[[SUM(lineitem.l_quantity)]]
Filter: lineitem.l_shipdate >= Date32("8766") AND lineitem.l_shipdate < Date32("9131")
TableScan: lineitem projection=[l_partkey, l_suppkey, l_quantity, l_shipdate]
11 changes: 6 additions & 5 deletions benchmarks/expected-plans/q22.txt
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
Sort: custsale.cntrycode ASC NULLS LAST
Projection: custsale.cntrycode, COUNT(UInt8(1)) AS numcust, SUM(custsale.c_acctbal) AS totacctbal
Aggregate: groupBy=[[custsale.cntrycode]], aggr=[[COUNT(UInt8(1)), SUM(custsale.c_acctbal)]]
Projection: cntrycode, customer.c_acctbal, alias=custsale
SubqueryAlias: custsale
Projection: substr(customer.c_phone, Int64(1), Int64(2)) AS cntrycode, customer.c_acctbal
Filter: CAST(customer.c_acctbal AS Decimal128(19, 6)) > __sq_1.__value
CrossJoin:
LeftAnti Join: customer.c_custkey = orders.o_custkey
Filter: substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
TableScan: customer projection=[c_custkey, c_phone, c_acctbal]
TableScan: orders projection=[o_custkey]
Projection: AVG(customer.c_acctbal) AS __value, alias=__sq_1
Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
TableScan: customer projection=[c_phone, c_acctbal]
SubqueryAlias: __sq_1
Projection: AVG(customer.c_acctbal) AS __value
Aggregate: groupBy=[[]], aggr=[[AVG(customer.c_acctbal)]]
Filter: customer.c_acctbal > Decimal128(Some(0),15,2) AND substr(customer.c_phone, Int64(1), Int64(2)) IN ([Utf8("13"), Utf8("31"), Utf8("23"), Utf8("29"), Utf8("30"), Utf8("18"), Utf8("17")])
TableScan: customer projection=[c_phone, c_acctbal]
4 changes: 2 additions & 2 deletions benchmarks/expected-plans/q7.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST, shipping.l_year ASC NULLS LAST
Projection: shipping.supp_nation, shipping.cust_nation, shipping.l_year, SUM(shipping.volume) AS revenue
Aggregate: groupBy=[[shipping.supp_nation, shipping.cust_nation, shipping.l_year]], aggr=[[SUM(shipping.volume)]]
Projection: supp_nation, cust_nation, l_year, volume, alias=shipping
SubqueryAlias: shipping
Projection: n1.n_name AS supp_nation, n2.n_name AS cust_nation, datepart(Utf8("YEAR"), lineitem.l_shipdate) AS l_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume
Filter: (n1.n_name = Utf8("FRANCE") OR n2.n_name = Utf8("FRANCE")) AND (n2.n_name = Utf8("GERMANY") OR n1.n_name = Utf8("GERMANY"))
Inner Join: customer.c_nationkey = n2.n_nationkey
Expand All @@ -19,4 +19,4 @@ Sort: shipping.supp_nation ASC NULLS LAST, shipping.cust_nation ASC NULLS LAST,
TableScan: nation projection=[n_nationkey, n_name]
Filter: n2.n_name = Utf8("GERMANY") OR n2.n_name = Utf8("FRANCE")
SubqueryAlias: n2
TableScan: nation projection=[n_nationkey, n_name]
TableScan: nation projection=[n_nationkey, n_name]
2 changes: 1 addition & 1 deletion benchmarks/expected-plans/q8.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Sort: all_nations.o_year ASC NULLS LAST
Projection: all_nations.o_year, SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END) / SUM(all_nations.volume) AS mkt_share
Aggregate: groupBy=[[all_nations.o_year]], aggr=[[SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Decimal128(Some(0),38,4) END) AS SUM(CASE WHEN all_nations.nation = Utf8("BRAZIL") THEN all_nations.volume ELSE Int64(0) END), SUM(all_nations.volume)]]
Projection: o_year, volume, nation, alias=all_nations
SubqueryAlias: all_nations
Projection: datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) AS volume, n2.n_name AS nation
Projection: lineitem.l_extendedprice, lineitem.l_discount, orders.o_orderdate, n2.n_name
Inner Join: n1.n_regionkey = region.r_regionkey
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/expected-plans/q9.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Sort: profit.nation ASC NULLS LAST, profit.o_year DESC NULLS FIRST
Projection: profit.nation, profit.o_year, SUM(profit.amount) AS sum_profit
Aggregate: groupBy=[[profit.nation, profit.o_year]], aggr=[[SUM(profit.amount)]]
Projection: nation, o_year, amount, alias=profit
SubqueryAlias: profit
Projection: nation.n_name AS nation, datepart(Utf8("YEAR"), orders.o_orderdate) AS o_year, CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4)) - CAST(partsupp.ps_supplycost * lineitem.l_quantity AS Decimal128(38, 4)) AS amount
Projection: lineitem.l_quantity, lineitem.l_extendedprice, lineitem.l_discount, partsupp.ps_supplycost, orders.o_orderdate, nation.n_name
Inner Join: supplier.s_nationkey = nation.n_nationkey
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1342,10 +1342,10 @@ mod tests {
\n Limit: skip=0, fetch=1\
\n Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
\n Inner Join: t1.c1 = t2.c1\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t1\
\n SubqueryAlias: t1\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3, alias=t2\
\n SubqueryAlias: t2\
\n Projection: aggregate_test_100.c1, aggregate_test_100.c2, aggregate_test_100.c3\
\n TableScan: aggregate_test_100 projection=[c1, c2, c3]",
format!("{:?}", df_renamed.to_logical_plan()?)
Expand Down Expand Up @@ -1388,7 +1388,7 @@ mod tests {
let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = pretty::pretty_format_batches(&plan).unwrap().to_string();
assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));
assert!(formatted.contains("FilterExec: id@0 = 1"));

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));
assert!(formatted.contains("FilterExec: id@0 = 1"));
Ok(())
}

Expand Down Expand Up @@ -474,7 +474,8 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
assert!(formatted.contains("ParquetExec: limit=Some(10)"));
// TODO: limit_push_down support SubqueryAlias
assert!(formatted.contains("GlobalLimitExec: skip=0, fetch=10"));
Ok(())
}

Expand Down

0 comments on commit ad3df7d

Please sign in to comment.