Add support for correlated subqueries & fix all related TPC-H benchmark issues (#2885)

* Failing test case for TPC-H query 20

* Fix name

* Broken test for adding intervals to dates

* Tests pass

* Fix rebase

* Fix query

* Additional tests

* Reduce to minimum failing (and passing) cases

* Adjust so data _should_ be returned, but see none

* Fixed data, decorrelated test passes

* Check in plans

* Put real assertion in place

* Add test for already working subquery optimizer

* Add decorrelator

* Check in broken test

* Add some passing and failing tests to see scope of problem

* Have almost all inputs needed for optimization, but need to catch 1 level earlier in tree

* Collected all inputs, now we just need to optimize

* Successfully decorrelated query 4

* refactor

* Pass test 4

* Ready for PR?

* Only operate on equality expressions

* Lint error

* Tests still pass because we are losing remaining predicate

* Don't lose remaining expressions

* Update test to expect remaining filter clause

* Debugging

* Can run query 4

* Remove debugging code

* Clippy

* Refactor where exists, add scalar subquery

* Login qty < () and 0.2 times, predicate pushdown is killing our plan

* Query plan looks good

* Fudge data to make test output nicer

* Fix syntax error

* [WIP] where in

* Working recursively, q20 plan looks good, but execution failing

* Fix CSV for execution error, remove silly variables in favor of --nocapture

* Silence verbose logs

* Query 21 test

* [WIP] refactoring, query 4 looking good

* [WIP] 4 & 17 look good

* 22 good?

* Check in "Test" for query 11

* query 11 works

* Don't throw away plans when multiple subqueries in one filter

* Manually decorrelate query 21

* [WIP] add data for query 21, anti join failing for some reason

* Does appear to be problem with anti-join

* Minimum failing test

* Verify anti join fix

* Repeatable tests

* cargo fmt

* Restore some optimizers and update test expectations

* Restore some optimizers and update test expectations

* Restore some optimizers and update test expectations

* Restore some optimizers and update test expectations

* Cleanup

* Cleanup scalar subquery, de-duplicate some code

* Cleanup

* Refactor

* Refactor

* Refactor

* Refactor

* Handle recursive where in

* Update assertions

* Support recursion in where exists queries

* Unit tests on where in

* Add correlated where in test

* Nasty code to make where in work for both correlated and uncorrelated queries

* Cleanup

* Refactoring

* Refactoring

* Add correlated unit test

* Add correlated where exists unit test

* [WIP] Failing scalar subquery unit test

* Refactor

* tuple mixup

* Scalar subquery unit test

* ASF header

* PR feedback

* PR feedback

* PR feedback

* PR feedback

* Fix build again

* Formatting

* Testing

* multiple where in

* Unit tests for where in

* where exists tests

* scalar subquery tests

* add aggregates to scalar subqueries

* Remove tests that only existed to get logical plans as input to unit tests

* Check in assertions for valid tests

* 1/33 passing unit tests :/

* Down to one failing test

* All the unit tests pass

* into methods

* Where exists unit tests passing

* Try from methods

* Fix tests

* Fix tests

* Refactor

* Fix test

* Refactor

* Fix test

* Fix error message

* Fix tests

* Fix tests

* Refactor

* Refactor and fix tests

* Improved recursive subquery test

* Recursive subquery fix

* Update tests

* Update tests

* Update tests

* Doc

* Clippy

* Linter & clippy

* Add doc, move test methods into test modules

* PR cleanup

* Inline test data

* Remove shared test data

* Remove shared test data

* Update tests

* Fix toml

* Update expectation

* PR feedback

* PR feedback

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Fix test to reveal logic error

* Simplify test

* Fix stuff, break other stuff

* I've written Scala in Rust because I'm in a hurry :(

* Clean the API up a little

* PR feedback

* PR feedback

* PR feedback

* PR feedback

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
avantgardnerio and alamb committed Jul 22, 2022
1 parent 117df4d commit 7b0f2f8
Showing 23 changed files with 2,952 additions and 13 deletions.
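
As context for the diff below, here is a minimal sketch (not part of this commit) of the kind of correlated subquery the new rules can now plan: a TPC-H Q4-shaped EXISTS predicate whose inner query references o_orderkey from the outer relation. The CSV paths are hypothetical and the DataFrame calls are only approximate for this release of DataFusion.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical paths; any source with the TPC-H orders/lineitem schemas works.
    ctx.register_csv("orders", "tpch/orders.csv", CsvReadOptions::new()).await?;
    ctx.register_csv("lineitem", "tpch/lineitem.csv", CsvReadOptions::new()).await?;

    // The inner query references o_orderkey from the outer query, so the planner
    // must decorrelate the EXISTS (into a semi join) before execution.
    let df = ctx
        .sql(
            "select o_orderpriority, count(*) as order_count \
             from orders \
             where exists ( \
                 select * from lineitem \
                 where l_orderkey = o_orderkey and l_commitdate < l_receiptdate \
             ) \
             group by o_orderpriority",
        )
        .await?;
    df.show().await?;
    Ok(())
}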
2 changes: 1 addition & 1 deletion benchmarks/queries/q20.sql
@@ -28,7 +28,7 @@ where
l_partkey = ps_partkey
and l_suppkey = ps_suppkey
and l_shipdate >= date '1994-01-01'
and l_shipdate < 'date 1994-01-01' + interval '1' year
and l_shipdate < date '1994-01-01' + interval '1' year
)
)
and s_nationkey = n_nationkey
27 changes: 27 additions & 0 deletions datafusion/common/src/error.rs
@@ -83,6 +83,30 @@ pub enum DataFusionError {
#[cfg(feature = "jit")]
/// Error occurs during code generation
JITError(ModuleError),
/// Error with additional context
Context(String, Box<DataFusionError>),
}

#[macro_export]
macro_rules! context {
($desc:expr, $err:expr) => {
datafusion_common::DataFusionError::Context(
format!("{} at {}:{}", $desc, file!(), line!()),
Box::new($err),
)
};
}

#[macro_export]
macro_rules! plan_err {
($desc:expr) => {
Err(datafusion_common::DataFusionError::Plan(format!(
"{} at {}:{}",
$desc,
file!(),
line!()
)))
};
}

/// Schema-related errors
@@ -285,6 +309,9 @@ impl Display for DataFusionError {
DataFusionError::ObjectStore(ref desc) => {
write!(f, "Object Store error: {}", desc)
}
DataFusionError::Context(ref desc, ref err) => {
write!(f, "{}\ncaused by\n{}", desc, *err)
}
}
}
}
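
A rough usage sketch for the two new macros (the call sites below are made up, not taken from this commit): plan_err! stamps a planning error with file!()/line!(), and context! wraps an existing error in the new Context variant so the Display impl above prints a "caused by" chain.

use datafusion_common::{context, plan_err, Result};

// Hypothetical helper: fails with a location-stamped Plan error.
fn resolve_column(name: &str) -> Result<usize> {
    plan_err!(format!("column {} not found", name))
}

// Hypothetical caller: adds its own context without losing the original error.
fn plan_subquery() -> Result<usize> {
    resolve_column("l_shipdate").map_err(|e| context!("decorrelating subquery", e))
}

fn main() {
    if let Err(e) = plan_subquery() {
        // Prints the outer description, then "caused by", then the inner Plan error.
        println!("{}", e);
    }
}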
2 changes: 2 additions & 0 deletions datafusion/core/Cargo.toml
@@ -94,6 +94,8 @@ uuid = { version = "1.0", features = ["v4"] }

[dev-dependencies]
criterion = "0.3"
csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
fuzz-utils = { path = "fuzz-utils" }
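
The new ctor and csv dev-dependencies support the test changes below. A common use of ctor (assumed here — this snippet is not in the diff) is to initialize env_logger once per test binary, which lines up with the "Silence verbose logs" commit above:

// Hypothetical test-support snippet; not part of this commit's diff.
#[ctor::ctor]
fn init_test_logging() {
    // is_test(true) keeps output captured unless the test runner passes --nocapture.
    let _ = env_logger::builder().is_test(true).try_init();
}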
6 changes: 6 additions & 0 deletions datafusion/core/src/execution/context.rs
@@ -102,6 +102,9 @@ use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datafusion_optimizer::decorrelate_scalar_subquery::DecorrelateScalarSubquery;
use datafusion_optimizer::decorrelate_where_exists::DecorrelateWhereExists;
use datafusion_optimizer::decorrelate_where_in::DecorrelateWhereIn;
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_sql::{
parser::DFParser,
@@ -1356,6 +1359,9 @@ impl SessionState {
// Simplify expressions first to maximize the chance
// of applying other optimizations
Arc::new(SimplifyExpressions::new()),
Arc::new(DecorrelateWhereExists::new()),
Arc::new(DecorrelateWhereIn::new()),
Arc::new(DecorrelateScalarSubquery::new()),
Arc::new(SubqueryFilterToJoin::new()),
Arc::new(EliminateFilter::new()),
Arc::new(CommonSubexprEliminate::new()),
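
The three decorrelation rules are registered ahead of SubqueryFilterToJoin. As a rough illustration of what DecorrelateScalarSubquery targets (sketch only, not from the tests), a TPC-H Q17-shaped correlated aggregate can be EXPLAINed to confirm it plans as a join plus aggregate; the helper below and its assumption that the tables are already registered are hypothetical.

use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical helper: assumes TPC-H `lineitem` and `part` are registered on `ctx`.
async fn explain_q17_style(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql(
            "explain select sum(l_extendedprice) / 7.0 as avg_yearly \
             from lineitem, part \
             where p_partkey = l_partkey \
               and l_quantity < ( \
                   select 0.2 * avg(l_quantity) from lineitem where l_partkey = p_partkey \
               )",
        )
        .await?;
    // The logical plan should show the correlated scalar subquery rewritten away.
    df.show().await?;
    Ok(())
}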
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
@@ -35,7 +35,7 @@ use arrow::datatypes::SchemaRef;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use futures::stream::{Stream, StreamExt};
use log::debug;
use log::trace;

use super::expressions::PhysicalSortExpr;
use super::metrics::{BaselineMetrics, MetricsSet};
@@ -286,7 +286,7 @@ pub fn concat_batches(
)?;
arrays.push(array);
}
debug!(
trace!(
"Combined {} batches containing {} rows",
batches.len(),
row_count
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/file_format/mod.rs
@@ -26,6 +26,8 @@ mod file_stream;
mod json;
mod parquet;

pub(crate) use self::csv::plan_to_csv;
pub use self::csv::CsvExec;
pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::ParquetExec;
use arrow::{
@@ -36,8 +38,6 @@ use arrow::{
record_batch::RecordBatch,
};
pub use avro::AvroExec;
pub(crate) use csv::plan_to_csv;
pub use csv::CsvExec;
pub(crate) use json::plan_to_json;
pub use json::NdJsonExec;

111 changes: 110 additions & 1 deletion datafusion/core/tests/sql/mod.rs
@@ -49,6 +49,7 @@ use datafusion_expr::Volatility;
use object_store::path::Path;
use std::fs::File;
use std::io::Write;
use std::ops::Sub;
use std::path::PathBuf;
use tempfile::TempDir;

@@ -108,6 +109,7 @@ mod explain;
mod idenfifers;
pub mod information_schema;
mod partitioned_csv;
mod subqueries;
#[cfg(feature = "unicode_expressions")]
pub mod unicode;

@@ -483,7 +485,43 @@ fn get_tpch_table_schema(table: &str) -> Schema {
Field::new("n_comment", DataType::Utf8, false),
]),

_ => unimplemented!(),
"supplier" => Schema::new(vec![
Field::new("s_suppkey", DataType::Int64, false),
Field::new("s_name", DataType::Utf8, false),
Field::new("s_address", DataType::Utf8, false),
Field::new("s_nationkey", DataType::Int64, false),
Field::new("s_phone", DataType::Utf8, false),
Field::new("s_acctbal", DataType::Float64, false),
Field::new("s_comment", DataType::Utf8, false),
]),

"partsupp" => Schema::new(vec![
Field::new("ps_partkey", DataType::Int64, false),
Field::new("ps_suppkey", DataType::Int64, false),
Field::new("ps_availqty", DataType::Int32, false),
Field::new("ps_supplycost", DataType::Float64, false),
Field::new("ps_comment", DataType::Utf8, false),
]),

"part" => Schema::new(vec![
Field::new("p_partkey", DataType::Int64, false),
Field::new("p_name", DataType::Utf8, false),
Field::new("p_mfgr", DataType::Utf8, false),
Field::new("p_brand", DataType::Utf8, false),
Field::new("p_type", DataType::Utf8, false),
Field::new("p_size", DataType::Int32, false),
Field::new("p_container", DataType::Utf8, false),
Field::new("p_retailprice", DataType::Float64, false),
Field::new("p_comment", DataType::Utf8, false),
]),

"region" => Schema::new(vec![
Field::new("r_regionkey", DataType::Int64, false),
Field::new("r_name", DataType::Utf8, false),
Field::new("r_comment", DataType::Utf8, false),
]),

_ => unimplemented!("Table: {}", table),
}
}

@@ -499,6 +537,77 @@ async fn register_tpch_csv(ctx: &SessionContext, table: &str) -> Result<()> {
Ok(())
}

async fn register_tpch_csv_data(
ctx: &SessionContext,
table_name: &str,
data: &str,
) -> Result<()> {
let schema = Arc::new(get_tpch_table_schema(table_name));

let mut reader = ::csv::ReaderBuilder::new()
.has_headers(false)
.from_reader(data.as_bytes());
let records: Vec<_> = reader.records().map(|it| it.unwrap()).collect();

let mut cols: Vec<Box<dyn ArrayBuilder>> = vec![];
for field in schema.fields().iter() {
match field.data_type() {
DataType::Utf8 => cols.push(Box::new(StringBuilder::new(records.len()))),
DataType::Date32 => cols.push(Box::new(Date32Builder::new(records.len()))),
DataType::Int32 => cols.push(Box::new(Int32Builder::new(records.len()))),
DataType::Int64 => cols.push(Box::new(Int64Builder::new(records.len()))),
DataType::Float64 => cols.push(Box::new(Float64Builder::new(records.len()))),
_ => {
let msg = format!("Not implemented: {}", field.data_type());
Err(DataFusionError::Plan(msg))?
}
}
}

for record in records.iter() {
for (idx, val) in record.iter().enumerate() {
let col = cols.get_mut(idx).unwrap();
let field = schema.field(idx);
match field.data_type() {
DataType::Utf8 => {
let sb = col.as_any_mut().downcast_mut::<StringBuilder>().unwrap();
sb.append_value(val)?;
}
DataType::Date32 => {
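// Arrow's Date32 stores days since the Unix epoch (1970-01-01), hence the date subtraction below.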
let sb = col.as_any_mut().downcast_mut::<Date32Builder>().unwrap();
let dt = NaiveDate::parse_from_str(val.trim(), "%Y-%m-%d").unwrap();
let dt = dt.sub(NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32;
sb.append_value(dt)?;
}
DataType::Int32 => {
let sb = col.as_any_mut().downcast_mut::<Int32Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
DataType::Int64 => {
let sb = col.as_any_mut().downcast_mut::<Int64Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
DataType::Float64 => {
let sb = col.as_any_mut().downcast_mut::<Float64Builder>().unwrap();
sb.append_value(val.trim().parse().unwrap())?;
}
_ => Err(DataFusionError::Plan(format!(
"Not implemented: {}",
field.data_type()
)))?,
}
}
}
let cols: Vec<ArrayRef> = cols.iter_mut().map(|it| it.finish()).collect();

let batch = RecordBatch::try_new(Arc::clone(&schema), cols)?;

let table = Arc::new(MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?);
let _ = ctx.register_table(table_name, table).unwrap();

Ok(())
}
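
A minimal usage sketch for the new helper (a hypothetical test, not one of the tests added in this commit); it uses the region schema defined above so the inline CSV needs only three columns, and the DataFrame calls are approximate for this release.

#[tokio::test]
async fn registers_inline_region_data() -> Result<()> {
    let ctx = SessionContext::new();
    // Columns match get_tpch_table_schema("region"): r_regionkey, r_name, r_comment.
    let data = "0,AFRICA,nothing unusual\n1,AMERICA,also quiet\n";
    register_tpch_csv_data(&ctx, "region", data).await?;

    let df = ctx.sql("select r_name from region order by r_regionkey").await?;
    let batches = df.collect().await?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
    Ok(())
}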

async fn register_aggregate_csv_by_sql(ctx: &SessionContext) {
let testdata = datafusion::test_util::arrow_test_data();

