Skip to content

Commit

Permalink
Make Like a top-level Expr
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Sep 6, 2022
1 parent 191d8b7 commit 64e8559
Show file tree
Hide file tree
Showing 16 changed files with 360 additions and 95 deletions.
3 changes: 3 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1439,6 +1439,8 @@ impl SessionState {
rules.push(Arc::new(TypeCoercion::new()));
rules.push(Arc::new(LimitPushDown::new()));
rules.push(Arc::new(SingleDistinctToGroupBy::new()));
//TODO add a config so we can turn this off since it is new
rules.push(Arc::new(TypeCoercion::default()));

let mut physical_optimizers: Vec<Arc<dyn PhysicalOptimizerRule + Sync + Send>> = vec![
Arc::new(AggregateStatistics::new()),
Expand Down Expand Up @@ -1589,6 +1591,7 @@ impl SessionState {
) -> Result<Arc<dyn ExecutionPlan>> {
let planner = self.query_planner.clone();
let logical_plan = self.optimize(logical_plan)?;
println!("optimized plan [2]: {:?}", logical_plan);
planner.create_physical_plan(&logical_plan, self).await
}
}
Expand Down
16 changes: 15 additions & 1 deletion datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1682,6 +1682,8 @@ mod tests {
use datafusion_expr::expr::GroupingSet;
use datafusion_expr::sum;
use datafusion_expr::{col, lit};
use datafusion_optimizer::type_coercion::TypeCoercion;
use datafusion_optimizer::{OptimizerConfig, OptimizerRule};
use fmt::Debug;
use std::collections::HashMap;
use std::convert::TryFrom;
Expand Down Expand Up @@ -1859,7 +1861,19 @@ mod tests {
col("c1").like(col("c2")),
];
for case in cases {
let logical_plan = test_csv_scan().await?.project(vec![case.clone()]);
let logical_plan = test_csv_scan()
.await?
.project(vec![case.clone()])
.and_then(|b| b.build())
.and_then(|plan| {
// this test was expecting type coercion/validation errors before the optimizer
// had run due to the legacy approach of type coercion but now we need to run the
// optimizer here
let type_coercion = TypeCoercion::default();
let mut config = OptimizerConfig::new();
type_coercion.optimize(&plan, &mut config)
});

let message = format!(
"Expression {:?} expected to error due to impossible coercion",
case
Expand Down
27 changes: 15 additions & 12 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,29 +751,32 @@ async fn try_execute_to_batches(
/// Execute query and return results as a Vec of RecordBatches
async fn execute_to_batches(ctx: &SessionContext, sql: &str) -> Vec<RecordBatch> {
let msg = format!("Creating logical plan for '{}'", sql);
let plan = ctx
let logical_plan = ctx
.create_logical_plan(sql)
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();
let logical_schema = plan.schema();
let logical_schema = logical_plan.schema();

let msg = format!("Optimizing logical plan for '{}': {:?}", sql, plan);
let plan = ctx
.optimize(&plan)
let msg = format!("Optimizing logical plan for '{}': {:?}", sql, logical_plan);
let optimized_logical_plan = ctx
.optimize(&logical_plan)
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();
let optimized_logical_schema = plan.schema();

let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
let plan = ctx
.create_physical_plan(&plan)
println!("optimized plan [1]: {:?}", optimized_logical_plan);
let optimized_logical_schema = optimized_logical_plan.schema();

// creating a physical plan will call `optimize` again so we pass in the
// unoptimized logical plan here
let msg = format!("Creating physical plan for '{}': {:?}", sql, logical_plan);
let physical_plan = ctx
.create_physical_plan(&logical_plan)
.await
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();

let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
let msg = format!("Executing physical plan for '{}': {:?}", sql, physical_plan);
let task_ctx = ctx.task_ctx();
let results = collect(plan, task_ctx)
let results = collect(physical_plan, task_ctx)
.await
.map_err(|e| format!("{:?} at {}", e, msg))
.unwrap();
Expand Down
6 changes: 1 addition & 5 deletions datafusion/expr/src/binary_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ pub fn binary_operator_data_type(
| Operator::NotEq
| Operator::And
| Operator::Or
| Operator::Like
| Operator::NotLike
| Operator::Lt
| Operator::Gt
| Operator::GtEq
Expand Down Expand Up @@ -95,8 +93,6 @@ pub fn coerce_types(
| Operator::Gt
| Operator::GtEq
| Operator::LtEq => comparison_coercion(lhs_type, rhs_type),
// "like" operators operate on strings and always return a boolean
Operator::Like | Operator::NotLike => like_coercion(lhs_type, rhs_type),
// date +/- interval returns date
Operator::Plus | Operator::Minus
if (*lhs_type == DataType::Date32
Expand Down Expand Up @@ -504,7 +500,7 @@ fn string_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType>

/// coercion rules for like operations.
/// This is a union of string coercion rules and dictionary coercion rules
fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> {
string_coercion(lhs_type, rhs_type)
.or_else(|| dictionary_coercion(lhs_type, rhs_type, false))
.or_else(|| null_coercion(lhs_type, rhs_type))
Expand Down
15 changes: 13 additions & 2 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,12 +444,22 @@ impl Expr {

/// Return `self LIKE other`
pub fn like(self, other: Expr) -> Expr {
binary_expr(self, Operator::Like, other)
Expr::Like {
negated: false,
expr: Box::new(self),
pattern: Box::new(other),
escape_char: None,
}
}

/// Return `self NOT LIKE other`
pub fn not_like(self, other: Expr) -> Expr {
binary_expr(self, Operator::NotLike, other)
Expr::Like {
negated: true,
expr: Box::new(self),
pattern: Box::new(other),
escape_char: None,
}
}

/// Return `self AS name` alias expression
Expand Down Expand Up @@ -505,6 +515,7 @@ impl Not for Expr {
type Output = Self;

fn not(self) -> Self::Output {
// TODO file issue for extending this to other similar expressions
match self {
Expr::Like {
negated,
Expand Down
24 changes: 21 additions & 3 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ use crate::expr::GroupingSet;
use crate::{
aggregate_function, built_in_function, conditional_expressions::CaseBuilder, lit,
logical_plan::Subquery, AccumulatorFunctionImplementation, AggregateUDF,
BuiltinScalarFunction, Expr, LogicalPlan, Operator, ReturnTypeFunction,
ScalarFunctionImplementation, ScalarUDF, Signature, StateTypeFunction, Volatility,
BuiltinScalarFunction, Expr, ExprSchemable, LogicalPlan, Operator,
ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
StateTypeFunction, Volatility,
};
use arrow::datatypes::DataType;
use datafusion_common::Result;
use datafusion_common::{DFSchema, Result};
use std::sync::Arc;

/// Create a column expression based on a qualified or unqualified column name
Expand Down Expand Up @@ -259,6 +260,23 @@ pub fn cast(expr: Expr, data_type: DataType) -> Expr {
}
}

/// Create a cast expression
pub fn cast_if_needed(
expr: Expr,
data_type: &DataType,
input_schema: &DFSchema,
) -> Result<Expr> {
let t = expr.get_type(input_schema)?;
if &t == data_type {
Ok(expr)
} else {
Ok(Expr::Cast {
expr: Box::new(expr),
data_type: data_type.clone(),
})
}
}

/// Create an convenience function representing a unary scalar function
macro_rules! unary_scalar_expr {
($ENUM:ident, $FUNC:ident, $DOC:expr) => {
Expand Down
8 changes: 0 additions & 8 deletions datafusion/expr/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ pub enum Operator {
And,
/// Logical OR, like `||`
Or,
/// Matches a wildcard pattern
Like,
/// Does not match a wildcard pattern
NotLike,
/// IS DISTINCT FROM
IsDistinctFrom,
/// IS NOT DISTINCT FROM
Expand Down Expand Up @@ -90,8 +86,6 @@ impl Operator {
Operator::LtEq => Some(Operator::Gt),
Operator::Gt => Some(Operator::LtEq),
Operator::GtEq => Some(Operator::Lt),
Operator::Like => Some(Operator::NotLike),
Operator::NotLike => Some(Operator::Like),
Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom),
Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom),
Operator::Plus
Expand Down Expand Up @@ -130,8 +124,6 @@ impl fmt::Display for Operator {
Operator::Modulo => "%",
Operator::And => "AND",
Operator::Or => "OR",
Operator::Like => "LIKE",
Operator::NotLike => "NOT LIKE",
Operator::RegexMatch => "~",
Operator::RegexIMatch => "~*",
Operator::RegexNotMatch => "!~",
Expand Down
16 changes: 14 additions & 2 deletions datafusion/optimizer/src/simplify_expressions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ fn negate_clause(expr: Expr) -> Expr {
};
}
match op {
// not (A is distinct from B) ===> (A is not distinct from B)
Operator::IsDistinctFrom => Expr::BinaryExpr {
left,
op: Operator::IsNotDistinctFrom,
right,
},
// not (A is not distinct from B) ===> (A is distinct from B)
Operator::IsNotDistinctFrom => Expr::BinaryExpr {
left,
op: Operator::IsDistinctFrom,
right,
},
// not (A and B) ===> (not A) or (not B)
Operator::And => {
let left = negate_clause(*left);
Expand Down Expand Up @@ -2210,7 +2222,7 @@ mod tests {
.unwrap()
.build()
.unwrap();
let expected = "Filter: #test.a NOT LIKE #test.b AS NOT test.a LIKE test.b\
let expected = "Filter: #test.a NOT LIKE #test.b\
\n TableScan: test";

assert_optimized_plan_eq(&plan, expected);
Expand All @@ -2232,7 +2244,7 @@ mod tests {
.unwrap()
.build()
.unwrap();
let expected = "Filter: #test.a LIKE #test.b AS NOT test.a NOT LIKE test.b\
let expected = "Filter: #test.a LIKE #test.b\
\n TableScan: test";

assert_optimized_plan_eq(&plan, expected);
Expand Down
41 changes: 3 additions & 38 deletions datafusion/physical-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use arrow::compute::kernels::arithmetic::{
multiply_scalar, subtract, subtract_scalar,
};
use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
use arrow::compute::kernels::comparison::regexp_is_match_utf8;
use arrow::compute::kernels::comparison::regexp_is_match_utf8_scalar;
use arrow::compute::kernels::comparison::{
eq_dyn_binary_scalar, gt_dyn_binary_scalar, gt_eq_dyn_binary_scalar,
lt_dyn_binary_scalar, lt_eq_dyn_binary_scalar, neq_dyn_binary_scalar,
Expand All @@ -47,10 +49,6 @@ use arrow::compute::kernels::comparison::{
use arrow::compute::kernels::comparison::{
eq_scalar, gt_eq_scalar, gt_scalar, lt_eq_scalar, lt_scalar, neq_scalar,
};
use arrow::compute::kernels::comparison::{like_utf8, nlike_utf8, regexp_is_match_utf8};
use arrow::compute::kernels::comparison::{
like_utf8_scalar, nlike_utf8_scalar, regexp_is_match_utf8_scalar,
};

use adapter::{eq_dyn, gt_dyn, gt_eq_dyn, lt_dyn, lt_eq_dyn, neq_dyn};
use arrow::compute::kernels::concat_elements::concat_elements_utf8;
Expand Down Expand Up @@ -323,19 +321,6 @@ macro_rules! compute_op {
}};
}

macro_rules! binary_string_array_op_scalar {
($LEFT:expr, $RIGHT:expr, $OP:ident, $OP_TYPE:expr) => {{
let result: Result<Arc<dyn Array>> = match $LEFT.data_type() {
DataType::Utf8 => compute_utf8_op_scalar!($LEFT, $RIGHT, $OP, StringArray, $OP_TYPE),
other => Err(DataFusionError::Internal(format!(
"Data type {:?} not supported for scalar operation '{}' on string array",
other, stringify!($OP)
))),
};
Some(result)
}};
}

macro_rules! binary_string_array_op {
($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
match $LEFT.data_type() {
Expand Down Expand Up @@ -623,7 +608,7 @@ impl PhysicalExpr for BinaryExpr {
}

/// unwrap underlying (non dictionary) value, if any, to pass to a scalar kernel
fn unwrap_dict_value(v: ScalarValue) -> ScalarValue {
pub fn unwrap_dict_value(v: ScalarValue) -> ScalarValue {
if let ScalarValue::Dictionary(_key_type, v) = v {
unwrap_dict_value(*v)
} else {
Expand Down Expand Up @@ -713,12 +698,6 @@ impl BinaryExpr {
Operator::NotEq => {
binary_array_op_dyn_scalar!(array, scalar.clone(), neq, bool_type)
}
Operator::Like => {
binary_string_array_op_scalar!(array, scalar.clone(), like, bool_type)
}
Operator::NotLike => {
binary_string_array_op_scalar!(array, scalar.clone(), nlike, bool_type)
}
Operator::Plus => {
binary_primitive_array_op_scalar!(array, scalar.clone(), add)
}
Expand Down Expand Up @@ -818,8 +797,6 @@ impl BinaryExpr {
right_data_type: &DataType,
) -> Result<ArrayRef> {
match &self.op {
Operator::Like => binary_string_array_op!(left, right, like),
Operator::NotLike => binary_string_array_op!(left, right, nlike),
Operator::Lt => lt_dyn(&left, &right),
Operator::LtEq => lt_eq_dyn(&left, &right),
Operator::Gt => gt_dyn(&left, &right),
Expand Down Expand Up @@ -1115,18 +1092,6 @@ mod tests {
DataType::Float32,
vec![2f32]
);
test_coercion!(
StringArray,
DataType::Utf8,
vec!["hello world", "world"],
StringArray,
DataType::Utf8,
vec!["%hello%", "%hello%"],
Operator::Like,
BooleanArray,
DataType::Boolean,
vec![true, false]
);
test_coercion!(
StringArray,
DataType::Utf8,
Expand Down

0 comments on commit 64e8559

Please sign in to comment.