From 09c40ef25f07f77017baa2382f3a9794ab4233f0 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Tue, 10 May 2022 14:07:16 +0900 Subject: [PATCH 1/4] test: add test_feat_aggregation_without_group_by --- springql-core/tests/feat_aggregation.rs | 147 ++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 springql-core/tests/feat_aggregation.rs diff --git a/springql-core/tests/feat_aggregation.rs b/springql-core/tests/feat_aggregation.rs new file mode 100644 index 00000000..0f6f7a86 --- /dev/null +++ b/springql-core/tests/feat_aggregation.rs @@ -0,0 +1,147 @@ +// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. + +mod test_support; + +use pretty_assertions::assert_eq; +use serde_json::json; +use springql_core::error::Result; +use springql_core::low_level_rs::*; +use springql_foreign_service::sink::ForeignSink; +use springql_foreign_service::source::source_input::ForeignSourceInput; +use springql_foreign_service::source::ForeignSource; +use springql_test_logger::setup_test_logger; + +use crate::test_support::*; + +fn gen_source_input() -> Vec { + let json_00_1 = json!({ + "ts": "2020-01-01 00:00:00.000000000", + "ticker": "ORCL", + "amount": 10, + }); + let json_00_2 = json!({ + "ts": "2020-01-01 00:00:09.9999999999", + "ticker": "GOOGL", + "amount": 30, + }); + let json_10_1 = json!({ + "ts": "2020-01-01 00:00:10.0000000000", + "ticker": "IBM", + "amount": 50, + }); + let json_20_1 = json!({ + "ts": "2020-01-01 00:00:20.0000000000", + "ticker": "IBM", + "amount": 70, + }); + + vec![json_00_1, json_00_2, json_10_1, json_20_1] +} + +fn run_and_drain( + ddls: &[String], + source_input: ForeignSourceInput, + test_source: ForeignSource, + test_sink: &ForeignSink, +) -> Vec { + let _pipeline = apply_ddls(ddls, spring_config_default()); + test_source.start(source_input); + let mut sink_received = drain_from_sink(test_sink); + sink_received.sort_by_key(|r| { + let ts = &r["ts"]; + ts.as_str().unwrap().to_string() + }); + sink_received +} + +/// See: +#[test] +fn test_feat_aggregation_without_group_by() -> Result<()> { + setup_test_logger(); + + let source_input = gen_source_input(); + + let test_source = ForeignSource::new().unwrap(); + let test_sink = ForeignSink::start().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_avg_all ( + ts TIMESTAMP NOT NULL ROWTIME, + amount FLOAT NOT NULL + ); + " + .to_string(), + " + CREATE PUMP avg_all AS + INSERT INTO sink_avg_all (ts, avg_amount) + SELECT STREAM + FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS aggr_ts, + AVG(source_trade.amount) AS avg_amount + FROM source_trade + FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0); + " + .to_string(), + format!( + " + CREATE SINK WRITER tcp_sink_trade FOR sink_avg_all + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_sink.host_ip(), + remote_port = test_sink.port() + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let sink_received = run_and_drain( + &ddls, + ForeignSourceInput::new_fifo_batch(source_input), + test_source, + &test_sink, + ); + + assert_eq!(sink_received.len(), 2); + + assert_eq!( + sink_received[0]["ts"].as_str().unwrap(), + "2020-01-01 00:00:00.000000000" + ); + assert_eq!( + sink_received[0]["amount"].as_f64().unwrap().round() as i32, + 20, + ); + + assert_eq!( + sink_received[1]["ts"].as_str().unwrap(), + "2020-01-01 00:00:10.000000000" + ); + assert_eq!( + sink_received[1]["amount"].as_f64().unwrap().round() as i32, + 50, + ); + + Ok(()) +} From f0b0c33203db839ce7c74773f56cbf06a48ebfb7 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Tue, 10 May 2022 14:21:02 +0900 Subject: [PATCH 2/4] refactor: rename GroupAggregateParameter -> AggregateParameter --- .../src/pipeline/pump_model/window_operation_parameter.rs | 4 ++-- .../pump_model/window_operation_parameter/aggregate.rs | 2 +- springql-core/src/sql_processor/query_planner.rs | 6 +++--- .../autonomous_executor/task/window/aggregate.rs | 6 +++--- .../stream_engine/autonomous_executor/task/window/panes.rs | 4 ++-- .../task/window/panes/pane/aggregate_pane.rs | 4 ++-- 6 files changed, 13 insertions(+), 13 deletions(-) diff --git a/springql-core/src/pipeline/pump_model/window_operation_parameter.rs b/springql-core/src/pipeline/pump_model/window_operation_parameter.rs index ae93da38..1fe6c144 100644 --- a/springql-core/src/pipeline/pump_model/window_operation_parameter.rs +++ b/springql-core/src/pipeline/pump_model/window_operation_parameter.rs @@ -3,11 +3,11 @@ pub(crate) mod aggregate; pub(crate) mod join_parameter; -use self::{aggregate::GroupAggregateParameter, join_parameter::JoinParameter}; +use self::{aggregate::AggregateParameter, join_parameter::JoinParameter}; /// Window operation parameters #[derive(Clone, PartialEq, Debug)] pub(crate) enum WindowOperationParameter { - GroupAggregation(GroupAggregateParameter), + GroupAggregation(AggregateParameter), Join(JoinParameter), } diff --git a/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs b/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs index a7796bc5..2f29d54c 100644 --- a/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs +++ b/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs @@ -11,7 +11,7 @@ use crate::expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}; /// SLIDING WINDOW ...; /// ``` #[derive(Copy, Clone, PartialEq, Debug, new)] -pub(crate) struct GroupAggregateParameter { +pub(crate) struct AggregateParameter { // TODO multiple aggr_expr pub(crate) aggr_func: AggregateFunctionParameter, pub(crate) aggr_expr: AggrExprLabel, diff --git a/springql-core/src/sql_processor/query_planner.rs b/springql-core/src/sql_processor/query_planner.rs index 3d3ac683..1acdc519 100644 --- a/springql-core/src/sql_processor/query_planner.rs +++ b/springql-core/src/sql_processor/query_planner.rs @@ -63,7 +63,7 @@ use crate::{ pipeline::{ pump_model::{ window_operation_parameter::{ - aggregate::GroupAggregateParameter, WindowOperationParameter, + aggregate::AggregateParameter, WindowOperationParameter, }, window_parameter::WindowParameter, }, @@ -138,7 +138,7 @@ impl QueryPlanner { &self, expr_resolver: &mut ExprResolver, projection_op: &ProjectionOp, - ) -> Result> { + ) -> Result> { let opt_grouping_elem = self.analyzer.grouping_element(); let aggr_labels = &projection_op.aggr_expr_labels; @@ -156,7 +156,7 @@ impl QueryPlanner { } }; - Ok(Some(GroupAggregateParameter::new( + Ok(Some(AggregateParameter::new( aggr_func, *aggr_label, group_by_label, diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs index 2274611b..7f93d9a1 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs @@ -112,7 +112,7 @@ mod tests { pipeline::{ name::{AggrAlias, ColumnName, StreamName}, pump_model::window_operation_parameter::aggregate::{ - AggregateFunctionParameter, GroupAggregateParameter, + AggregateFunctionParameter, AggregateParameter, }, }, sql_processor::sql_parser::syntax::SelectFieldSyntax, @@ -185,7 +185,7 @@ mod tests { period: SpringEventDuration::from_secs(5), allowed_delay: SpringEventDuration::from_secs(1), }, - WindowOperationParameter::GroupAggregation(GroupAggregateParameter { + WindowOperationParameter::GroupAggregation(AggregateParameter { aggr_func: AggregateFunctionParameter::Avg, aggr_expr: aggr_labels_select_list[0], group_by: group_by_label, @@ -392,7 +392,7 @@ mod tests { length: SpringEventDuration::from_secs(10), allowed_delay: SpringEventDuration::from_secs(1), }, - WindowOperationParameter::GroupAggregation(GroupAggregateParameter { + WindowOperationParameter::GroupAggregation(AggregateParameter { aggr_func: AggregateFunctionParameter::Avg, aggr_expr: aggr_labels_select_list[0], group_by: group_by_label, diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs index 0f50646c..59868333 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs @@ -140,7 +140,7 @@ mod tests { expr_resolver::ExprResolver, expression::{AggrExpr, ValueExpr}, pipeline::pump_model::window_operation_parameter::aggregate::{ - AggregateFunctionParameter, GroupAggregateParameter, + AggregateFunctionParameter, AggregateParameter, }, sql_processor::sql_parser::syntax::SelectFieldSyntax, stream_engine::{ @@ -167,7 +167,7 @@ mod tests { let group_by_label = expr_resolver.register_value_expr(group_by_expr); - WindowOperationParameter::GroupAggregation(GroupAggregateParameter { + WindowOperationParameter::GroupAggregation(AggregateParameter { aggr_func: AggregateFunctionParameter::Avg, aggr_expr: aggr_labels_select_list[0], group_by: group_by_label, diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs index cd49faab..9392cca8 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs @@ -9,7 +9,7 @@ use ordered_float::OrderedFloat; use crate::{ expr_resolver::ExprResolver, pipeline::pump_model::window_operation_parameter::{ - aggregate::{AggregateFunctionParameter, GroupAggregateParameter}, + aggregate::{AggregateFunctionParameter, AggregateParameter}, WindowOperationParameter, }, stream_engine::{ @@ -31,7 +31,7 @@ pub(in crate::stream_engine::autonomous_executor) struct AggrPane { open_at: SpringTimestamp, close_at: SpringTimestamp, - group_aggregation_parameter: GroupAggregateParameter, + group_aggregation_parameter: AggregateParameter, inner: AggrPaneInner, } From 96c1d9945b5068bd91eb72b32c88812627f80a11 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Tue, 10 May 2022 18:27:04 +0900 Subject: [PATCH 3/4] feat(fix): GROUP BY can take 2 or more expressions --- .../pump_model/window_operation_parameter.rs | 2 +- .../window_operation_parameter/aggregate.rs | 19 +++- .../src/sql_processor/query_planner.rs | 36 ++++---- .../select_syntax_analyzer/group_aggregate.rs | 5 +- .../sql_parser/pest_grammar/springql.pest | 6 +- .../sql_parser/pest_parser_impl.rs | 29 ++++-- .../src/sql_processor/sql_parser/syntax.rs | 5 +- .../pump_task/pump_subtask/query_subtask.rs | 7 +- .../query_subtask/aggr_projection_subtask.rs | 15 +-- .../task/window/aggregate.rs | 91 +++++++++---------- .../autonomous_executor/task/window/panes.rs | 46 +++++++--- .../task/window/panes/pane/aggregate_pane.rs | 82 ++++++++++++----- .../query_plan/query_plan_operation.rs | 19 +++- 13 files changed, 231 insertions(+), 131 deletions(-) diff --git a/springql-core/src/pipeline/pump_model/window_operation_parameter.rs b/springql-core/src/pipeline/pump_model/window_operation_parameter.rs index 1fe6c144..1369e482 100644 --- a/springql-core/src/pipeline/pump_model/window_operation_parameter.rs +++ b/springql-core/src/pipeline/pump_model/window_operation_parameter.rs @@ -8,6 +8,6 @@ use self::{aggregate::AggregateParameter, join_parameter::JoinParameter}; /// Window operation parameters #[derive(Clone, PartialEq, Debug)] pub(crate) enum WindowOperationParameter { - GroupAggregation(AggregateParameter), + Aggregate(AggregateParameter), Join(JoinParameter), } diff --git a/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs b/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs index 2f29d54c..5c67519e 100644 --- a/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs +++ b/springql-core/src/pipeline/pump_model/window_operation_parameter/aggregate.rs @@ -2,21 +2,32 @@ use crate::expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}; +/// [GROUP BY c1, c2, c3...] +#[derive(Clone, PartialEq, Debug, Default, new)] +pub(crate) struct GroupByLabels( + /// Empty when GROUP BY clause is not supplied. + Vec, +); +impl GroupByLabels { + pub(crate) fn as_labels(&self) -> &[ValueExprLabel] { + &self.0 + } +} + /// TODO [support complex expression with aggregations](https://gh01.base.toyota-tokyo.tech/SpringQL-internal/SpringQL/issues/152) /// /// ```sql /// SELECT group_by, aggr_expr.func(aggr_expr.aggregated) /// FROM s -/// GROUP BY group_by +/// [GROUP BY group_by] /// SLIDING WINDOW ...; /// ``` -#[derive(Copy, Clone, PartialEq, Debug, new)] +#[derive(Clone, PartialEq, Debug, new)] pub(crate) struct AggregateParameter { // TODO multiple aggr_expr pub(crate) aggr_func: AggregateFunctionParameter, pub(crate) aggr_expr: AggrExprLabel, - - pub(crate) group_by: ValueExprLabel, // TODO multiple group by expression + pub(crate) group_by: GroupByLabels, } #[derive(Copy, Clone, Eq, PartialEq, Debug)] diff --git a/springql-core/src/sql_processor/query_planner.rs b/springql-core/src/sql_processor/query_planner.rs index 1acdc519..6f313ac4 100644 --- a/springql-core/src/sql_processor/query_planner.rs +++ b/springql-core/src/sql_processor/query_planner.rs @@ -63,7 +63,8 @@ use crate::{ pipeline::{ pump_model::{ window_operation_parameter::{ - aggregate::AggregateParameter, WindowOperationParameter, + aggregate::{AggregateParameter, GroupByLabels}, + WindowOperationParameter, }, window_parameter::WindowParameter, }, @@ -124,7 +125,7 @@ impl QueryPlanner { match (window_param, group_aggr_param) { (Some(window_param), Some(group_aggr_param)) => Ok(Some(GroupAggregateWindowOp { window_param, - op_param: WindowOperationParameter::GroupAggregation(group_aggr_param), + op_param: WindowOperationParameter::Aggregate(group_aggr_param), })), _ => Ok(None), } @@ -139,31 +140,34 @@ impl QueryPlanner { expr_resolver: &mut ExprResolver, projection_op: &ProjectionOp, ) -> Result> { - let opt_grouping_elem = self.analyzer.grouping_element(); + let grouping_elements = self.analyzer.grouping_elements(); let aggr_labels = &projection_op.aggr_expr_labels; - match (opt_grouping_elem, aggr_labels.len()) { - (Some(grouping_elem), 1) => { + match aggr_labels.len() { + 1 => { let aggr_label = aggr_labels.iter().next().expect("len checked"); let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func; - let group_by_label = match grouping_elem { - GroupingElementSyntax::ValueExpr(expr) => { - expr_resolver.register_value_expr(expr) - } - GroupingElementSyntax::ValueAlias(alias) => { - expr_resolver.resolve_value_alias(alias)? - } - }; + let group_by_labels = grouping_elements + .iter() + .map(|grouping_elem| match grouping_elem { + GroupingElementSyntax::ValueExpr(expr) => { + Ok(expr_resolver.register_value_expr(expr.clone())) + } + GroupingElementSyntax::ValueAlias(alias) => { + expr_resolver.resolve_value_alias(alias.clone()) + } + }) + .collect::>>()?; Ok(Some(AggregateParameter::new( aggr_func, *aggr_label, - group_by_label, + GroupByLabels::new(group_by_labels), ))) } - (None, 0) => Ok(None), - _ => unimplemented!(), + 0 => Ok(None), + _ => unimplemented!("2 or more aggregate expressions"), } } diff --git a/springql-core/src/sql_processor/query_planner/select_syntax_analyzer/group_aggregate.rs b/springql-core/src/sql_processor/query_planner/select_syntax_analyzer/group_aggregate.rs index 3ddafb55..89e6d28e 100644 --- a/springql-core/src/sql_processor/query_planner/select_syntax_analyzer/group_aggregate.rs +++ b/springql-core/src/sql_processor/query_planner/select_syntax_analyzer/group_aggregate.rs @@ -4,8 +4,7 @@ use super::SelectSyntaxAnalyzer; use crate::sql_processor::sql_parser::syntax::GroupingElementSyntax; impl SelectSyntaxAnalyzer { - /// TODO multiple GROUP BY - pub(in super::super) fn grouping_element(&self) -> Option { - self.select_syntax.grouping_element.clone() + pub(in super::super) fn grouping_elements(&self) -> Vec { + self.select_syntax.grouping_elements.clone() } } diff --git a/springql-core/src/sql_processor/sql_parser/pest_grammar/springql.pest b/springql-core/src/sql_processor/sql_parser/pest_grammar/springql.pest index 5832ab67..8c5c7f46 100644 --- a/springql-core/src/sql_processor/sql_parser/pest_grammar/springql.pest +++ b/springql-core/src/sql_processor/sql_parser/pest_grammar/springql.pest @@ -479,7 +479,7 @@ select_stream_command = { ^"SELECT" ~ "STREAM" ~ select_field ~ ("," ~ select_field)* ~ (^"FROM" ~ from_item) - ~ (^"GROUP" ~ "BY" ~ grouping_element)? // TODO multiple grouping elements + ~ group_by_clause? ~ window_clause? } @@ -500,6 +500,10 @@ join_type = { ^"LEFT" ~ ^"OUTER" ~ ^"JOIN" } +group_by_clause = { + ^"GROUP" ~ "BY" ~ grouping_element ~ ("," ~ grouping_element)* +} + grouping_element = { value_expr | value_alias diff --git a/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs b/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs index 93a4861a..448d3b25 100644 --- a/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs +++ b/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs @@ -210,12 +210,12 @@ impl PestParserImpl { )?; let event_duration = match duration_function { - DurationFunction::Millis => { - Ok(SpringEventDuration::from_millis(integer_constant.to_i64()? as u64)) - } - DurationFunction::Secs => { - Ok(SpringEventDuration::from_secs(integer_constant.to_i64()? as u64)) - } + DurationFunction::Millis => Ok(SpringEventDuration::from_millis( + integer_constant.to_i64()? as u64, + )), + DurationFunction::Secs => Ok(SpringEventDuration::from_secs( + integer_constant.to_i64()? as u64, + )), }?; Ok(SqlValue::NotNull(NnSqlValue::Duration(event_duration))) @@ -549,10 +549,10 @@ impl PestParserImpl { Self::parse_from_item, identity, )?; - let grouping_element = try_parse_child( + let grouping_elements = try_parse_child( &mut params, - Rule::grouping_element, - Self::parse_grouping_element, + Rule::group_by_clause, + Self::parse_group_by_clause, identity, )?; let window_clause = try_parse_child( @@ -565,7 +565,7 @@ impl PestParserImpl { Ok(SelectStreamSyntax { fields, from_item, - grouping_element, + grouping_elements: grouping_elements.unwrap_or_default(), window_clause, }) } @@ -680,6 +680,15 @@ impl PestParserImpl { } } + fn parse_group_by_clause(mut params: FnParseParams) -> Result> { + parse_child_seq( + &mut params, + Rule::grouping_element, + &Self::parse_grouping_element, + &identity, + ) + } + fn parse_grouping_element(mut params: FnParseParams) -> Result { try_parse_child( &mut params, diff --git a/springql-core/src/sql_processor/sql_parser/syntax.rs b/springql-core/src/sql_processor/sql_parser/syntax.rs index b8fa7469..fdb7c558 100644 --- a/springql-core/src/sql_processor/sql_parser/syntax.rs +++ b/springql-core/src/sql_processor/sql_parser/syntax.rs @@ -26,7 +26,10 @@ pub(in crate::sql_processor) struct OptionSyntax { pub(in crate::sql_processor) struct SelectStreamSyntax { pub(in crate::sql_processor) fields: Vec, pub(in crate::sql_processor) from_item: FromItemSyntax, - pub(in crate::sql_processor) grouping_element: Option, + + /// Empty when no GROUP BY clause is supplied. + pub(in crate::sql_processor) grouping_elements: Vec, + pub(in crate::sql_processor) window_clause: Option, } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs index 3b40a494..720e3d05 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs @@ -145,14 +145,9 @@ impl QuerySubtask { 1, "currently only 1 aggregate in select_list is supported" ); - assert_eq!( - plan.upper_ops.projection.value_expr_labels.len(), - 1, - "currently only GROUP BY expression in select_list is supported" - ); let aggr_projection_subtask = AggrProjectionSubtask::new( - plan.upper_ops.projection.value_expr_labels[0], + plan.upper_ops.group_by_labels(), plan.upper_ops.projection.aggr_expr_labels[0], ); diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs index d7de9cfe..977500f4 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs @@ -1,14 +1,15 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. use crate::error::Result; -use crate::expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}; +use crate::expr_resolver::expr_label::AggrExprLabel; +use crate::pipeline::pump_model::window_operation_parameter::aggregate::GroupByLabels; use crate::stream_engine::autonomous_executor::task::window::aggregate::GroupAggrOut; use super::SqlValues; #[derive(Debug, new)] pub(in crate::stream_engine::autonomous_executor) struct AggrProjectionSubtask { - group_by_expr: ValueExprLabel, + group_by_labels: GroupByLabels, aggr_expr: AggrExprLabel, } @@ -18,11 +19,13 @@ impl AggrProjectionSubtask { group_aggr_out: GroupAggrOut, ) -> Result { let aggr_label = self.aggr_expr; - let group_by_label = self.group_by_expr; + let group_by_labels = &self.group_by_labels; - let (aggr_result, group_by_result) = - group_aggr_out.into_results(aggr_label, group_by_label)?; + let (aggr_result, group_by_values) = + group_aggr_out.into_results(aggr_label, group_by_labels.clone())?; - Ok(SqlValues::new(vec![group_by_result, aggr_result])) // FIXME keep select list order: + let mut values = group_by_values.into_sql_values(); + values.push(aggr_result); + Ok(SqlValues::new(values)) // FIXME keep select list order: } } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs index 7f93d9a1..7a8549ec 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs @@ -2,9 +2,10 @@ use crate::{ error::{Result, SpringError}, - expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}, + expr_resolver::expr_label::AggrExprLabel, pipeline::pump_model::{ - window_operation_parameter::WindowOperationParameter, window_parameter::WindowParameter, + window_operation_parameter::{aggregate::GroupByLabels, WindowOperationParameter}, + window_parameter::WindowParameter, }, stream_engine::SqlValue, }; @@ -12,23 +13,27 @@ use crate::{ use anyhow::anyhow; use super::{ - panes::{pane::aggregate_pane::AggrPane, Panes}, + panes::{ + pane::aggregate_pane::{AggrPane, GroupByValues}, + Panes, + }, watermark::Watermark, Window, }; +/// Cache from (aggregation label and group by labels) into (aggregation value and group by values). #[derive(Clone, PartialEq, Debug, new)] pub(in crate::stream_engine::autonomous_executor) struct GroupAggrOut { aggr_label: AggrExprLabel, aggr_result: SqlValue, - group_by_label: ValueExprLabel, - group_by_result: SqlValue, + group_by_labels: GroupByLabels, + group_by_values: GroupByValues, } impl GroupAggrOut { /// # Returns /// - /// (aggregate result, group by result) + /// (aggregate result, group by values) /// /// # Failures /// @@ -37,20 +42,20 @@ impl GroupAggrOut { pub(in crate::stream_engine::autonomous_executor) fn into_results( self, aggr_label: AggrExprLabel, - group_by_label: ValueExprLabel, - ) -> Result<(SqlValue, SqlValue)> { + group_by_labels: GroupByLabels, + ) -> Result<(SqlValue, GroupByValues)> { if self.aggr_label != aggr_label { Err(SpringError::Sql(anyhow!( "aggregate labeled {:?} is not calculated", aggr_label ))) - } else if self.group_by_label != group_by_label { + } else if self.group_by_labels != group_by_labels { Err(SpringError::Sql(anyhow!( "GROUP BY expression {:?} is not calculated", - group_by_label + group_by_labels ))) } else { - Ok((self.aggr_result, self.group_by_result)) + Ok((self.aggr_result, self.group_by_values)) } } } @@ -112,7 +117,7 @@ mod tests { pipeline::{ name::{AggrAlias, ColumnName, StreamName}, pump_model::window_operation_parameter::aggregate::{ - AggregateFunctionParameter, AggregateParameter, + AggregateFunctionParameter, AggregateParameter, GroupByLabels, }, }, sql_processor::sql_parser::syntax::SelectFieldSyntax, @@ -126,11 +131,8 @@ mod tests { }; fn t_expect(group_aggr_out: GroupAggrOut, expected_ticker: &str, expected_avg_amount: i16) { - let ticker = group_aggr_out.group_by_result.unwrap(); - assert_eq!( - ticker.unpack::().unwrap(), - expected_ticker.to_string() - ); + let ticker = group_aggr_out_to_first_string(&group_aggr_out); + assert_eq!(ticker, expected_ticker.to_string()); let avg_amount = group_aggr_out.aggr_result.unwrap(); assert_eq!( @@ -139,6 +141,20 @@ mod tests { ); } + fn group_aggr_out_to_first_string(group_aggr_out: &GroupAggrOut) -> String { + if let SqlValue::NotNull(ticker) = group_aggr_out + .group_by_values + .clone() + .into_sql_values() + .first() + .unwrap() + { + ticker.unpack::().unwrap() + } else { + unreachable!() + } + } + #[test] fn test_timed_sliding_window_aggregation() { setup_test_logger(); @@ -177,7 +193,8 @@ mod tests { StreamName::fx_trade().as_ref(), ColumnName::fx_ticker().as_ref(), ); - let group_by_label = expr_resolver.register_value_expr(group_by_expr); + let group_by_labels = + GroupByLabels::new(vec![expr_resolver.register_value_expr(group_by_expr)]); let mut window = AggrWindow::new( WindowParameter::TimedSlidingWindow { @@ -185,10 +202,10 @@ mod tests { period: SpringEventDuration::from_secs(5), allowed_delay: SpringEventDuration::from_secs(1), }, - WindowOperationParameter::GroupAggregation(AggregateParameter { + WindowOperationParameter::Aggregate(AggregateParameter { aggr_func: AggregateFunctionParameter::Avg, aggr_expr: aggr_labels_select_list[0], - group_by: group_by_label, + group_by: group_by_labels, }), ); @@ -236,14 +253,7 @@ mod tests { (), ); assert_eq!(out.len(), 2); - out.sort_by_key(|group_aggr_out| { - group_aggr_out - .group_by_result - .clone() - .unwrap() - .unpack::() - .unwrap() - }); + out.sort_by_key(group_aggr_out_to_first_string); t_expect(out.get(0).cloned().unwrap(), "GOOGL", 100); t_expect(out.get(1).cloned().unwrap(), "ORCL", 100); assert_eq!(window_in_flow.window_gain_bytes_states, 0); @@ -313,14 +323,7 @@ mod tests { (), ); assert_eq!(out.len(), 2); - out.sort_by_key(|group_aggr_out| { - group_aggr_out - .group_by_result - .clone() - .unwrap() - .unpack::() - .unwrap() - }); + out.sort_by_key(group_aggr_out_to_first_string); t_expect(out.get(0).cloned().unwrap(), "GOOGL", 100); t_expect(out.get(1).cloned().unwrap(), "ORCL", 200); assert_eq!(window_in_flow.window_gain_bytes_states, 0); @@ -385,17 +388,18 @@ mod tests { StreamName::fx_trade().as_ref(), ColumnName::fx_ticker().as_ref(), ); - let group_by_label = expr_resolver.register_value_expr(group_by_expr); + let group_by_labels = + GroupByLabels::new(vec![expr_resolver.register_value_expr(group_by_expr)]); let mut window = AggrWindow::new( WindowParameter::TimedFixedWindow { length: SpringEventDuration::from_secs(10), allowed_delay: SpringEventDuration::from_secs(1), }, - WindowOperationParameter::GroupAggregation(AggregateParameter { + WindowOperationParameter::Aggregate(AggregateParameter { aggr_func: AggregateFunctionParameter::Avg, aggr_expr: aggr_labels_select_list[0], - group_by: group_by_label, + group_by: group_by_labels, }), ); @@ -501,14 +505,7 @@ mod tests { (), ); assert_eq!(out.len(), 2); - out.sort_by_key(|group_aggr_out| { - group_aggr_out - .group_by_result - .clone() - .unwrap() - .unpack::() - .unwrap() - }); + out.sort_by_key(group_aggr_out_to_first_string); t_expect(out.get(0).cloned().unwrap(), "GOOGL", 100); t_expect(out.get(1).cloned().unwrap(), "ORCL", 200); assert_eq!(window_in_flow.window_gain_bytes_states, 0); diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs index 59868333..f6ff6bbe 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs @@ -45,7 +45,10 @@ where /// Then, return all panes to get a tuple with the `rowtime`. /// /// Caller must assure rowtime is not smaller than watermark. - pub(super) fn panes_to_dispatch(&mut self, rowtime: SpringTimestamp) -> impl Iterator { + pub(super) fn panes_to_dispatch( + &mut self, + rowtime: SpringTimestamp, + ) -> impl Iterator { self.generate_panes_if_not_exist(rowtime); self.panes @@ -140,7 +143,7 @@ mod tests { expr_resolver::ExprResolver, expression::{AggrExpr, ValueExpr}, pipeline::pump_model::window_operation_parameter::aggregate::{ - AggregateFunctionParameter, AggregateParameter, + AggregateFunctionParameter, AggregateParameter, GroupByLabels, }, sql_processor::sql_parser::syntax::SelectFieldSyntax, stream_engine::{ @@ -165,18 +168,22 @@ mod tests { }]; let (mut expr_resolver, _, aggr_labels_select_list) = ExprResolver::new(select_list); - let group_by_label = expr_resolver.register_value_expr(group_by_expr); + let group_by_labels = + GroupByLabels::new(vec![expr_resolver.register_value_expr(group_by_expr)]); - WindowOperationParameter::GroupAggregation(AggregateParameter { + WindowOperationParameter::Aggregate(AggregateParameter { aggr_func: AggregateFunctionParameter::Avg, aggr_expr: aggr_labels_select_list[0], - group_by: group_by_label, + group_by: group_by_labels, }) } #[test] fn test_valid_open_at_s() { - fn sliding_window_panes(length: SpringEventDuration, period: SpringEventDuration) -> Panes { + fn sliding_window_panes( + length: SpringEventDuration, + period: SpringEventDuration, + ) -> Panes { Panes::new( WindowParameter::TimedSlidingWindow { length, @@ -187,30 +194,43 @@ mod tests { ) } - let panes = sliding_window_panes(SpringEventDuration::from_secs(10), SpringEventDuration::from_secs(5)); + let panes = sliding_window_panes( + SpringEventDuration::from_secs(10), + SpringEventDuration::from_secs(5), + ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() + ), vec![ SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() ] ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap() + ), vec![ SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() ] ); - let panes = - sliding_window_panes(SpringEventDuration::from_secs(10), SpringEventDuration::from_secs(10)); + let panes = sliding_window_panes( + SpringEventDuration::from_secs(10), + SpringEventDuration::from_secs(10), + ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap() + ), vec![SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap() + ), vec![SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] ); } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs index 9392cca8..e80f69e6 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs @@ -7,9 +7,10 @@ use std::collections::HashMap; use ordered_float::OrderedFloat; use crate::{ + error::Result, expr_resolver::ExprResolver, pipeline::pump_model::window_operation_parameter::{ - aggregate::{AggregateFunctionParameter, AggregateParameter}, + aggregate::{AggregateFunctionParameter, AggregateParameter, GroupByLabels}, WindowOperationParameter, }, stream_engine::{ @@ -31,7 +32,7 @@ pub(in crate::stream_engine::autonomous_executor) struct AggrPane { open_at: SpringTimestamp, close_at: SpringTimestamp, - group_aggregation_parameter: AggregateParameter, + aggregate_parameter: AggregateParameter, inner: AggrPaneInner, } @@ -43,9 +44,13 @@ impl Pane for AggrPane { /// # Panics /// /// if `op_param` is not `GroupAggregateParameter` - fn new(open_at: SpringTimestamp, close_at: SpringTimestamp, op_param: WindowOperationParameter) -> Self { - if let WindowOperationParameter::GroupAggregation(group_aggregation_parameter) = op_param { - let inner = match group_aggregation_parameter.aggr_func { + fn new( + open_at: SpringTimestamp, + close_at: SpringTimestamp, + op_param: WindowOperationParameter, + ) -> Self { + if let WindowOperationParameter::Aggregate(aggregate_parameter) = op_param { + let inner = match aggregate_parameter.aggr_func { AggregateFunctionParameter::Avg => AggrPaneInner::Avg { states: HashMap::new(), }, @@ -54,7 +59,7 @@ impl Pane for AggrPane { Self { open_at, close_at, - group_aggregation_parameter, + aggregate_parameter, inner, } } else { @@ -76,17 +81,15 @@ impl Pane for AggrPane { tuple: &Tuple, _arg: (), ) -> WindowInFlowByWindowTask { - let group_by_value = expr_resolver - .eval_value_expr(self.group_aggregation_parameter.group_by, tuple) - .expect("TODO Result"); - let group_by_value = if let SqlValue::NotNull(v) = group_by_value { - v - } else { - unimplemented!("group by NULL is not supported ") - }; + let group_by_values = GroupByValues::from_group_by_labels( + self.aggregate_parameter.group_by.clone(), + expr_resolver, + tuple, + ) + .expect("TODO handle Result"); let aggregated_value = expr_resolver - .eval_aggr_expr_inner(self.group_aggregation_parameter.aggr_expr, tuple) + .eval_aggr_expr_inner(self.aggregate_parameter.aggr_expr, tuple) .expect("TODO Result"); let aggregated_value = if let SqlValue::NotNull(v) = aggregated_value { v @@ -97,7 +100,7 @@ impl Pane for AggrPane { match &mut self.inner { AggrPaneInner::Avg { states } => { let state = states - .entry(group_by_value) + .entry(group_by_values) .or_insert_with(AvgState::default); state.next( @@ -115,21 +118,21 @@ impl Pane for AggrPane { self, _expr_resolver: &ExprResolver, ) -> (Vec, WindowInFlowByWindowTask) { - let aggr_label = self.group_aggregation_parameter.aggr_expr; - let group_by_label = self.group_aggregation_parameter.group_by; + let aggr_label = self.aggregate_parameter.aggr_expr; + let group_by_labels = self.aggregate_parameter.group_by; match self.inner { AggrPaneInner::Avg { states } => { let group_aggr_out_seq = states .into_iter() - .map(|(group_by, state)| { + .map(|(group_by_values, state)| { let aggr_value = SqlValue::NotNull(NnSqlValue::Float(OrderedFloat(state.finalize()))); GroupAggrOut::new( aggr_label, aggr_value, - group_by_label, - SqlValue::NotNull(group_by), + group_by_labels.clone(), + group_by_values, ) }) .collect(); @@ -143,6 +146,41 @@ impl Pane for AggrPane { #[derive(Debug)] pub(in crate::stream_engine::autonomous_executor) enum AggrPaneInner { Avg { - states: HashMap, + states: HashMap, }, } + +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub(in crate::stream_engine::autonomous_executor) struct GroupByValues( + /// TODO support NULL in GROUP BY elements + Vec, +); + +impl GroupByValues { + fn from_group_by_labels( + group_by_labels: GroupByLabels, + expr_resolver: &ExprResolver, + tuple: &Tuple, + ) -> Result { + let values = group_by_labels + .as_labels() + .iter() + .map(|group_by_label| { + let group_by_value = + expr_resolver.eval_value_expr(*group_by_label, tuple)?; + + if let SqlValue::NotNull(v) = group_by_value { + Ok(v) + } else { + unimplemented!("group by NULL is not supported ") + } + }) + .collect::>>()?; + + Ok(Self(values)) + } + + pub(in crate::stream_engine::autonomous_executor) fn into_sql_values(self) -> Vec { + self.0.into_iter().map(SqlValue::NotNull).collect() + } +} diff --git a/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs b/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs index a5c79ba5..38195d9f 100644 --- a/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs +++ b/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs @@ -5,7 +5,9 @@ use crate::{ pipeline::{ name::StreamName, pump_model::{ - window_operation_parameter::{join_parameter::JoinParameter, WindowOperationParameter}, + window_operation_parameter::{ + aggregate::GroupByLabels, join_parameter::JoinParameter, WindowOperationParameter, + }, window_parameter::WindowParameter, }, }, @@ -20,6 +22,21 @@ impl UpperOps { pub(crate) fn has_window(&self) -> bool { self.group_aggr_window.is_some() } + + pub(crate) fn group_by_labels(&self) -> GroupByLabels { + self.group_aggr_window + .as_ref() + .and_then(|group_aggr_window_op| { + if let WindowOperationParameter::Aggregate(aggregate_parameter) = + &group_aggr_window_op.op_param + { + Some(aggregate_parameter.group_by.clone()) + } else { + None + } + }) + .unwrap_or_default() + } } #[derive(Clone, PartialEq, Debug)] From 37b253643576787b805eec8a0ac78ca3afb9d8c8 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 11:10:57 +0900 Subject: [PATCH 4/4] test(fix): test case --- springql-core/tests/feat_aggregation.rs | 25 +++++-------------------- 1 file changed, 5 insertions(+), 20 deletions(-) diff --git a/springql-core/tests/feat_aggregation.rs b/springql-core/tests/feat_aggregation.rs index 0f6f7a86..1a80d94b 100644 --- a/springql-core/tests/feat_aggregation.rs +++ b/springql-core/tests/feat_aggregation.rs @@ -46,12 +46,7 @@ fn run_and_drain( ) -> Vec { let _pipeline = apply_ddls(ddls, spring_config_default()); test_source.start(source_input); - let mut sink_received = drain_from_sink(test_sink); - sink_received.sort_by_key(|r| { - let ts = &r["ts"]; - ts.as_str().unwrap().to_string() - }); - sink_received + drain_from_sink(test_sink) } /// See: @@ -75,16 +70,14 @@ fn test_feat_aggregation_without_group_by() -> Result<()> { .to_string(), " CREATE SINK STREAM sink_avg_all ( - ts TIMESTAMP NOT NULL ROWTIME, - amount FLOAT NOT NULL + avg_amount FLOAT NOT NULL ); " .to_string(), " CREATE PUMP avg_all AS - INSERT INTO sink_avg_all (ts, avg_amount) + INSERT INTO sink_avg_all (avg_amount) SELECT STREAM - FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS aggr_ts, AVG(source_trade.amount) AS avg_amount FROM source_trade FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0); @@ -126,20 +119,12 @@ fn test_feat_aggregation_without_group_by() -> Result<()> { assert_eq!(sink_received.len(), 2); assert_eq!( - sink_received[0]["ts"].as_str().unwrap(), - "2020-01-01 00:00:00.000000000" - ); - assert_eq!( - sink_received[0]["amount"].as_f64().unwrap().round() as i32, + sink_received[0]["avg_amount"].as_f64().unwrap().round() as i32, 20, ); assert_eq!( - sink_received[1]["ts"].as_str().unwrap(), - "2020-01-01 00:00:10.000000000" - ); - assert_eq!( - sink_received[1]["amount"].as_f64().unwrap().round() as i32, + sink_received[1]["avg_amount"].as_f64().unwrap().round() as i32, 50, );