From d3ddfc040f2d3f197bc5c2b9c2ca70ab578b6e1c Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 05:43:49 +0900 Subject: [PATCH 1/7] test: add test_select_list_order_with_aggr (reproduce) --- springql-core/tests/feat_projection.rs | 104 +++++++++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 springql-core/tests/feat_projection.rs diff --git a/springql-core/tests/feat_projection.rs b/springql-core/tests/feat_projection.rs new file mode 100644 index 00000000..dc9ac674 --- /dev/null +++ b/springql-core/tests/feat_projection.rs @@ -0,0 +1,104 @@ +// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. + +mod test_support; + +use pretty_assertions::assert_eq; +use serde_json::json; +use springql_core::low_level_rs::*; +use springql_foreign_service::source::source_input::ForeignSourceInput; +use springql_foreign_service::source::ForeignSource; +use springql_test_logger::setup_test_logger; + +use crate::test_support::*; + +fn gen_source_input() -> Vec { + let json_00_1 = json!({ + "ts": "2020-01-01 00:00:00.000000000", + "ticker": "ORCL", + "amount": 10, + }); + let json_00_2 = json!({ + "ts": "2020-01-01 00:00:09.9999999999", + "ticker": "GOOGL", + "amount": 30, + }); + let json_10_1 = json!({ + "ts": "2020-01-01 00:00:10.0000000000", + "ticker": "IBM", + "amount": 50, + }); + + vec![json_00_1, json_00_2, json_10_1] +} + +/// See: +#[test] +fn test_select_list_order_with_aggr() { + setup_test_logger(); + + const QUEUE_NAME: &str = "q"; + + let source_input = gen_source_input(); + + let test_source = ForeignSource::new().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_sampled_trade_amount ( + ts TIMESTAMP NOT NULL ROWTIME, + amount FLOAT NOT NULL + ); + " + .to_string(), + " + CREATE PUMP pu_passthrough AS + INSERT INTO sink_sampled_trade_amount (ts, amount) + SELECT STREAM + AVG(source_trade.amount) AS avg_amount, + FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS sampled_ts + FROM source_trade + GROUP BY sampled_ts + FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0); + " + .to_string(), + format!( + " + CREATE SINK WRITER queue_sink_trade FOR sink_sampled_trade_amount + TYPE IN_MEMORY_QUEUE OPTIONS ( + NAME '{queue_name}' + ); + ", + queue_name = QUEUE_NAME + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let pipeline = apply_ddls(&ddls, SpringConfig::default()); + test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); + + let row = pipeline.pop(QUEUE_NAME).unwrap(); + assert_eq!(row.get_not_null_by_index::(0).unwrap(), 20); + assert_eq!( + row.get_not_null_by_index::(1).unwrap(), + "2020-01-01 00:00:00.000000000" + ); +} From 8141d8163e3be7515b7352a8c6f0305df9ed6f49 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 06:10:32 +0900 Subject: [PATCH 2/7] refactor: comment to function name --- springql-core/src/expr_resolver.rs | 112 +++++++++++++++-------------- 1 file changed, 58 insertions(+), 54 deletions(-) diff --git a/springql-core/src/expr_resolver.rs b/springql-core/src/expr_resolver.rs index ac783c5f..636b7b2e 100644 --- a/springql-core/src/expr_resolver.rs +++ b/springql-core/src/expr_resolver.rs @@ -10,7 +10,7 @@ use crate::stream_engine::{SqlValue, Tuple}; use anyhow::anyhow; use std::collections::HashMap; -use self::expr_label::{AggrExprLabel, ExprLabelGenerator, ValueExprLabel}; +use self::expr_label::{AggrExprLabel, ExprLabel, ExprLabelGenerator, ValueExprLabel}; /// ExprResolver is to: /// @@ -32,29 +32,24 @@ pub(crate) struct ExprResolver { impl ExprResolver { /// # Returns /// - /// `(instance, value expr labels in select_list, aggr expr labels in select_list) - pub(crate) fn new( - select_list: Vec, - ) -> (Self, Vec, Vec) { + /// `(instance, value/aggr expr labels in select_list) + pub(crate) fn new(select_list: Vec) -> (Self, Vec) { let mut label_gen = ExprLabelGenerator::default(); let mut value_expressions = HashMap::new(); let mut value_aliased_labels = HashMap::new(); let mut aggr_expressions = HashMap::new(); let mut aggr_aliased_labels = HashMap::new(); - let mut value_expr_labels = Vec::new(); - let mut aggr_expr_labels = Vec::new(); - - select_list + let expr_labels = select_list .into_iter() - .for_each(|select_field| match select_field { + .map(|select_field| match select_field { SelectFieldSyntax::ValueExpr { value_expr, alias } => { let label = label_gen.next_value(); value_expressions.insert(label, value_expr); if let Some(alias) = alias { value_aliased_labels.insert(alias, label); } - value_expr_labels.push(label); + ExprLabel::Value(label) } SelectFieldSyntax::AggrExpr { aggr_expr, alias } => { let label = label_gen.next_aggr(); @@ -62,9 +57,10 @@ impl ExprResolver { if let Some(alias) = alias { aggr_aliased_labels.insert(alias, label); } - aggr_expr_labels.push(label); + ExprLabel::Aggr(label) } - }); + }) + .collect(); ( Self { @@ -75,8 +71,7 @@ impl ExprResolver { aggr_aliased_labels, aggr_expression_results: HashMap::new(), }, - value_expr_labels, - aggr_expr_labels, + expr_labels, ) } @@ -163,6 +158,8 @@ impl ExprResolver { } /// label -> (internal) value expression inside aggr expr + tuple (for ColumnReference) -> SqlValue. + /// + /// _inner_ means: AGGR_FUNC(inner_value_expr) /// /// # Panics /// @@ -187,6 +184,8 @@ impl ExprResolver { #[cfg(test)] mod tests { + use std::thread::panicking; + use crate::{expression::ValueExpr, stream_engine::time::timestamp::SpringTimestamp}; use super::*; @@ -210,48 +209,53 @@ mod tests { }, ]; - let (mut resolver, value_labels_select_list, _aggr_labels_select_list) = - ExprResolver::new(select_list); + let (mut resolver, labels_select_list) = ExprResolver::new(select_list); - assert_eq!( - resolver - .resolve_value_alias(ValueAlias::new("a1".to_string())) - .unwrap(), - value_labels_select_list[1] - ); - assert!(resolver - .resolve_value_alias(ValueAlias::new("a404".to_string())) - .is_err(),); + if let &[ExprLabel::Value(value_label0), ExprLabel::Value(value_label1)] = + &labels_select_list[..] + { + assert_eq!( + resolver + .resolve_value_alias(ValueAlias::new("a1".to_string())) + .unwrap(), + value_label1 + ); + assert!(resolver + .resolve_value_alias(ValueAlias::new("a404".to_string())) + .is_err(),); - let label = resolver.register_value_expr(ValueExpr::factory_add( - ValueExpr::factory_integer(3), - ValueExpr::factory_integer(3), - )); + let label = resolver.register_value_expr(ValueExpr::factory_add( + ValueExpr::factory_integer(3), + ValueExpr::factory_integer(3), + )); - let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]); + let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]); - assert_eq!( - resolver - .eval_value_expr(value_labels_select_list[0], &empty_tuple) - .unwrap(), - SqlValue::factory_integer(2) - ); - assert_eq!( - resolver - .eval_value_expr(value_labels_select_list[0], &empty_tuple) - .unwrap(), - SqlValue::factory_integer(2), - "eval twice" - ); - assert_eq!( - resolver - .eval_value_expr(value_labels_select_list[1], &empty_tuple) - .unwrap(), - SqlValue::factory_integer(4) - ); - assert_eq!( - resolver.eval_value_expr(label, &empty_tuple).unwrap(), - SqlValue::factory_integer(6) - ); + assert_eq!( + resolver + .eval_value_expr(value_label0, &empty_tuple) + .unwrap(), + SqlValue::factory_integer(2) + ); + assert_eq!( + resolver + .eval_value_expr(value_label0, &empty_tuple) + .unwrap(), + SqlValue::factory_integer(2), + "eval twice" + ); + assert_eq!( + resolver + .eval_value_expr(value_label1, &empty_tuple) + .unwrap(), + SqlValue::factory_integer(4) + ); + assert_eq!( + resolver.eval_value_expr(label, &empty_tuple).unwrap(), + SqlValue::factory_integer(6) + ); + } else { + unreachable!() + } } } From 8e69de37a98e480e0960557667a987599ac66e52 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 08:17:54 +0900 Subject: [PATCH 3/7] docs: note on Tuple --- springql-core/src/pipeline/field.rs | 2 +- .../src/stream_engine/autonomous_executor/task/tuple.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/springql-core/src/pipeline/field.rs b/springql-core/src/pipeline/field.rs index 39c598de..8b22a65b 100644 --- a/springql-core/src/pipeline/field.rs +++ b/springql-core/src/pipeline/field.rs @@ -13,7 +13,7 @@ use crate::{mem_size::MemSize, stream_engine::SqlValue}; use self::field_name::ColumnReference; -/// Field == SqlValue + FieldName +/// Field == SqlValue + ColumnReference #[derive(Clone, PartialEq, Debug, new)] pub(crate) struct Field { name: ColumnReference, diff --git a/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs b/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs index 31ce80ae..c1a82aa3 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs @@ -12,7 +12,7 @@ use anyhow::anyhow; /// /// 1. Task gets a row from input queue. /// 2. Task converts the row into tuple. -/// 3. Task puts a row converted from the final tuple. +/// 3. Task puts a row converted from the final tuple (for column values) and ExprResolver (for expressions). /// /// Unlike rows, tuples may have not only stream's columns but also fields derived from expressions. #[derive(Clone, PartialEq, Debug, new)] From b93dd69aff04a3b8e771c40e164cd48355d5bfe1 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 09:42:12 +0900 Subject: [PATCH 4/7] fix: insert order --- springql-core/tests/feat_projection.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/springql-core/tests/feat_projection.rs b/springql-core/tests/feat_projection.rs index dc9ac674..866d359e 100644 --- a/springql-core/tests/feat_projection.rs +++ b/springql-core/tests/feat_projection.rs @@ -60,7 +60,7 @@ fn test_select_list_order_with_aggr() { .to_string(), " CREATE PUMP pu_passthrough AS - INSERT INTO sink_sampled_trade_amount (ts, amount) + INSERT INTO sink_sampled_trade_amount (amount, ts) SELECT STREAM AVG(source_trade.amount) AS avg_amount, FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS sampled_ts From 8d6f620575f82df5417ab2eb7243fbf47e206ccb Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 09:45:56 +0900 Subject: [PATCH 5/7] fix: RowHL::get_not_null_by_index order follows table definition --- springql-core/tests/feat_projection.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/springql-core/tests/feat_projection.rs b/springql-core/tests/feat_projection.rs index 866d359e..0815f202 100644 --- a/springql-core/tests/feat_projection.rs +++ b/springql-core/tests/feat_projection.rs @@ -96,9 +96,9 @@ fn test_select_list_order_with_aggr() { test_source.start(ForeignSourceInput::new_fifo_batch(source_input)); let row = pipeline.pop(QUEUE_NAME).unwrap(); - assert_eq!(row.get_not_null_by_index::(0).unwrap(), 20); assert_eq!( - row.get_not_null_by_index::(1).unwrap(), + row.get_not_null_by_index::(0).unwrap(), "2020-01-01 00:00:00.000000000" ); + assert_eq!(row.get_not_null_by_index::(1).unwrap(), 20); } From 71dddbe13cde7bb611d022fb4904e7b163f01bdb Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 09:47:07 +0900 Subject: [PATCH 6/7] fix: SELECT with aggragate preserves select_list order --- springql-core/src/expr_resolver.rs | 4 +- springql-core/src/expr_resolver/expr_label.rs | 7 + .../src/sql_processor/query_planner.rs | 21 +- .../pump_task/pump_subtask/query_subtask.rs | 110 +-- .../query_subtask/aggr_projection_subtask.rs | 28 - .../group_aggregate_window_subtask.rs | 4 +- .../query_subtask/projection_subtask.rs | 62 ++ .../query_subtask/value_projection_subtask.rs | 29 - .../task/window/aggregate.rs | 791 ++++++++++-------- .../task/window/join_window.rs | 2 +- .../autonomous_executor/task/window/panes.rs | 47 +- .../task/window/panes/pane/aggregate_pane.rs | 25 +- .../query_plan/query_plan_operation.rs | 5 +- 13 files changed, 602 insertions(+), 533 deletions(-) delete mode 100644 springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs create mode 100644 springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs delete mode 100644 springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/value_projection_subtask.rs diff --git a/springql-core/src/expr_resolver.rs b/springql-core/src/expr_resolver.rs index 636b7b2e..e516c913 100644 --- a/springql-core/src/expr_resolver.rs +++ b/springql-core/src/expr_resolver.rs @@ -158,7 +158,7 @@ impl ExprResolver { } /// label -> (internal) value expression inside aggr expr + tuple (for ColumnReference) -> SqlValue. - /// + /// /// _inner_ means: AGGR_FUNC(inner_value_expr) /// /// # Panics @@ -184,8 +184,6 @@ impl ExprResolver { #[cfg(test)] mod tests { - use std::thread::panicking; - use crate::{expression::ValueExpr, stream_engine::time::timestamp::SpringTimestamp}; use super::*; diff --git a/springql-core/src/expr_resolver/expr_label.rs b/springql-core/src/expr_resolver/expr_label.rs index 1612c189..9c804869 100644 --- a/springql-core/src/expr_resolver/expr_label.rs +++ b/springql-core/src/expr_resolver/expr_label.rs @@ -25,3 +25,10 @@ pub(crate) struct ValueExprLabel(u16); #[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] pub(crate) struct AggrExprLabel(u16); + +/// Either ValueExprLabel or AggrExprLabel. +#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)] +pub(crate) enum ExprLabel { + Value(ValueExprLabel), + Aggr(AggrExprLabel), +} diff --git a/springql-core/src/sql_processor/query_planner.rs b/springql-core/src/sql_processor/query_planner.rs index 3d3ac683..b7712a16 100644 --- a/springql-core/src/sql_processor/query_planner.rs +++ b/springql-core/src/sql_processor/query_planner.rs @@ -59,7 +59,7 @@ mod select_syntax_analyzer; use crate::{ error::Result, - expr_resolver::ExprResolver, + expr_resolver::{expr_label::ExprLabel, ExprResolver}, pipeline::{ pump_model::{ window_operation_parameter::{ @@ -92,11 +92,10 @@ impl QueryPlanner { } pub(crate) fn plan(self, pipeline: &Pipeline) -> Result { - let (mut expr_resolver, value_labels_select_list, aggr_labels_select_list) = + let (mut expr_resolver, labels_select_list) = ExprResolver::new(self.analyzer.select_list().to_vec()); let projection = ProjectionOp { - value_expr_labels: value_labels_select_list, - aggr_expr_labels: aggr_labels_select_list, + expr_labels: labels_select_list, }; let group_aggr_window = @@ -140,11 +139,21 @@ impl QueryPlanner { projection_op: &ProjectionOp, ) -> Result> { let opt_grouping_elem = self.analyzer.grouping_element(); - let aggr_labels = &projection_op.aggr_expr_labels; + let aggr_labels = projection_op + .expr_labels + .iter() + .filter_map(|label| { + if let ExprLabel::Aggr(aggr_label) = label { + Some(*aggr_label) + } else { + None + } + }) + .collect::>(); match (opt_grouping_elem, aggr_labels.len()) { (Some(grouping_elem), 1) => { - let aggr_label = aggr_labels.iter().next().expect("len checked"); + let aggr_label = aggr_labels.get(0).expect("len checked"); let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func; let group_by_label = match grouping_elem { diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs index 3b40a494..2efc99e3 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask.rs @@ -1,10 +1,9 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. -pub(super) mod aggr_projection_subtask; pub(super) mod collect_subtask; pub(super) mod group_aggregate_window_subtask; pub(super) mod join_subtask; -pub(super) mod value_projection_subtask; +pub(super) mod projection_subtask; use std::sync::{Arc, Mutex, MutexGuard}; @@ -36,16 +35,14 @@ use crate::{ InQueueMetricsUpdateByTask, WindowInFlowByWindowTask, }, row::{column::stream_column::StreamColumns, column_values::ColumnValues, Row}, - task::window::aggregate::GroupAggrOut, }, command::query_plan::query_plan_operation::JoinOp, }, }; use self::{ - aggr_projection_subtask::AggrProjectionSubtask, collect_subtask::CollectSubtask, - group_aggregate_window_subtask::GroupAggregateWindowSubtask, join_subtask::JoinSubtask, - value_projection_subtask::ValueProjectionSubtask, + collect_subtask::CollectSubtask, group_aggregate_window_subtask::GroupAggregateWindowSubtask, + join_subtask::JoinSubtask, projection_subtask::ProjectionSubtask, }; /// Process input row 1-by-1. @@ -53,9 +50,8 @@ use self::{ pub(in crate::stream_engine::autonomous_executor) struct QuerySubtask { expr_resolver: ExprResolver, - value_projection_subtask: Option, + projection_subtask: ProjectionSubtask, - aggr_projection_subtask: Option, group_aggr_window_subtask: Option, // TODO recursive JOIN @@ -126,52 +122,20 @@ impl QuerySubtask { let (left_collect_subtask, join) = Self::subtasks_from_lower_ops(plan.lower_ops); - if plan.upper_ops.projection.aggr_expr_labels.is_empty() { - let value_projection_subtask = - ValueProjectionSubtask::new(plan.upper_ops.projection.value_expr_labels); - - Self { - expr_resolver: plan.expr_resolver, - value_projection_subtask: Some(value_projection_subtask), - aggr_projection_subtask: None, - group_aggr_window_subtask: None, - left_collect_subtask, - join, - rng, - } - } else { - assert_eq!( - plan.upper_ops.projection.aggr_expr_labels.len(), - 1, - "currently only 1 aggregate in select_list is supported" - ); - assert_eq!( - plan.upper_ops.projection.value_expr_labels.len(), - 1, - "currently only GROUP BY expression in select_list is supported" - ); - - let aggr_projection_subtask = AggrProjectionSubtask::new( - plan.upper_ops.projection.value_expr_labels[0], - plan.upper_ops.projection.aggr_expr_labels[0], - ); - - let op = plan - .upper_ops - .group_aggr_window - .expect("select_list includes aggregate "); - let group_aggr_window_subtask = - GroupAggregateWindowSubtask::new(op.window_param, op.op_param); - - Self { - expr_resolver: plan.expr_resolver, - value_projection_subtask: None, - aggr_projection_subtask: Some(aggr_projection_subtask), - group_aggr_window_subtask: Some(group_aggr_window_subtask), - left_collect_subtask, - join, - rng, - } + let group_aggr_window_subtask = plan + .upper_ops + .group_aggr_window + .map(|op| GroupAggregateWindowSubtask::new(op.window_param, op.op_param)); + + let projection_subtask = ProjectionSubtask::new(plan.upper_ops.projection.expr_labels); + + Self { + expr_resolver: plan.expr_resolver, + projection_subtask, + group_aggr_window_subtask, + left_collect_subtask, + join, + rng, } } /// (left collect subtask, Option<(join subtask, right collect subtask)>) @@ -248,40 +212,26 @@ impl QuerySubtask { tuple: Tuple, ) -> Result<(Vec, WindowInFlowByWindowTask)> { if let Some(group_aggr_window_subtask) = &self.group_aggr_window_subtask { - let (group_aggr_out_seq, window_in_flow) = + let (aggregated_and_grouping_values_seq, window_in_flow) = group_aggr_window_subtask.run(&self.expr_resolver, tuple); - let values = self.run_aggr_projection_op(group_aggr_out_seq)?; + let values_seq = aggregated_and_grouping_values_seq + .into_iter() + .map(|aggregated_and_grouping_values| { + self.projection_subtask + .run_with_aggr(aggregated_and_grouping_values) + }) + .collect::>>()?; - Ok((values, window_in_flow)) + Ok((values_seq, window_in_flow)) } else { - let values = self.run_projection_op(&tuple)?; + let values = self + .projection_subtask + .run_without_aggr(&self.expr_resolver, &tuple)?; Ok((vec![values], WindowInFlowByWindowTask::zero())) } } - fn run_projection_op(&self, tuple: &Tuple) -> Result { - self.value_projection_subtask - .as_ref() - .unwrap() - .run(&self.expr_resolver, tuple) - } - - fn run_aggr_projection_op( - &self, - group_aggr_out_seq: Vec, - ) -> Result> { - group_aggr_out_seq - .into_iter() - .map(|group_agg_out| { - self.aggr_projection_subtask - .as_ref() - .unwrap() - .run(group_agg_out) - }) - .collect() - } - /// # Returns /// /// None when input queue does not exist or is empty or JOIN op does not emit output yet. diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs deleted file mode 100644 index d7de9cfe..00000000 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/aggr_projection_subtask.rs +++ /dev/null @@ -1,28 +0,0 @@ -// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. - -use crate::error::Result; -use crate::expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}; -use crate::stream_engine::autonomous_executor::task::window::aggregate::GroupAggrOut; - -use super::SqlValues; - -#[derive(Debug, new)] -pub(in crate::stream_engine::autonomous_executor) struct AggrProjectionSubtask { - group_by_expr: ValueExprLabel, - aggr_expr: AggrExprLabel, -} - -impl AggrProjectionSubtask { - pub(in crate::stream_engine::autonomous_executor) fn run( - &self, - group_aggr_out: GroupAggrOut, - ) -> Result { - let aggr_label = self.aggr_expr; - let group_by_label = self.group_by_expr; - - let (aggr_result, group_by_result) = - group_aggr_out.into_results(aggr_label, group_by_label)?; - - Ok(SqlValues::new(vec![group_by_result, aggr_result])) // FIXME keep select list order: - } -} diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/group_aggregate_window_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/group_aggregate_window_subtask.rs index 2257d48e..dbbd00db 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/group_aggregate_window_subtask.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/group_aggregate_window_subtask.rs @@ -8,7 +8,7 @@ use crate::pipeline::pump_model::window_parameter::WindowParameter; use crate::stream_engine::autonomous_executor::performance_metrics::metrics_update_command::metrics_update_by_task_execution::WindowInFlowByWindowTask; use crate::stream_engine::autonomous_executor::task::tuple::Tuple; use crate::stream_engine::autonomous_executor::task::window::Window; -use crate::stream_engine::autonomous_executor::task::window::aggregate::{GroupAggrOut, AggrWindow}; +use crate::stream_engine::autonomous_executor::task::window::aggregate::{AggregatedAndGroupingValues, AggrWindow}; #[derive(Debug)] pub(in crate::stream_engine::autonomous_executor) struct GroupAggregateWindowSubtask( @@ -28,7 +28,7 @@ impl GroupAggregateWindowSubtask { &self, expr_resolver: &ExprResolver, tuple: Tuple, - ) -> (Vec, WindowInFlowByWindowTask) { + ) -> (Vec, WindowInFlowByWindowTask) { self.0 .lock() .expect("another thread accessing to window gets poisoned") diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs new file mode 100644 index 00000000..2b838ff8 --- /dev/null +++ b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs @@ -0,0 +1,62 @@ +// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. + +use crate::error::Result; +use crate::expr_resolver::expr_label::ExprLabel; +use crate::expr_resolver::ExprResolver; +use crate::stream_engine::autonomous_executor::task::tuple::Tuple; +use crate::stream_engine::autonomous_executor::task::window::aggregate::AggregatedAndGroupingValues; + +use super::SqlValues; + +#[derive(Debug, new)] +pub(in crate::stream_engine::autonomous_executor) struct ProjectionSubtask { + exprs: Vec, +} + +impl ProjectionSubtask { + /// Projection for SELECT without aggregate. + pub(in crate::stream_engine::autonomous_executor) fn run_without_aggr( + &self, + expr_resolver: &ExprResolver, + tuple: &Tuple, + ) -> Result { + let values = self + .exprs + .iter() + .map(|label| match label { + ExprLabel::Value(group_by_value_label) => { + expr_resolver.eval_value_expr(*group_by_value_label, tuple) + } + ExprLabel::Aggr(_) => unreachable!("aggregate must not be in select_list"), + }) + .collect::>>()?; + + Ok(SqlValues::new(values)) + } + + /// Projection for SELECT with aggregate. + /// select_list must only have GROUP BY elements or aggregate expressions. + /// (Column reference without aggregate is not allowed.) + pub(in crate::stream_engine::autonomous_executor) fn run_with_aggr( + &self, + aggregated_and_grouping_values: AggregatedAndGroupingValues, + ) -> Result { + let values = self + .exprs + .iter() + .map(|label| { + match label { + ExprLabel::Value(group_by_value_label) => { + aggregated_and_grouping_values.get_group_by_value(group_by_value_label) + } + ExprLabel::Aggr(aggr_label) => { + aggregated_and_grouping_values.get_aggregated_value(aggr_label) + } + } + .cloned() + }) + .collect::>>()?; + + Ok(SqlValues::new(values)) + } +} diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/value_projection_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/value_projection_subtask.rs deleted file mode 100644 index 61c9ebb2..00000000 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/value_projection_subtask.rs +++ /dev/null @@ -1,29 +0,0 @@ -// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. - -use crate::error::Result; -use crate::expr_resolver::expr_label::ValueExprLabel; -use crate::expr_resolver::ExprResolver; -use crate::stream_engine::autonomous_executor::task::tuple::Tuple; - -use super::SqlValues; - -#[derive(Debug, new)] -pub(in crate::stream_engine::autonomous_executor) struct ValueProjectionSubtask { - value_exprs: Vec, -} - -impl ValueProjectionSubtask { - pub(in crate::stream_engine::autonomous_executor) fn run( - &self, - expr_resolver: &ExprResolver, - tuple: &Tuple, - ) -> Result { - let values = self - .value_exprs - .iter() - .map(|label| expr_resolver.eval_value_expr(*label, tuple)) - .collect::>>()?; - - Ok(SqlValues::new(values)) - } -} diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs index 2274611b..84f3cbc5 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs @@ -1,5 +1,7 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. +use std::collections::HashMap; + use crate::{ error::{Result, SpringError}, expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}, @@ -17,41 +19,50 @@ use super::{ Window, }; -#[derive(Clone, PartialEq, Debug, new)] -pub(in crate::stream_engine::autonomous_executor) struct GroupAggrOut { - aggr_label: AggrExprLabel, - aggr_result: SqlValue, - - group_by_label: ValueExprLabel, - group_by_result: SqlValue, +/// for aggregate expressions: AggrExprLabel -> SqlValue, +/// for GROUP BY expressions: ValueExprLabel -> SqlValue +/// +/// Projection operation for SELECT with aggregate completes with this instance (without Tuple). +#[derive(Clone, PartialEq, Debug, Default)] +pub(in crate::stream_engine::autonomous_executor) struct AggregatedAndGroupingValues { + aggr: HashMap, + group_by: HashMap, } -impl GroupAggrOut { - /// # Returns - /// - /// (aggregate result, group by result) +impl AggregatedAndGroupingValues { + pub(in crate::stream_engine::autonomous_executor) fn new( + aggregates: Vec<(AggrExprLabel, SqlValue)>, + group_bys: Vec<(ValueExprLabel, SqlValue)>, + ) -> Self { + Self { + aggr: aggregates.into_iter().collect(), + group_by: group_bys.into_iter().collect(), + } + } + + /// # Failures /// + /// - `SpringError::Sql` when: + /// - `label` is not included in aggregation result + pub(in crate::stream_engine::autonomous_executor) fn get_aggregated_value( + &self, + label: &AggrExprLabel, + ) -> Result<&SqlValue> { + self.aggr + .get(label) + .ok_or_else(|| SpringError::Sql(anyhow!("aggregate label not found: {:?}", label))) + } + /// # Failures /// /// - `SpringError::Sql` when: /// - `label` is not included in aggregation result - pub(in crate::stream_engine::autonomous_executor) fn into_results( - self, - aggr_label: AggrExprLabel, - group_by_label: ValueExprLabel, - ) -> Result<(SqlValue, SqlValue)> { - if self.aggr_label != aggr_label { - Err(SpringError::Sql(anyhow!( - "aggregate labeled {:?} is not calculated", - aggr_label - ))) - } else if self.group_by_label != group_by_label { - Err(SpringError::Sql(anyhow!( - "GROUP BY expression {:?} is not calculated", - group_by_label - ))) - } else { - Ok((self.aggr_result, self.group_by_result)) - } + pub(in crate::stream_engine::autonomous_executor) fn get_group_by_value( + &self, + label: &ValueExprLabel, + ) -> Result<&SqlValue> { + self.group_by + .get(label) + .ok_or_else(|| SpringError::Sql(anyhow!("GROUP BY label not found: {:?}", label))) } } @@ -107,7 +118,7 @@ mod tests { use std::str::FromStr; use crate::{ - expr_resolver::ExprResolver, + expr_resolver::{expr_label::ExprLabel, ExprResolver}, expression::{AggrExpr, ValueExpr}, pipeline::{ name::{AggrAlias, ColumnName, StreamName}, @@ -125,25 +136,52 @@ mod tests { }, }; - fn t_expect(group_aggr_out: GroupAggrOut, expected_ticker: &str, expected_avg_amount: i16) { - let ticker = group_aggr_out.group_by_result.unwrap(); + fn t_expect( + aggr_label: AggrExprLabel, + group_by_label: ValueExprLabel, + aggregated_and_grouping_values: AggregatedAndGroupingValues, + expected_ticker: &str, + expected_avg_amount: i16, + ) { + let ticker = aggregated_and_grouping_values + .get_group_by_value(&group_by_label) + .unwrap() + .clone() + .unwrap(); assert_eq!( ticker.unpack::().unwrap(), expected_ticker.to_string() ); - let avg_amount = group_aggr_out.aggr_result.unwrap(); + let avg_amount = aggregated_and_grouping_values + .get_aggregated_value(&aggr_label) + .unwrap() + .clone() + .unwrap(); assert_eq!( avg_amount.unpack::().unwrap().round() as i16, expected_avg_amount ); } + fn sort_key( + group_by_label: &ValueExprLabel, + aggregated_and_grouping_values: &AggregatedAndGroupingValues, + ) -> String { + aggregated_and_grouping_values + .get_group_by_value(group_by_label) + .unwrap() + .clone() + .unwrap() + .unpack::() + .unwrap() + } + #[test] fn test_timed_sliding_window_aggregation() { setup_test_logger(); - // SELECT ticker AS tick, AVG(amount) AS avg_amount + // SELECT ticker, AVG(amount) AS avg_amount // FROM trade // SLIDING WINDOW duration_secs(10), duration_secs(5), duration_secs(1) // GROUP BY ticker; @@ -171,180 +209,204 @@ mod tests { }, ]; - let (mut expr_resolver, _, aggr_labels_select_list) = ExprResolver::new(select_list); - - let group_by_expr = ValueExpr::factory_colref( - StreamName::fx_trade().as_ref(), - ColumnName::fx_ticker().as_ref(), - ); - let group_by_label = expr_resolver.register_value_expr(group_by_expr); - - let mut window = AggrWindow::new( - WindowParameter::TimedSlidingWindow { - length: SpringEventDuration::from_secs(10), - period: SpringEventDuration::from_secs(5), - allowed_delay: SpringEventDuration::from_secs(1), - }, - WindowOperationParameter::GroupAggregation(GroupAggregateParameter { - aggr_func: AggregateFunctionParameter::Avg, - aggr_expr: aggr_labels_select_list[0], - group_by: group_by_label, - }), - ); - - // [:55, :05): ("GOOGL", 100) - // [:00, :10): ("GOOGL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), - "GOOGL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:55, :05): ("GOOGL", 100), ("ORCL", 100) - // [:00, :10): ("GOOGL", 100), ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:04.999999999").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:55, :05): -> "GOOGL" AVG = 100; "ORCL" AVG = 100 - // - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) - // [:05, :15): ("ORCL", 400) - let (mut out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:06.000000000").unwrap(), - "ORCL", - 400, - ), - (), - ); - assert_eq!(out.len(), 2); - out.sort_by_key(|group_aggr_out| { - group_aggr_out - .group_by_result - .clone() - .unwrap() - .unpack::() - .unwrap() - }); - t_expect(out.get(0).cloned().unwrap(), "GOOGL", 100); - t_expect(out.get(1).cloned().unwrap(), "ORCL", 100); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) <-- !!NOT CLOSED YET (within delay)!! - // [:05, :15): ("ORCL", 400), ("ORCL", 100) - // [:10, :20): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // too late data to be ignored - // - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) - // [:05, :15): ("ORCL", 400), ("ORCL", 100) - // [:10, :20): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400), ("ORCL", 100) <-- !!LATE DATA!! - // [:05, :15): ("ORCL", 400), ("ORCL", 100), ("ORCL", 100) - // [:10, :20): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): -> "GOOGL" AVG = 100; "ORCL" AVG = 200 - // - // [:05, :15): ("ORCL", 400), ("ORCL", 100), ("ORCL", 100), ("ORCL", 100) - // [:10, :20): ("ORCL", 100), ("ORCL", 100) - let (mut out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert_eq!(out.len(), 2); - out.sort_by_key(|group_aggr_out| { - group_aggr_out - .group_by_result - .clone() - .unwrap() - .unpack::() - .unwrap() - }); - t_expect(out.get(0).cloned().unwrap(), "GOOGL", 100); - t_expect(out.get(1).cloned().unwrap(), "ORCL", 200); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:05, :15): -> "ORCL" = 175 - // [:10, :20): -> "ORCL" = 100 - // - // [:15, :25): ("ORCL", 100) - // [:20, :30): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert_eq!(out.len(), 2); - t_expect(out.get(0).cloned().unwrap(), "ORCL", 175); - t_expect(out.get(1).cloned().unwrap(), "ORCL", 100); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + let (expr_resolver, labels) = ExprResolver::new(select_list); + match &labels[..] { + &[ExprLabel::Value(group_by_label), ExprLabel::Aggr(aggr_label)] => { + let mut window = AggrWindow::new( + WindowParameter::TimedSlidingWindow { + length: SpringEventDuration::from_secs(10), + period: SpringEventDuration::from_secs(5), + allowed_delay: SpringEventDuration::from_secs(1), + }, + WindowOperationParameter::GroupAggregation(GroupAggregateParameter { + aggr_func: AggregateFunctionParameter::Avg, + aggr_expr: aggr_label, + group_by: group_by_label, + }), + ); + + // [:55, :05): ("GOOGL", 100) + // [:00, :10): ("GOOGL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + "GOOGL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:55, :05): ("GOOGL", 100), ("ORCL", 100) + // [:00, :10): ("GOOGL", 100), ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:04.999999999").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:55, :05): -> "GOOGL" AVG = 100; "ORCL" AVG = 100 + // + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) + // [:05, :15): ("ORCL", 400) + let (mut out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:06.000000000").unwrap(), + "ORCL", + 400, + ), + (), + ); + assert_eq!(out.len(), 2); + out.sort_by_key(|aggregated_and_grouping_values| { + sort_key(&group_by_label, aggregated_and_grouping_values) + }); + t_expect( + aggr_label, + group_by_label, + out.get(0).cloned().unwrap(), + "GOOGL", + 100, + ); + t_expect( + aggr_label, + group_by_label, + out.get(1).cloned().unwrap(), + "ORCL", + 100, + ); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) <-- !!NOT CLOSED YET (within delay)!! + // [:05, :15): ("ORCL", 400), ("ORCL", 100) + // [:10, :20): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // too late data to be ignored + // + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) + // [:05, :15): ("ORCL", 400), ("ORCL", 100) + // [:10, :20): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400), ("ORCL", 100) <-- !!LATE DATA!! + // [:05, :15): ("ORCL", 400), ("ORCL", 100), ("ORCL", 100) + // [:10, :20): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): -> "GOOGL" AVG = 100; "ORCL" AVG = 200 + // + // [:05, :15): ("ORCL", 400), ("ORCL", 100), ("ORCL", 100), ("ORCL", 100) + // [:10, :20): ("ORCL", 100), ("ORCL", 100) + let (mut out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert_eq!(out.len(), 2); + out.sort_by_key(|aggregated_and_grouping_values| { + sort_key(&group_by_label, aggregated_and_grouping_values) + }); + t_expect( + aggr_label, + group_by_label, + out.get(0).cloned().unwrap(), + "GOOGL", + 100, + ); + t_expect( + aggr_label, + group_by_label, + out.get(1).cloned().unwrap(), + "ORCL", + 200, + ); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:05, :15): -> "ORCL" = 175 + // [:10, :20): -> "ORCL" = 100 + // + // [:15, :25): ("ORCL", 100) + // [:20, :30): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert_eq!(out.len(), 2); + t_expect( + aggr_label, + group_by_label, + out.get(0).cloned().unwrap(), + "ORCL", + 175, + ); + t_expect( + aggr_label, + group_by_label, + out.get(1).cloned().unwrap(), + "ORCL", + 100, + ); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + } + _ => unreachable!(), + } } #[test] @@ -379,156 +441,167 @@ mod tests { }, ]; - let (mut expr_resolver, _, aggr_labels_select_list) = ExprResolver::new(select_list); - - let group_by_expr = ValueExpr::factory_colref( - StreamName::fx_trade().as_ref(), - ColumnName::fx_ticker().as_ref(), - ); - let group_by_label = expr_resolver.register_value_expr(group_by_expr); - - let mut window = AggrWindow::new( - WindowParameter::TimedFixedWindow { - length: SpringEventDuration::from_secs(10), - allowed_delay: SpringEventDuration::from_secs(1), - }, - WindowOperationParameter::GroupAggregation(GroupAggregateParameter { - aggr_func: AggregateFunctionParameter::Avg, - aggr_expr: aggr_labels_select_list[0], - group_by: group_by_label, - }), - ); - - // [:00, :10): ("GOOGL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), - "GOOGL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): ("GOOGL", 100), ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:09.000000000").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), - "ORCL", - 400, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) <-- !!NOT CLOSED YET (within delay)!! - // [:10, :20): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // too late data to be ignored - // - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) - // [:10, :20): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400), ("ORCL", 100) <-- !!LATE DATA!! - // [:10, :20): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert!(out.is_empty()); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:00, :10): -> "GOOGL" AVG = 100; "ORCL" AVG = 200 - // - // [:10, :20): ("ORCL", 100), ("ORCL", 100) - let (mut out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert_eq!(out.len(), 2); - out.sort_by_key(|group_aggr_out| { - group_aggr_out - .group_by_result - .clone() - .unwrap() - .unpack::() - .unwrap() - }); - t_expect(out.get(0).cloned().unwrap(), "GOOGL", 100); - t_expect(out.get(1).cloned().unwrap(), "ORCL", 200); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); - - // [:10, :20): -> "ORCL" = 100 - // - // [:20, :30): ("ORCL", 100) - let (out, window_in_flow) = window.dispatch( - &expr_resolver, - Tuple::factory_trade( - SpringTimestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), - "ORCL", - 100, - ), - (), - ); - assert_eq!(out.len(), 1); - t_expect(out.get(0).cloned().unwrap(), "ORCL", 100); - assert_eq!(window_in_flow.window_gain_bytes_states, 0); - assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + let (expr_resolver, labels) = ExprResolver::new(select_list); + match &labels[..] { + &[ExprLabel::Value(group_by_label), ExprLabel::Aggr(aggr_label)] => { + let mut window = AggrWindow::new( + WindowParameter::TimedFixedWindow { + length: SpringEventDuration::from_secs(10), + allowed_delay: SpringEventDuration::from_secs(1), + }, + WindowOperationParameter::GroupAggregation(GroupAggregateParameter { + aggr_func: AggregateFunctionParameter::Avg, + aggr_expr: aggr_label, + group_by: group_by_label, + }), + ); + + // [:00, :10): ("GOOGL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + "GOOGL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): ("GOOGL", 100), ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:09.000000000").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), + "ORCL", + 400, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) <-- !!NOT CLOSED YET (within delay)!! + // [:10, :20): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // too late data to be ignored + // + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400) + // [:10, :20): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): ("GOOGL", 100), ("ORCL", 100), ("ORCL", 400), ("ORCL", 100) <-- !!LATE DATA!! + // [:10, :20): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert!(out.is_empty()); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:00, :10): -> "GOOGL" AVG = 100; "ORCL" AVG = 200 + // + // [:10, :20): ("ORCL", 100), ("ORCL", 100) + let (mut out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert_eq!(out.len(), 2); + out.sort_by_key(|aggregated_and_grouping_values| { + sort_key(&group_by_label, aggregated_and_grouping_values) + }); + t_expect( + aggr_label, + group_by_label, + out.get(0).cloned().unwrap(), + "GOOGL", + 100, + ); + t_expect( + aggr_label, + group_by_label, + out.get(1).cloned().unwrap(), + "ORCL", + 200, + ); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + + // [:10, :20): -> "ORCL" = 100 + // + // [:20, :30): ("ORCL", 100) + let (out, window_in_flow) = window.dispatch( + &expr_resolver, + Tuple::factory_trade( + SpringTimestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), + "ORCL", + 100, + ), + (), + ); + assert_eq!(out.len(), 1); + t_expect( + aggr_label, + group_by_label, + out.get(0).cloned().unwrap(), + "ORCL", + 100, + ); + assert_eq!(window_in_flow.window_gain_bytes_states, 0); + assert_eq!(window_in_flow.window_gain_bytes_rows, 0); + } + _ => unreachable!(), + } } } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs index a510b7bd..f91ec5ae 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs @@ -158,7 +158,7 @@ mod tests { }, ]; - let (mut expr_resolver, _, _) = ExprResolver::new(select_list); + let (mut expr_resolver, _) = ExprResolver::new(select_list); let on_expr_label = expr_resolver.register_value_expr(on_expr); diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs index 0f50646c..c302aaee 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs @@ -45,7 +45,10 @@ where /// Then, return all panes to get a tuple with the `rowtime`. /// /// Caller must assure rowtime is not smaller than watermark. - pub(super) fn panes_to_dispatch(&mut self, rowtime: SpringTimestamp) -> impl Iterator { + pub(super) fn panes_to_dispatch( + &mut self, + rowtime: SpringTimestamp, + ) -> impl Iterator { self.generate_panes_if_not_exist(rowtime); self.panes @@ -137,7 +140,7 @@ mod tests { use std::str::FromStr; use crate::{ - expr_resolver::ExprResolver, + expr_resolver::{expr_label::ExprLabel, ExprResolver}, expression::{AggrExpr, ValueExpr}, pipeline::pump_model::window_operation_parameter::aggregate::{ AggregateFunctionParameter, GroupAggregateParameter, @@ -163,20 +166,27 @@ mod tests { aggr_expr, alias: None, }]; - let (mut expr_resolver, _, aggr_labels_select_list) = ExprResolver::new(select_list); + let (mut expr_resolver, labels) = ExprResolver::new(select_list); let group_by_label = expr_resolver.register_value_expr(group_by_expr); WindowOperationParameter::GroupAggregation(GroupAggregateParameter { aggr_func: AggregateFunctionParameter::Avg, - aggr_expr: aggr_labels_select_list[0], + aggr_expr: if let ExprLabel::Aggr(l) = labels[0] { + l + } else { + unreachable!() + }, group_by: group_by_label, }) } #[test] fn test_valid_open_at_s() { - fn sliding_window_panes(length: SpringEventDuration, period: SpringEventDuration) -> Panes { + fn sliding_window_panes( + length: SpringEventDuration, + period: SpringEventDuration, + ) -> Panes { Panes::new( WindowParameter::TimedSlidingWindow { length, @@ -187,30 +197,43 @@ mod tests { ) } - let panes = sliding_window_panes(SpringEventDuration::from_secs(10), SpringEventDuration::from_secs(5)); + let panes = sliding_window_panes( + SpringEventDuration::from_secs(10), + SpringEventDuration::from_secs(5), + ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() + ), vec![ SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() ] ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap() + ), vec![ SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() ] ); - let panes = - sliding_window_panes(SpringEventDuration::from_secs(10), SpringEventDuration::from_secs(10)); + let panes = sliding_window_panes( + SpringEventDuration::from_secs(10), + SpringEventDuration::from_secs(10), + ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap() + ), vec![SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] ); assert_eq!( - panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), + panes.valid_open_at_s( + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap() + ), vec![SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] ); } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs index cd49faab..df96cffe 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs @@ -15,7 +15,7 @@ use crate::{ stream_engine::{ autonomous_executor::{ performance_metrics::metrics_update_command::metrics_update_by_task_execution::WindowInFlowByWindowTask, - task::{tuple::Tuple, window::aggregate::GroupAggrOut}, + task::{tuple::Tuple, window::aggregate::AggregatedAndGroupingValues}, }, time::timestamp::SpringTimestamp, NnSqlValue, SqlValue, @@ -37,13 +37,17 @@ pub(in crate::stream_engine::autonomous_executor) struct AggrPane { } impl Pane for AggrPane { - type CloseOut = GroupAggrOut; + type CloseOut = AggregatedAndGroupingValues; type DispatchArg = (); /// # Panics /// /// if `op_param` is not `GroupAggregateParameter` - fn new(open_at: SpringTimestamp, close_at: SpringTimestamp, op_param: WindowOperationParameter) -> Self { + fn new( + open_at: SpringTimestamp, + close_at: SpringTimestamp, + op_param: WindowOperationParameter, + ) -> Self { if let WindowOperationParameter::GroupAggregation(group_aggregation_parameter) = op_param { let inner = match group_aggregation_parameter.aggr_func { AggregateFunctionParameter::Avg => AggrPaneInner::Avg { @@ -120,21 +124,22 @@ impl Pane for AggrPane { match self.inner { AggrPaneInner::Avg { states } => { - let group_aggr_out_seq = states + let aggregated_and_grouping_values_seq = states .into_iter() .map(|(group_by, state)| { let aggr_value = SqlValue::NotNull(NnSqlValue::Float(OrderedFloat(state.finalize()))); - GroupAggrOut::new( - aggr_label, - aggr_value, - group_by_label, - SqlValue::NotNull(group_by), + AggregatedAndGroupingValues::new( + vec![(aggr_label, aggr_value)], + vec![(group_by_label, SqlValue::NotNull(group_by))], ) }) .collect(); - (group_aggr_out_seq, WindowInFlowByWindowTask::zero()) + ( + aggregated_and_grouping_values_seq, + WindowInFlowByWindowTask::zero(), + ) } } } diff --git a/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs b/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs index a5c79ba5..f559c3e9 100644 --- a/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs +++ b/springql-core/src/stream_engine/command/query_plan/query_plan_operation.rs @@ -1,7 +1,7 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. use crate::{ - expr_resolver::expr_label::{AggrExprLabel, ValueExprLabel}, + expr_resolver::expr_label::ExprLabel, pipeline::{ name::StreamName, pump_model::{ @@ -34,8 +34,7 @@ impl LowerOps { #[derive(Clone, PartialEq, Debug)] pub(crate) struct ProjectionOp { - pub(crate) value_expr_labels: Vec, - pub(crate) aggr_expr_labels: Vec, + pub(crate) expr_labels: Vec, } #[derive(Clone, PartialEq, Debug)] From f12481093e5971f5106024133751f9c0e91e4a09 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 09:55:32 +0900 Subject: [PATCH 7/7] chore(lint): stop using Result::cloned() (unstable in 1.56.0 --- .../pump_subtask/query_subtask/projection_subtask.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs index 2b838ff8..0f2d3680 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/pump_task/pump_subtask/query_subtask/projection_subtask.rs @@ -45,15 +45,15 @@ impl ProjectionSubtask { .exprs .iter() .map(|label| { - match label { + let value = match label { ExprLabel::Value(group_by_value_label) => { aggregated_and_grouping_values.get_group_by_value(group_by_value_label) } ExprLabel::Aggr(aggr_label) => { aggregated_and_grouping_values.get_aggregated_value(aggr_label) } - } - .cloned() + }?; + Ok(value.clone()) }) .collect::>>()?;