Update sqlparser to 29.0.0 (#4770)
* SQL planner support for ROLLUP/CUBE/GROUPING SETS AST nodes

* Trigger build

* Update sqlparser to 29.0.0

* Update for api changes

* Update one test

* sqlparser update

* fmt + clippy

* reduce stack overhead

Co-authored-by: Jefffrey <22608443+Jefffrey@users.noreply.github.com>
alamb and Jefffrey committed Jan 1, 2023
1 parent acb8846 commit 9a8190a
Showing 8 changed files with 92 additions and 30 deletions.
4 changes: 2 additions & 2 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
@@ -46,4 +46,4 @@ cranelift-module = { version = "0.89.0", optional = true }
object_store = { version = "0.5.0", default-features = false, optional = true }
parquet = { version = "29.0.0", default-features = false, optional = true }
pyo3 = { version = "0.17.1", optional = true }
-sqlparser = "0.28"
+sqlparser = "0.29"
3 changes: 1 addition & 2 deletions datafusion/core/Cargo.toml
@@ -90,7 +90,7 @@ pyo3 = { version = "0.17.1", optional = true }
rand = "0.8"
rayon = { version = "1.5", optional = true }
smallvec = { version = "1.6", features = ["union"] }
-sqlparser = "0.28"
+sqlparser = "0.29"
tempfile = "3"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "fs", "parking_lot"] }
tokio-stream = "0.1"
@@ -111,7 +111,6 @@ env_logger = "0.10"
parquet-test-utils = { path = "../../parquet-test-utils" }
rstest = "0.16.0"
sqllogictest = "0.10.0"
-sqlparser = "0.28"
test-utils = { path = "../../test-utils" }
thiserror = "1.0.37"

3 changes: 1 addition & 2 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -898,8 +898,7 @@ async fn explain_nested() {
.set_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY, explain_phy_plan_flag);
let ctx = SessionContext::with_config(config);
let sql = "EXPLAIN explain select 1";
-let dataframe = ctx.sql(sql).await.unwrap();
-let err = dataframe.create_physical_plan().await.unwrap_err();
+let err = ctx.sql(sql).await.unwrap_err();
assert!(err.to_string().contains("Explain must be root of the plan"));
}

2 changes: 1 addition & 1 deletion datafusion/expr/Cargo.toml
@@ -39,4 +39,4 @@ ahash = { version = "0.8", default-features = false, features = ["runtime-rng"]
arrow = { version = "29.0.0", default-features = false }
datafusion-common = { path = "../common", version = "15.0.0" }
log = "^0.4"
-sqlparser = "0.28"
+sqlparser = "0.29"
2 changes: 1 addition & 1 deletion datafusion/sql/Cargo.toml
@@ -41,4 +41,4 @@ arrow-schema = "29.0.0"
datafusion-common = { path = "../common", version = "15.0.0" }
datafusion-expr = { path = "../expr", version = "15.0.0" }
log = "^0.4"
-sqlparser = "0.28"
+sqlparser = "0.29"
26 changes: 16 additions & 10 deletions datafusion/sql/src/parser.rs
@@ -25,7 +25,7 @@ use sqlparser::{
},
dialect::{keywords::Keyword, Dialect, GenericDialect},
parser::{Parser, ParserError},
-tokenizer::{Token, Tokenizer},
+tokenizer::{Token, TokenWithLocation, Tokenizer},
};
use std::{collections::HashMap, str::FromStr};
use std::{collections::VecDeque, fmt};
@@ -124,7 +124,7 @@ impl<'a> DFParser<'a> {
let tokens = tokenizer.tokenize()?;

Ok(DFParser {
-parser: Parser::new(tokens, dialect),
+parser: Parser::new(dialect).with_tokens(tokens),
})
}

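sqlparser 0.29 drops the two-argument `Parser::new(tokens, dialect)` constructor in favor of a builder style: construct the parser from a dialect, then attach pre-tokenized input via `with_tokens` (as above) or raw SQL. A minimal standalone sketch of the new style, under the assumption of a plain sqlparser 0.29 dependency; the query string is only an illustration:

    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::{Parser, ParserError};

    fn main() -> Result<(), ParserError> {
        let dialect = GenericDialect {};
        // Build from the dialect, then feed it SQL; try_with_sql
        // tokenizes internally, mirroring the with_tokens path above.
        let statements = Parser::new(&dialect)
            .try_with_sql("SELECT a, b FROM t GROUP BY ROLLUP (a, b)")?
            .parse_statements()?;
        println!("parsed {} statement(s)", statements.len());
        Ok(())
    }
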
@@ -165,13 +165,17 @@ impl<'a> DFParser<'a> {
}

/// Report an unexpected token
-fn expected<T>(&self, expected: &str, found: Token) -> Result<T, ParserError> {
+fn expected<T>(
+&self,
+expected: &str,
+found: TokenWithLocation,
+) -> Result<T, ParserError> {
parser_err!(format!("Expected {expected}, found: {found}"))
}

/// Parse a new expression
pub fn parse_statement(&mut self) -> Result<Statement, ParserError> {
-match self.parser.peek_token() {
+match self.parser.peek_token().token {
Token::Word(w) => {
match w.keyword {
Keyword::CREATE => {
@@ -227,7 +231,7 @@ impl<'a> DFParser<'a> {
}

loop {
-if let Token::Word(_) = self.parser.peek_token() {
+if let Token::Word(_) = self.parser.peek_token().token {
let identifier = self.parser.parse_identifier()?;
partitions.push(identifier.to_string());
} else {
@@ -262,7 +266,7 @@ impl<'a> DFParser<'a> {
loop {
if let Some(constraint) = self.parser.parse_optional_table_constraint()? {
constraints.push(constraint);
-} else if let Token::Word(_) = self.parser.peek_token() {
+} else if let Token::Word(_) = self.parser.peek_token().token {
let column_def = self.parse_column_def()?;
columns.push(column_def);
} else {
@@ -379,19 +383,21 @@ impl<'a> DFParser<'a> {

/// Parses the set of valid formats
fn parse_file_format(&mut self) -> Result<String, ParserError> {
-match self.parser.next_token() {
+let token = self.parser.next_token();
+match &token.token {
Token::Word(w) => parse_file_type(&w.value),
-unexpected => self.expected("one of PARQUET, NDJSON, or CSV", unexpected),
+_ => self.expected("one of PARQUET, NDJSON, or CSV", token),
}
}

/// Parses the set of valid compression types
fn parse_file_compression_type(
&mut self,
) -> Result<CompressionTypeVariant, ParserError> {
-match self.parser.next_token() {
+let token = self.parser.next_token();
+match &token.token {
Token::Word(w) => CompressionTypeVariant::from_str(&w.value),
-unexpected => self.expected("one of GZIP, BZIP2, XZ", unexpected),
+_ => self.expected("one of GZIP, BZIP2, XZ", token),
}
}

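The other 0.29 change visible in this file: `next_token()` and `peek_token()` now return a `TokenWithLocation` rather than a bare `Token`, so call sites bind the wrapper and match on its `.token` field, while `expected()` can display the token together with where it occurred. A hedged sketch of the wrapper, assuming the public `token` and `location` fields of sqlparser 0.29:

    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use sqlparser::tokenizer::Token;

    fn main() {
        let dialect = GenericDialect {};
        let mut parser = Parser::new(&dialect)
            .try_with_sql("CREATE EXTERNAL TABLE t")
            .expect("tokenizable SQL");
        // next_token() yields a TokenWithLocation: the Token itself plus
        // the 1-based line/column where it starts.
        let tok = parser.next_token();
        if let Token::Word(w) = &tok.token {
            println!("{} at {}:{}", w.value, tok.location.line, tok.location.column);
        }
    }
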
80 changes: 69 additions & 11 deletions datafusion/sql/src/planner.rs
@@ -2235,15 +2235,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
normalize_ident(function.name.0[0].clone())
};

-// first, check SQL reserved words
-if name == "rollup" {
-let args = self.function_args_to_expr(function.args, schema)?;
-return Ok(Expr::GroupingSet(GroupingSet::Rollup(args)));
-} else if name == "cube" {
-let args = self.function_args_to_expr(function.args, schema)?;
-return Ok(Expr::GroupingSet(GroupingSet::Cube(args)));
-}

// next, scalar built-in
if let Ok(fun) = BuiltinScalarFunction::from_str(&name) {
let args = self.function_args_to_expr(function.args, schema)?;
@@ -2347,6 +2338,10 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
}

+SQLExpr::Rollup(exprs) => self.sql_rollup_to_expr(exprs, schema, planner_context),
+SQLExpr::Cube(exprs) => self.sql_cube_to_expr(exprs, schema, planner_context),
+SQLExpr::GroupingSets(exprs) => self.sql_grouping_sets_to_expr(exprs, schema, planner_context),

SQLExpr::Floor { expr, field: _field } => {
let fun = BuiltinScalarFunction::Floor;
let args = vec![self.sql_expr_to_logical_expr(*expr, schema, planner_context)?];
@@ -2387,6 +2382,67 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
})
}

+fn sql_rollup_to_expr(
+&self,
+exprs: Vec<Vec<SQLExpr>>,
+schema: &DFSchema,
+planner_context: &mut PlannerContext,
+) -> Result<Expr> {
+let args: Result<Vec<_>> = exprs
+.into_iter()
+.map(|v| {
+if v.len() != 1 {
+Err(DataFusionError::Internal(
+"Tuple expressions are not supported for Rollup expressions"
+.to_string(),
+))
+} else {
+self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
+}
+})
+.collect();
+Ok(Expr::GroupingSet(GroupingSet::Rollup(args?)))
+}

+fn sql_cube_to_expr(
+&self,
+exprs: Vec<Vec<SQLExpr>>,
+schema: &DFSchema,
+planner_context: &mut PlannerContext,
+) -> Result<Expr> {
+let args: Result<Vec<_>> = exprs
+.into_iter()
+.map(|v| {
+if v.len() != 1 {
+Err(DataFusionError::Internal(
+"Tuple expressions are not supported for Cube expressions"
+.to_string(),
+))
+} else {
+self.sql_expr_to_logical_expr(v[0].clone(), schema, planner_context)
+}
+})
+.collect();
+Ok(Expr::GroupingSet(GroupingSet::Cube(args?)))
+}

+fn sql_grouping_sets_to_expr(
+&self,
+exprs: Vec<Vec<SQLExpr>>,
+schema: &DFSchema,
+planner_context: &mut PlannerContext,
+) -> Result<Expr> {
+let args: Result<Vec<Vec<_>>> = exprs
+.into_iter()
+.map(|v| {
+v.into_iter()
+.map(|e| self.sql_expr_to_logical_expr(e, schema, planner_context))
+.collect()
+})
+.collect();
+Ok(Expr::GroupingSet(GroupingSet::GroupingSets(args?)))
+}

fn parse_exists_subquery(
&self,
subquery: Query,
@@ -2634,6 +2690,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Identifier(i) => i.to_string(),
SQLExpr::Value(v) => match v {
Value::SingleQuotedString(s) => s.to_string(),
+Value::DollarQuotedString(s) => s.to_string(),
Value::Number(_, _) | Value::Boolean(_) => v.to_string(),
Value::DoubleQuotedString(_)
| Value::UnQuotedString(_)
@@ -5664,11 +5721,12 @@ mod tests {
quick_test(sql, expected);
}

-#[ignore] // see https://github.com/apache/arrow-datafusion/issues/2469
#[test]
fn aggregate_with_grouping_sets() {
let sql = "SELECT id, state, age, COUNT(*) FROM person GROUP BY id, GROUPING SETS ((state), (state, age), (id, state))";
-let expected = "TBD";
+let expected = "Projection: person.id, person.state, person.age, COUNT(UInt8(1))\
+\n Aggregate: groupBy=[[person.id, GROUPING SETS ((person.state), (person.state, person.age), (person.id, person.state))]], aggr=[[COUNT(UInt8(1))]]\
+\n TableScan: person";
quick_test(sql, expected);
}

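With ROLLUP, CUBE, and GROUPING SETS now arriving at the planner as dedicated AST nodes instead of being intercepted by function name, queries like the test above plan end to end. A hedged usage sketch against the DataFusion API of this era; the CSV path and column names are invented for illustration:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        // Hypothetical input file; any table with `state` and `age` columns works.
        ctx.register_csv("person", "person.csv", CsvReadOptions::new())
            .await?;
        // Planned via SQLExpr::Rollup -> Expr::GroupingSet(GroupingSet::Rollup).
        let df = ctx
            .sql("SELECT state, age, COUNT(*) FROM person GROUP BY ROLLUP (state, age)")
            .await?;
        df.show().await?;
        Ok(())
    }
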
