Skip to content

Commit

Permalink
support SET variable (apache#4069)
Browse files Browse the repository at this point in the history
* support SET

* remove useless comment

* add test cases

* Update datafusion/core/src/config.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* Update datafusion/core/src/execution/context.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* fix test cases

* fmt

* Update datafusion/expr/src/logical_plan/plan.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
2 people authored and Dandandan committed Nov 5, 2022
1 parent f676611 commit 8478366
Show file tree
Hide file tree
Showing 13 changed files with 483 additions and 7 deletions.
5 changes: 5 additions & 0 deletions datafusion/core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,11 @@ impl ConfigOptions {
self.set(key, ScalarValue::UInt64(Some(value)))
}

/// set a `String` configuration option
pub fn set_string(&mut self, key: &str, value: impl Into<String>) {
self.set(key, ScalarValue::Utf8(Some(value.into())))
}

/// get a configuration option
pub fn get(&self, key: &str) -> Option<ScalarValue> {
self.options.get(key).cloned()
Expand Down
56 changes: 55 additions & 1 deletion datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ use crate::error::{DataFusionError, Result};
use crate::logical_expr::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CreateView, DropTable, DropView, Explain, LogicalPlan, LogicalPlanBuilder,
TableSource, TableType, UNNAMED_TABLE,
SetVariable, TableSource, TableType, UNNAMED_TABLE,
};
use crate::optimizer::optimizer::{OptimizerConfig, OptimizerRule};
use datafusion_sql::{ResolvedTableReference, TableReference};
Expand Down Expand Up @@ -341,6 +341,60 @@ impl SessionContext {
))),
}
}

LogicalPlan::SetVariable(SetVariable {
variable, value, ..
}) => {
let config_options = &self.state.write().config.config_options;

let old_value =
config_options.read().get(&variable).ok_or_else(|| {
DataFusionError::Execution(format!(
"Can not SET variable: Unknown Variable {}",
variable
))
})?;

match old_value {
ScalarValue::Boolean(_) => {
let new_value = value.parse::<bool>().map_err(|_| {
DataFusionError::Execution(format!(
"Failed to parse {} as bool",
value,
))
})?;
config_options.write().set_bool(&variable, new_value);
}

ScalarValue::UInt64(_) => {
let new_value = value.parse::<u64>().map_err(|_| {
DataFusionError::Execution(format!(
"Failed to parse {} as u64",
value,
))
})?;
config_options.write().set_u64(&variable, new_value);
}

ScalarValue::Utf8(_) => {
let new_value = value.parse::<String>().map_err(|_| {
DataFusionError::Execution(format!(
"Failed to parse {} as String",
value,
))
})?;
config_options.write().set_string(&variable, new_value);
}

_ => {
return Err(DataFusionError::Execution(
"Unsupported Scalar Value Type".to_string(),
))
}
}
self.return_empty_dataframe()
}

LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
if_not_exists,
Expand Down
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,11 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateView".to_string(),
))
}
LogicalPlan::SetVariable(_) => {
Err(DataFusionError::Internal(
"Unsupported logical plan: SetVariable must be root of the plan".to_string(),
))
}
LogicalPlan::Explain(_) => Err(DataFusionError::Internal(
"Unsupported logical plan: Explain must be root of the plan".to_string(),
)),
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/tests/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub mod idenfifers;
pub mod information_schema;
pub mod parquet_schema;
pub mod partitioned_csv;
pub mod set_variable;
pub mod subqueries;
#[cfg(feature = "unicode_expressions")]
pub mod unicode;
Expand Down
296 changes: 296 additions & 0 deletions datafusion/core/tests/sql/set_variable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,296 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::*;

#[tokio::test]
async fn set_variable_to_value() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

ctx.sql("SET datafusion.execution.batch_size to 1")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 1 |",
"+---------------------------------+---------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn set_variable_to_value_with_equal_sign() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

ctx.sql("SET datafusion.execution.batch_size = 1")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 1 |",
"+---------------------------------+---------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn set_variable_to_value_with_single_quoted_string() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

ctx.sql("SET datafusion.execution.batch_size to '1'")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 1 |",
"+---------------------------------+---------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn set_variable_to_value_case_insensitive() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

ctx.sql("SET datafusion.EXECUTION.batch_size to '1'")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 1 |",
"+---------------------------------+---------+",
];
assert_batches_sorted_eq!(expected, &result);
}

#[tokio::test]
async fn set_variable_unknown_variable() {
let ctx = SessionContext::new();

let err = plan_and_collect(&ctx, "SET aabbcc to '1'")
.await
.unwrap_err();
assert_eq!(
err.to_string(),
"Execution error: Can not SET variable: Unknown Variable aabbcc"
);
}

#[tokio::test]
async fn set_bool_variable() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

ctx.sql("SET datafusion.execution.coalesce_batches to true")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
.await
.unwrap();
let expected = vec![
"+---------------------------------------+---------+",
"| name | setting |",
"+---------------------------------------+---------+",
"| datafusion.execution.coalesce_batches | true |",
"+---------------------------------------+---------+",
];
assert_batches_eq!(expected, &result);

ctx.sql("SET datafusion.execution.coalesce_batches to 'false'")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.coalesce_batches")
.await
.unwrap();
let expected = vec![
"+---------------------------------------+---------+",
"| name | setting |",
"+---------------------------------------+---------+",
"| datafusion.execution.coalesce_batches | false |",
"+---------------------------------------+---------+",
];
assert_batches_eq!(expected, &result);
}

#[tokio::test]
async fn set_bool_variable_bad_value() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to 1")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Execution error: Failed to parse 1 as bool"
);

let err = plan_and_collect(&ctx, "SET datafusion.execution.coalesce_batches to abc")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Execution error: Failed to parse abc as bool"
);
}

#[tokio::test]
async fn set_u64_variable() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

ctx.sql("SET datafusion.execution.batch_size to 0")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 0 |",
"+---------------------------------+---------+",
];
assert_batches_eq!(expected, &result);

ctx.sql("SET datafusion.execution.batch_size to '1'")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 1 |",
"+---------------------------------+---------+",
];
assert_batches_eq!(expected, &result);

ctx.sql("SET datafusion.execution.batch_size to +2")
.await
.unwrap();
let result = plan_and_collect(&ctx, "SHOW datafusion.execution.batch_size")
.await
.unwrap();
let expected = vec![
"+---------------------------------+---------+",
"| name | setting |",
"+---------------------------------+---------+",
"| datafusion.execution.batch_size | 2 |",
"+---------------------------------+---------+",
];
assert_batches_eq!(expected, &result);
}

#[tokio::test]
async fn set_u64_variable_bad_value() {
let ctx =
SessionContext::with_config(SessionConfig::new().with_information_schema(true));

let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to -1")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Execution error: Failed to parse -1 as u64"
);

let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to abc")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Execution error: Failed to parse abc as u64"
);

let err = plan_and_collect(&ctx, "SET datafusion.execution.batch_size to 0.1")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Execution error: Failed to parse 0.1 as u64"
);
}

#[tokio::test]
async fn set_time_zone() {
// we don't support changing time zone for now until all time zone issues fixed and related function completed

let ctx = SessionContext::new();

// for full variable name
let err = plan_and_collect(&ctx, "set datafusion.execution.time_zone = '8'")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Error during planning: Changing Time Zone isn't supported yet"
);

// for alias time zone
let err = plan_and_collect(&ctx, "set time zone = '8'")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Error during planning: Changing Time Zone isn't supported yet"
);

// for alias timezone
let err = plan_and_collect(&ctx, "set timezone = '8'")
.await
.unwrap_err();

assert_eq!(
err.to_string(),
"Error during planning: Changing Time Zone isn't supported yet"
);
}

0 comments on commit 8478366

Please sign in to comment.