From 94fb07522d10a2840652ac0f9b1ff593649119b3 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Sun, 8 May 2022 08:56:05 +0900 Subject: [PATCH 1/7] test(fix): fix low level API test to use SpringPipeline --- springql-core/tests/e2e_low_level_rs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/springql-core/tests/e2e_low_level_rs.rs b/springql-core/tests/e2e_low_level_rs.rs index 42859cc9..d1bb3728 100644 --- a/springql-core/tests/e2e_low_level_rs.rs +++ b/springql-core/tests/e2e_low_level_rs.rs @@ -86,7 +86,7 @@ fn test_e2e_source_sink() -> Result<()> { ), ]; - let _pipeline = apply_ddls(&ddls, spring_config_default()); + let _pipeline = apply_ddls_low_level(&ddls, spring_config_default()); test_source.start(ForeignSourceInput::new_fifo_batch(source_input.clone())); let sink_received = drain_from_sink(&test_sink); @@ -159,7 +159,7 @@ fn test_e2e_projection() -> Result<()> { ), ]; - let _pipeline = apply_ddls(&ddls, spring_config_default()); + let _pipeline = apply_ddls_low_level(&ddls, spring_config_default()); test_source.start(ForeignSourceInput::new_fifo_batch(vec![json_oracle])); let sink_received = drain_from_sink(&test_sink); From ef2206d427f482c257fb9e6ed9b516310a861f1c Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Sun, 8 May 2022 09:05:25 +0900 Subject: [PATCH 2/7] test: add high-level API test (including not implemented API) --- springql-core/tests/e2e_high_level_rs.rs | 253 +++++++++++++++++++++++ 1 file changed, 253 insertions(+) create mode 100644 springql-core/tests/e2e_high_level_rs.rs diff --git a/springql-core/tests/e2e_high_level_rs.rs b/springql-core/tests/e2e_high_level_rs.rs new file mode 100644 index 00000000..2499ce5d --- /dev/null +++ b/springql-core/tests/e2e_high_level_rs.rs @@ -0,0 +1,253 @@ +// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. + +mod test_support; + +use pretty_assertions::assert_eq; +use serde_json::json; +use springql_core::error::Result; +use springql_core::low_level_rs::SpringConfig; +use springql_foreign_service::sink::ForeignSink; +use springql_foreign_service::source::source_input::ForeignSourceInput; +use springql_foreign_service::source::ForeignSource; +use springql_test_logger::setup_test_logger; + +use crate::test_support::*; + +#[test] +fn test_e2e_source_sink() -> Result<()> { + setup_test_logger(); + + let json_oracle = json!({ + "ts": "2021-11-04 23:02:52.123456789", + "ticker": "ORCL", + "amount": 20, + }); + let json_ibm = json!({ + "ts": "2021-11-04 23:03:29.123456789", + "ticker": "IBM", + "amount": 30, + }); + let json_google = json!({ + "ts": "2021-11-04 23:03:42.123456789", + "ticker": "GOOGL", + "amount": 100, + }); + let source_input = vec![json_oracle, json_ibm, json_google]; + + let test_source = ForeignSource::new().unwrap(); + let test_sink = ForeignSink::start().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE PUMP pu_passthrough AS + INSERT INTO sink_trade (ts, ticker, amount) + SELECT STREAM source_trade.ts, source_trade.ticker, source_trade.amount FROM source_trade; + " + .to_string(), + format!( + " + CREATE SINK WRITER tcp_sink_trade FOR sink_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_sink.host_ip(), + remote_port = test_sink.port() + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + test_source.start(ForeignSourceInput::new_fifo_batch(source_input.clone())); + let sink_received = drain_from_sink(&test_sink); + + // because worker takes source input in multi-thread, order may be changed + assert!(sink_received.contains(source_input.get(0).unwrap())); + assert!(sink_received.contains(source_input.get(1).unwrap())); + assert!(sink_received.contains(source_input.get(2).unwrap())); + + Ok(()) +} + +#[test] +fn test_e2e_projection() -> Result<()> { + setup_test_logger(); + + let json_oracle = json!({ + "ts": "2021-11-04 23:02:52.123456789", + "ticker": "ORCL", + "amount": 20, + }); + + let test_source = ForeignSource::new().unwrap(); + let test_sink = ForeignSink::start().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL + ); + " + .to_string(), + " + CREATE PUMP pu_projection AS + INSERT INTO sink_trade (ts, ticker) + SELECT STREAM source_trade.ts, source_trade.ticker FROM source_trade; + " + .to_string(), + format!( + " + CREATE SINK WRITER tcp_sink_trade FOR sink_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_sink.host_ip(), + remote_port = test_sink.port() + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let _pipeline = apply_ddls(&ddls, SpringConfig::default()); + test_source.start(ForeignSourceInput::new_fifo_batch(vec![json_oracle])); + let sink_received = drain_from_sink(&test_sink); + + assert_eq!( + sink_received.get(0).unwrap(), + &json!({ + "ts": "2021-11-04 23:02:52.123456789", + "ticker": "ORCL" + }) + ); + + Ok(()) +} + +#[test] +fn test_e2e_pop_from_in_memory_queue() { + setup_test_logger(); + + let queue_name = "queue_trade"; + let ts = "2021-11-04 23:02:52.123456789"; + let ticker = "ORCL"; + let amount = 20; + + let json_oracle = json!({ + "ts": ts, + "ticker": ticker, + "amount": amount, + }); + let trade_times = 5; + + let test_source = ForeignSource::new().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_trade ( + ts TIMESTAMP NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE PUMP pu_projection AS + INSERT INTO sink_trade (ts, amount) + SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade; + " + .to_string(), + format!( + " + CREATE SINK WRITER queue_sink_trade FOR sink_trade + TYPE IN_MEMORY_QUEUE OPTIONS ( + NAME '{queue_name}' + ); + ", + queue_name = queue_name, + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let pipeline = apply_ddls(&ddls, SpringConfig::default()); + test_source.start(ForeignSourceInput::new_fifo_batch( + (0..trade_times) + .into_iter() + .map(|_| json_oracle.clone()) + .collect(), + )); + + for _ in 0..trade_times { + let row = pipeline.pop(queue_name).unwrap(); + assert_eq!(row.get(0).unwrap(), ts); + assert_eq!(row.get(1).unwrap(), amount); + } +} From 0dfc1f05a7ffc5da451b14c7f4f57db602bcac18 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Mon, 9 May 2022 05:45:56 +0900 Subject: [PATCH 3/7] wip --- springql-core/src/api/error.rs | 10 +++++ springql-core/src/api/high_level_rs.rs | 44 ++++++++++++++++++- springql-core/src/api/low_level_rs.rs | 2 +- .../src/stream_engine/autonomous_executor.rs | 1 + .../row/foreign_row/sink_row.rs | 5 +++ .../row/value/sql_convertible.rs | 4 +- springql-core/tests/e2e_high_level_rs.rs | 4 +- 7 files changed, 63 insertions(+), 7 deletions(-) diff --git a/springql-core/src/api/error.rs b/springql-core/src/api/error.rs index f3b72c6d..f94743b7 100644 --- a/springql-core/src/api/error.rs +++ b/springql-core/src/api/error.rs @@ -6,6 +6,8 @@ pub mod foreign_info; use thiserror::Error; +use crate::{pipeline::name::StreamName, stream_engine::SinkRow}; + use self::foreign_info::ForeignInfo; /// Result type @@ -60,4 +62,12 @@ pub enum SpringError { #[error("SQL error")] Sql(anyhow::Error), + + /// Occurs only when a value is fetched from a SpringRow. + #[error("unexpectedly got NULL")] + Null { + stream_name: StreamName, + /// Column index + i_col: usize, + }, } diff --git a/springql-core/src/api/high_level_rs.rs b/springql-core/src/api/high_level_rs.rs index 2719ef94..bc2ebddd 100644 --- a/springql-core/src/api/high_level_rs.rs +++ b/springql-core/src/api/high_level_rs.rs @@ -3,8 +3,9 @@ //! High-level Rust API to execute / register SpringQL from Rust. use crate::{ - error::Result, - low_level_rs::{spring_command, spring_open, SpringConfig, SpringPipeline}, + error::{Result, SpringError}, + low_level_rs::{spring_command, spring_open, spring_pop, SpringConfig, SpringPipeline}, + stream_engine::{SinkRow, SqlConvertible, SqlValue}, }; /// Pipeline. @@ -31,6 +32,45 @@ impl SpringPipelineHL { pub fn command>(&self, sql: S) -> Result<()> { spring_command(&self.0, sql.as_ref()) } + + /// Pop a row from an in memory queue. This is a blocking function. + /// + /// # Failure + /// + /// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when: + /// - queue named `queue` does not exist. + pub fn pop(&self, queue: &str) -> Result { + spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0)) + } +} + +/// Row object from an in memory queue. +#[derive(Debug)] +pub struct SpringRowHL(SinkRow); + +impl SpringRowHL { + /// Get a i-th column value from the row. + /// + /// # Failure + /// + /// - [SpringError::Sql](crate::error::SpringError::Sql) when: + /// - Column index out of range + /// - [SpringError::Null](crate::error::SpringError::Null) when: + /// - Column value is NULL + pub fn get_not_null_by_index(&self, i_col: usize) -> Result + where + T: SqlConvertible, // TODO use this function in spring_column_*() + { + let sql_value = self.0.get_by_index(i_col)?; + + match sql_value { + SqlValue::Null => Err(SpringError::Null { + stream_name: self.0.stream_name().clone(), + i_col, + }), + SqlValue::NotNull(nn_sql_value) => nn_sql_value.unpack(), + } + } } impl SpringConfig { diff --git a/springql-core/src/api/low_level_rs.rs b/springql-core/src/api/low_level_rs.rs index 1f360914..1c8ecf79 100644 --- a/springql-core/src/api/low_level_rs.rs +++ b/springql-core/src/api/low_level_rs.rs @@ -44,7 +44,7 @@ pub struct SpringPipeline { /// Row object from an in memory queue. #[derive(Debug)] -pub struct SpringRow(SinkRow); +pub struct SpringRow(pub(in crate::api) SinkRow); impl From for SpringRow { fn from(sink_row: SinkRow) -> Self { diff --git a/springql-core/src/stream_engine/autonomous_executor.rs b/springql-core/src/stream_engine/autonomous_executor.rs index 8b0bde25..c53c62ab 100644 --- a/springql-core/src/stream_engine/autonomous_executor.rs +++ b/springql-core/src/stream_engine/autonomous_executor.rs @@ -160,6 +160,7 @@ impl AutonomousExecutor { | SpringError::ThreadPoisoned(_) => log::error!("{:?}", e), SpringError::InvalidConfig { .. } => unreachable!("must be handled on startup"), + SpringError::Null { .. } => unreachable!("must be handled on startup"), } } } diff --git a/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs b/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs index 28686dae..e95c0f96 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs @@ -2,6 +2,7 @@ use super::format::json::JsonObject; use crate::error::Result; +use crate::pipeline::name::StreamName; use crate::stream_engine::autonomous_executor::row::{value::sql_value::SqlValue, Row}; /// Output row into foreign systems (retrieved by SinkWriter). @@ -36,6 +37,10 @@ impl SinkRow { pub(crate) fn get_by_index(&self, i_col: usize) -> Result<&SqlValue> { self.0.get_by_index(i_col) } + + pub(crate) fn stream_name(&self) -> &StreamName { + self.0.stream_model().name() + } } #[cfg(test)] diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs index a09b43e0..9aeac65d 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs @@ -17,9 +17,9 @@ use std::any::type_name; use super::sql_value::nn_sql_value::NnSqlValue; /// Rust values which can have bidirectional mapping to/from SQL [NnSqlValue](crate::NnSqlValue). -pub(crate) trait SqlConvertible: Sized { +pub trait SqlConvertible: Sized { /// Convert Rust type into strictly-matching SQL type. - fn into_sql_value(self) -> NnSqlValue; + fn into_sql_value(self) -> NnSqlValue; // TODO separate this. Sealed trait? /// # Failures /// diff --git a/springql-core/tests/e2e_high_level_rs.rs b/springql-core/tests/e2e_high_level_rs.rs index 2499ce5d..099480f2 100644 --- a/springql-core/tests/e2e_high_level_rs.rs +++ b/springql-core/tests/e2e_high_level_rs.rs @@ -247,7 +247,7 @@ fn test_e2e_pop_from_in_memory_queue() { for _ in 0..trade_times { let row = pipeline.pop(queue_name).unwrap(); - assert_eq!(row.get(0).unwrap(), ts); - assert_eq!(row.get(1).unwrap(), amount); + assert_eq!(row.get_not_null_by_index(0).unwrap(), ts); + assert_eq!(row.get_not_null_by_index(1).unwrap(), amount); } } From a4521c3ed410eaba4f57baa4658bb460600481bd Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Mon, 9 May 2022 07:05:51 +0900 Subject: [PATCH 4/7] feat: make SpringValue into public --- springql-core/src/api.rs | 2 ++ springql-core/src/stream_engine.rs | 7 ++++--- springql-core/src/stream_engine/autonomous_executor.rs | 2 ++ springql-core/src/stream_engine/autonomous_executor/row.rs | 2 ++ .../src/stream_engine/autonomous_executor/row/value.rs | 2 ++ .../autonomous_executor/row/value/sql_convertible.rs | 2 +- 6 files changed, 13 insertions(+), 4 deletions(-) diff --git a/springql-core/src/api.rs b/springql-core/src/api.rs index 1e313935..cf612479 100644 --- a/springql-core/src/api.rs +++ b/springql-core/src/api.rs @@ -3,3 +3,5 @@ pub mod error; pub mod high_level_rs; pub mod low_level_rs; + +pub use crate::stream_engine::SpringValue; diff --git a/springql-core/src/stream_engine.rs b/springql-core/src/stream_engine.rs index 5fa929a4..17dbb471 100644 --- a/springql-core/src/stream_engine.rs +++ b/springql-core/src/stream_engine.rs @@ -26,6 +26,8 @@ //! //! ![Communication between entities](https://raw.githubusercontent.com/SpringQL/SpringQL.github.io/main/static/img/stream-engine-architecture-communication.svg) +pub use autonomous_executor::SpringValue; + pub(crate) mod command; pub(crate) mod time; @@ -34,9 +36,8 @@ mod in_memory_queue_repository; mod sql_executor; pub(crate) use autonomous_executor::{ - row::value::{ - sql_convertible::SpringValue, - sql_value::{nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult, SqlValue}, + row::value::sql_value::{ + nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult, SqlValue, }, task::tuple::Tuple, SinkRow, diff --git a/springql-core/src/stream_engine/autonomous_executor.rs b/springql-core/src/stream_engine/autonomous_executor.rs index c53c62ab..bbee8870 100644 --- a/springql-core/src/stream_engine/autonomous_executor.rs +++ b/springql-core/src/stream_engine/autonomous_executor.rs @@ -1,5 +1,7 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. +pub use row::SpringValue; + pub(crate) mod row; pub(crate) mod task; diff --git a/springql-core/src/stream_engine/autonomous_executor/row.rs b/springql-core/src/stream_engine/autonomous_executor/row.rs index 9c4c3940..4726464b 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row.rs @@ -1,5 +1,7 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. +pub use value::SpringValue; + pub(crate) mod value; pub(in crate::stream_engine::autonomous_executor) mod column; diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value.rs b/springql-core/src/stream_engine/autonomous_executor/row/value.rs index c63d016f..52900ee8 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value.rs @@ -2,3 +2,5 @@ pub(crate) mod sql_convertible; pub(crate) mod sql_value; + +pub use sql_convertible::SpringValue; diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs index e4f7984d..10c6b4af 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs @@ -17,7 +17,7 @@ use std::any::type_name; use super::sql_value::nn_sql_value::NnSqlValue; /// Rust values can be unpacked from NnSqlValue back into them. -pub(crate) trait SpringValue: Sized { +pub trait SpringValue: Sized { /// # Failures /// /// - [SpringError::Sql](crate::error::SpringError::Sql) when: From 980efb60d639e37a362b1e6a426459c7a1260823 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Mon, 9 May 2022 07:11:13 +0900 Subject: [PATCH 5/7] feat: make SpringTimestamp and SpringEventDuration public --- springql-core/src/api.rs | 2 + springql-core/src/expr_resolver.rs | 4 +- springql-core/src/expression.rs | 6 +-- .../pipeline/pump_model/window_parameter.rs | 18 ++++---- .../sql_parser/pest_parser_impl.rs | 6 +-- .../stream_engine/autonomous_executor/row.rs | 6 +-- .../row/column/stream_column.rs | 14 +++--- .../row/foreign_row/sink_row.rs | 6 +-- .../row/value/sql_convertible.rs | 6 +-- .../value/sql_convertible/event_duration.rs | 8 ++-- .../row/value/sql_convertible/text.rs | 4 +- .../row/value/sql_convertible/timestamp.rs | 8 ++-- .../row/value/sql_value.rs | 6 +-- .../row/value/sql_value/nn_sql_value.rs | 26 +++++------ .../autonomous_executor/task/tuple.rs | 8 ++-- .../task/window/aggregate.rs | 46 +++++++++---------- .../task/window/join_window.rs | 32 ++++++------- .../autonomous_executor/task/window/panes.rs | 40 ++++++++-------- .../task/window/panes/pane.rs | 10 ++-- .../task/window/panes/pane/aggregate_pane.rs | 12 ++--- .../task/window/panes/pane/join_pane.rs | 14 +++--- .../task/window/watermark.rs | 14 +++--- .../test_support/factory.rs | 14 +++--- .../test_support/fixture.rs | 28 +++++------ .../time/duration/event_duration.rs | 22 ++++----- .../wall_clock_stopwatch.rs | 4 +- .../src/stream_engine/time/timestamp.rs | 44 +++++++++--------- .../time/timestamp/system_timestamp.rs | 6 +-- 28 files changed, 208 insertions(+), 206 deletions(-) diff --git a/springql-core/src/api.rs b/springql-core/src/api.rs index cf612479..1dc064b2 100644 --- a/springql-core/src/api.rs +++ b/springql-core/src/api.rs @@ -4,4 +4,6 @@ pub mod error; pub mod high_level_rs; pub mod low_level_rs; +pub use crate::stream_engine::time::duration::event_duration::SpringEventDuration; +pub use crate::stream_engine::time::timestamp::SpringTimestamp; pub use crate::stream_engine::SpringValue; diff --git a/springql-core/src/expr_resolver.rs b/springql-core/src/expr_resolver.rs index ae0a634b..ac783c5f 100644 --- a/springql-core/src/expr_resolver.rs +++ b/springql-core/src/expr_resolver.rs @@ -187,7 +187,7 @@ impl ExprResolver { #[cfg(test)] mod tests { - use crate::{expression::ValueExpr, stream_engine::time::timestamp::Timestamp}; + use crate::{expression::ValueExpr, stream_engine::time::timestamp::SpringTimestamp}; use super::*; @@ -228,7 +228,7 @@ mod tests { ValueExpr::factory_integer(3), )); - let empty_tuple = Tuple::new(Timestamp::fx_ts1(), vec![]); + let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]); assert_eq!( resolver diff --git a/springql-core/src/expression.rs b/springql-core/src/expression.rs index e0545395..280acd35 100644 --- a/springql-core/src/expression.rs +++ b/springql-core/src/expression.rs @@ -21,7 +21,7 @@ use crate::{ pump_model::window_operation_parameter::aggregate::AggregateFunctionParameter, }, stream_engine::{ - time::duration::{event_duration::EventDuration, SpringDuration}, + time::duration::{event_duration::SpringEventDuration, SpringDuration}, NnSqlValue, SqlCompareResult, SqlValue, Tuple, }, }; @@ -261,7 +261,7 @@ impl ValueExprPh2 { let duration_value = duration_millis.eval()?; let duration_millis = duration_value.to_i64()?; if duration_millis >= 0 { - let duration = EventDuration::from_millis(duration_millis as u64); + let duration = SpringEventDuration::from_millis(duration_millis as u64); Ok(SqlValue::NotNull(NnSqlValue::Duration(duration))) } else { Err(SpringError::Sql(anyhow!( @@ -274,7 +274,7 @@ impl ValueExprPh2 { let duration_value = duration_secs.eval()?; let duration_secs = duration_value.to_i64()?; if duration_secs >= 0 { - let duration = EventDuration::from_secs(duration_secs as u64); + let duration = SpringEventDuration::from_secs(duration_secs as u64); Ok(SqlValue::NotNull(NnSqlValue::Duration(duration))) } else { Err(SpringError::Sql(anyhow!( diff --git a/springql-core/src/pipeline/pump_model/window_parameter.rs b/springql-core/src/pipeline/pump_model/window_parameter.rs index 0d6bbef2..2b1ef14c 100644 --- a/springql-core/src/pipeline/pump_model/window_parameter.rs +++ b/springql-core/src/pipeline/pump_model/window_parameter.rs @@ -1,6 +1,6 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. -use crate::stream_engine::time::duration::event_duration::EventDuration; +use crate::stream_engine::time::duration::event_duration::SpringEventDuration; /// Window parameters #[derive(Clone, Eq, PartialEq, Debug)] @@ -18,9 +18,9 @@ pub(crate) enum WindowParameter { /// :00 :05 :10 :15 :20 /// ``` TimedSlidingWindow { - length: EventDuration, - period: EventDuration, - allowed_delay: EventDuration, + length: SpringEventDuration, + period: SpringEventDuration, + allowed_delay: SpringEventDuration, }, /// Time-based fixed window @@ -36,27 +36,27 @@ pub(crate) enum WindowParameter { /// :00 :05 :10 :15 :20 /// ``` TimedFixedWindow { - length: EventDuration, - allowed_delay: EventDuration, + length: SpringEventDuration, + allowed_delay: SpringEventDuration, }, } impl WindowParameter { - pub(crate) fn length(&self) -> EventDuration { + pub(crate) fn length(&self) -> SpringEventDuration { match self { WindowParameter::TimedSlidingWindow { length, .. } => *length, WindowParameter::TimedFixedWindow { length, .. } => *length, } } - pub(crate) fn period(&self) -> EventDuration { + pub(crate) fn period(&self) -> SpringEventDuration { match self { WindowParameter::TimedSlidingWindow { period, .. } => *period, WindowParameter::TimedFixedWindow { length, .. } => *length, } } - pub(crate) fn allowed_delay(&self) -> EventDuration { + pub(crate) fn allowed_delay(&self) -> SpringEventDuration { match self { WindowParameter::TimedSlidingWindow { allowed_delay, .. } => *allowed_delay, WindowParameter::TimedFixedWindow { allowed_delay, .. } => *allowed_delay, diff --git a/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs b/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs index ea184963..93a4861a 100644 --- a/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs +++ b/springql-core/src/sql_processor/sql_parser/pest_parser_impl.rs @@ -34,7 +34,7 @@ use crate::sql_processor::sql_parser::syntax::{ ColumnConstraintSyntax, OptionSyntax, SelectStreamSyntax, }; use crate::stream_engine::command::insert_plan::InsertPlan; -use crate::stream_engine::time::duration::event_duration::EventDuration; +use crate::stream_engine::time::duration::event_duration::SpringEventDuration; use crate::stream_engine::time::duration::SpringDuration; use crate::stream_engine::{NnSqlValue, SqlValue}; use anyhow::{anyhow, Context}; @@ -211,10 +211,10 @@ impl PestParserImpl { let event_duration = match duration_function { DurationFunction::Millis => { - Ok(EventDuration::from_millis(integer_constant.to_i64()? as u64)) + Ok(SpringEventDuration::from_millis(integer_constant.to_i64()? as u64)) } DurationFunction::Secs => { - Ok(EventDuration::from_secs(integer_constant.to_i64()? as u64)) + Ok(SpringEventDuration::from_secs(integer_constant.to_i64()? as u64)) } }?; diff --git a/springql-core/src/stream_engine/autonomous_executor/row.rs b/springql-core/src/stream_engine/autonomous_executor/row.rs index 4726464b..0bc71c01 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row.rs @@ -19,14 +19,14 @@ use crate::pipeline::name::ColumnName; use crate::pipeline::stream_model::StreamModel; use crate::stream_engine::autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue; use crate::stream_engine::time::timestamp::system_timestamp::SystemTimestamp; -use crate::stream_engine::time::timestamp::Timestamp; +use crate::stream_engine::time::timestamp::SpringTimestamp; /// - Mandatory `rowtime()`, either from `cols` or `arrival_rowtime`. /// - PartialEq by all columns (NULL prevents Eq). /// - PartialOrd by timestamp. #[derive(Clone, PartialEq, Debug)] pub(crate) struct Row { - arrival_rowtime: Option, + arrival_rowtime: Option, /// Columns cols: StreamColumns, @@ -56,7 +56,7 @@ impl Row { /// /// - (default) Arrival time to a stream. /// - Promoted from a column in a stream. - pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> Timestamp { + pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> SpringTimestamp { self.arrival_rowtime.unwrap_or_else(|| { self.cols .promoted_rowtime() diff --git a/springql-core/src/stream_engine/autonomous_executor/row/column/stream_column.rs b/springql-core/src/stream_engine/autonomous_executor/row/column/stream_column.rs index 38c40d6d..fa78965f 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/column/stream_column.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/column/stream_column.rs @@ -9,7 +9,7 @@ use crate::{ pipeline::{relation::column::column_definition::ColumnDefinition, stream_model::StreamModel}, stream_engine::{ autonomous_executor::row::{column_values::ColumnValues, value::sql_value::SqlValue}, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, }, }; use std::{sync::Arc, vec}; @@ -67,7 +67,7 @@ impl StreamColumns { pub(in crate::stream_engine::autonomous_executor) fn promoted_rowtime( &self, - ) -> Option { + ) -> Option { let rowtime_col = self.stream_model.shape().promoted_rowtime()?; let rowtime_sql_value = self .get_by_column_name(rowtime_col) @@ -173,7 +173,7 @@ impl IntoIterator for StreamColumns { mod tests { use crate::stream_engine::{ autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, }; use super::*; @@ -184,7 +184,7 @@ mod tests { column_values .insert( ColumnName::fx_timestamp(), - SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())), + SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())), ) .unwrap(); column_values @@ -210,7 +210,7 @@ mod tests { column_values .insert( ColumnName::new("timestamp".to_string()), - SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())), + SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())), ) .unwrap(); column_values @@ -234,7 +234,7 @@ mod tests { column_values .insert( ColumnName::new("timestamp".to_string()), - SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())), + SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())), ) .unwrap(); column_values @@ -263,7 +263,7 @@ mod tests { column_values .insert( ColumnName::new("timestamp".to_string()), - SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())), + SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())), ) .unwrap(); column_values diff --git a/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs b/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs index e95c0f96..c10dedac 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/foreign_row/sink_row.rs @@ -52,7 +52,7 @@ mod tests { pipeline::name::ColumnName, stream_engine::{ autonomous_executor::row::value::sql_value::SqlValue, - time::timestamp::{system_timestamp::SystemTimestamp, Timestamp}, + time::timestamp::{system_timestamp::SystemTimestamp, SpringTimestamp}, }, }; @@ -64,7 +64,7 @@ mod tests { let f_row = SinkRow(row); let json = JsonObject::new(json!({ - "ts": Timestamp::fx_ts1().to_string(), + "ts": SpringTimestamp::fx_ts1().to_string(), "city": "Tokyo", "temperature": 21 })); @@ -81,7 +81,7 @@ mod tests { let f_rowtime_sql_value = f_colvals.remove(&ColumnName::arrival_rowtime()).unwrap(); if let SqlValue::NotNull(f_rowtime_nn_sql_value) = f_rowtime_sql_value { - let f_rowtime: Timestamp = f_rowtime_nn_sql_value.unpack().unwrap(); + let f_rowtime: SpringTimestamp = f_rowtime_nn_sql_value.unpack().unwrap(); assert!(SystemTimestamp::now() - Duration::seconds(1) < f_rowtime); assert!(f_rowtime < SystemTimestamp::now() + Duration::seconds(1)); } else { diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs index 10c6b4af..8b06521a 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible.rs @@ -9,7 +9,7 @@ mod timestamp; use crate::{ error::{Result, SpringError}, - stream_engine::time::{duration::event_duration::EventDuration, timestamp::Timestamp}, + stream_engine::time::{duration::event_duration::SpringEventDuration, timestamp::SpringTimestamp}, }; use anyhow::anyhow; use std::any::type_name; @@ -70,7 +70,7 @@ pub trait SpringValue: Sized { /// /// - [SpringError::Sql](crate::error::SpringError::Sql) when: /// - the type implementing SqlConvertible is not convertible from Timestamp - fn try_from_timestamp(_: &Timestamp) -> Result { + fn try_from_timestamp(_: &SpringTimestamp) -> Result { Self::default_err("Timestamp") } @@ -78,7 +78,7 @@ pub trait SpringValue: Sized { /// /// - [SpringError::Sql](crate::error::SpringError::Sql) when: /// - the type implementing SqlConvertible is not convertible from EventDuration - fn try_from_duration(_: &EventDuration) -> Result { + fn try_from_duration(_: &SpringEventDuration) -> Result { Self::default_err("EventDuration") } diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/event_duration.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/event_duration.rs index 8e30e2e9..5588ed15 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/event_duration.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/event_duration.rs @@ -4,19 +4,19 @@ use crate::{ error::Result, stream_engine::{ autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue, - time::duration::event_duration::EventDuration, + time::duration::event_duration::SpringEventDuration, }, }; use super::{SpringValue, ToNnSqlValue}; -impl SpringValue for EventDuration { - fn try_from_duration(v: &EventDuration) -> Result { +impl SpringValue for SpringEventDuration { + fn try_from_duration(v: &SpringEventDuration) -> Result { Ok(*v) } } -impl ToNnSqlValue for EventDuration { +impl ToNnSqlValue for SpringEventDuration { fn into_sql_value(self) -> NnSqlValue { NnSqlValue::Duration(self) } diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/text.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/text.rs index 80875613..e834bd90 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/text.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/text.rs @@ -4,7 +4,7 @@ use crate::{ error::Result, stream_engine::{ autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, }, }; @@ -15,7 +15,7 @@ impl SpringValue for String { Ok(v.to_string()) } - fn try_from_timestamp(v: &Timestamp) -> Result { + fn try_from_timestamp(v: &SpringTimestamp) -> Result { Ok(v.to_string()) } } diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/timestamp.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/timestamp.rs index 7ea094e7..fe49d264 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/timestamp.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_convertible/timestamp.rs @@ -4,23 +4,23 @@ use crate::{ error::Result, stream_engine::{ autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, }, }; use super::{SpringValue, ToNnSqlValue}; -impl SpringValue for Timestamp { +impl SpringValue for SpringTimestamp { fn try_from_string(s: &str) -> Result { s.parse() } - fn try_from_timestamp(v: &Timestamp) -> Result { + fn try_from_timestamp(v: &SpringTimestamp) -> Result { Ok(*v) } } -impl ToNnSqlValue for Timestamp { +impl ToNnSqlValue for SpringTimestamp { fn into_sql_value(self) -> NnSqlValue { NnSqlValue::Timestamp(self) } diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value.rs index cdf88f24..e882896a 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value.rs @@ -8,7 +8,7 @@ use self::{nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult}; use crate::{ error::{Result, SpringError}, mem_size::MemSize, - stream_engine::time::duration::event_duration::EventDuration, + stream_engine::time::duration::event_duration::SpringEventDuration, }; use anyhow::anyhow; use ordered_float::OrderedFloat; @@ -162,12 +162,12 @@ impl SqlValue { /// /// - `SpringError::Sql` when: /// - this SqlValue cannot be evaluated as event duration - pub(crate) fn to_event_duration(&self) -> Result { + pub(crate) fn to_event_duration(&self) -> Result { match self { SqlValue::Null => Err(SpringError::Sql(anyhow!( "NULL cannot be evaluated as event duration", ))), - SqlValue::NotNull(nn_sql_value) => nn_sql_value.unpack::(), + SqlValue::NotNull(nn_sql_value) => nn_sql_value.unpack::(), } } } diff --git a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value/nn_sql_value.rs b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value/nn_sql_value.rs index 44de2c6a..669e6dc1 100644 --- a/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value/nn_sql_value.rs +++ b/springql-core/src/stream_engine/autonomous_executor/row/value/sql_value/nn_sql_value.rs @@ -13,8 +13,8 @@ use crate::pipeline::relation::sql_type::{ use crate::stream_engine::autonomous_executor::row::value::sql_convertible::{ SpringValue, ToNnSqlValue, }; -use crate::stream_engine::time::duration::event_duration::EventDuration; -use crate::stream_engine::time::timestamp::Timestamp; +use crate::stream_engine::time::duration::event_duration::SpringEventDuration; +use crate::stream_engine::time::timestamp::SpringTimestamp; use anyhow::anyhow; use ordered_float::OrderedFloat; @@ -41,10 +41,10 @@ pub(crate) enum NnSqlValue { Boolean(bool), /// TIMESTAMP - Timestamp(Timestamp), + Timestamp(SpringTimestamp), /// DURATION - Duration(EventDuration), + Duration(SpringEventDuration), } impl MemSize for NnSqlValue { @@ -123,8 +123,8 @@ impl Hash for NnSqlValue { s.hash(state); }, |b: bool| { b.hash(state) }, - |t: Timestamp| { t.hash(state) }, - |d: EventDuration| { d.hash(state) } + |t: SpringTimestamp| { t.hash(state) }, + |d: SpringEventDuration| { d.hash(state) } ) } } @@ -137,8 +137,8 @@ impl Display for NnSqlValue { |f: OrderedFloat| f.to_string(), |s: String| format!(r#""{}""#, s), |b: bool| (if b { "TRUE" } else { "FALSE" }).to_string(), - |t: Timestamp| t.to_string(), - |d: EventDuration| d.to_string() + |t: SpringTimestamp| t.to_string(), + |d: SpringEventDuration| d.to_string() ); write!(f, "{}", s) } @@ -220,9 +220,9 @@ impl NnSqlValue { } }, SqlType::BooleanComparable => self.unpack::().map(|v| v.into_sql_value()), - SqlType::TimestampComparable => self.unpack::().map(|v| v.into_sql_value()), + SqlType::TimestampComparable => self.unpack::().map(|v| v.into_sql_value()), SqlType::DurationComparable => { - self.unpack::().map(|v| v.into_sql_value()) + self.unpack::().map(|v| v.into_sql_value()) } } } @@ -260,7 +260,7 @@ impl NnSqlValue { Ok(SqlCompareResult::from(self_b.cmp(&other_b))) } (SqlType::TimestampComparable, SqlType::TimestampComparable) => { - let (self_t, other_t) = (self.unpack::()?, other.unpack::()?); + let (self_t, other_t) = (self.unpack::()?, other.unpack::()?); Ok(SqlCompareResult::from(self_t.cmp(&other_t))) } (_, _) => Err(SpringError::Sql(anyhow!( @@ -394,8 +394,8 @@ mod tests { assert!(!NnSqlValue::Boolean(false).unpack::()?); assert_eq!( - NnSqlValue::Timestamp(Timestamp::fx_ts1()).unpack::()?, - Timestamp::fx_ts1() + NnSqlValue::Timestamp(SpringTimestamp::fx_ts1()).unpack::()?, + SpringTimestamp::fx_ts1() ); Ok(()) diff --git a/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs b/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs index 292dfde9..31ce80ae 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/tuple.rs @@ -4,7 +4,7 @@ use crate::{ error::{Result, SpringError}, mem_size::MemSize, pipeline::field::{field_name::ColumnReference, Field}, - stream_engine::{autonomous_executor::row::Row, time::timestamp::Timestamp, SqlValue}, + stream_engine::{autonomous_executor::row::Row, time::timestamp::SpringTimestamp, SqlValue}, }; use anyhow::anyhow; @@ -19,7 +19,7 @@ use anyhow::anyhow; pub(crate) struct Tuple { /// Either be an event-time or a process-time. /// If a row this tuple is constructed from has a ROWTIME column, `rowtime` has duplicate value with one of `fields`. - rowtime: Timestamp, + rowtime: SpringTimestamp, fields: Vec, } @@ -48,7 +48,7 @@ impl Tuple { Self { rowtime, fields } } - pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> &Timestamp { + pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> &SpringTimestamp { &self.rowtime } @@ -110,7 +110,7 @@ mod tests { // ColumnReference TestDatum::new( ValueExpr::ColumnReference(ColumnReference::factory("trade", "amount")), - Tuple::factory_trade(Timestamp::fx_ts1(), "ORCL", 1), + Tuple::factory_trade(SpringTimestamp::fx_ts1(), "ORCL", 1), SqlValue::factory_integer(1), ), // BooleanExpression diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs index 4a53d5a3..2274611b 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/aggregate.rs @@ -119,8 +119,8 @@ mod tests { stream_engine::{ autonomous_executor::task::tuple::Tuple, time::{ - duration::{event_duration::EventDuration, SpringDuration}, - timestamp::Timestamp, + duration::{event_duration::SpringEventDuration, SpringDuration}, + timestamp::SpringTimestamp, }, }, }; @@ -181,9 +181,9 @@ mod tests { let mut window = AggrWindow::new( WindowParameter::TimedSlidingWindow { - length: EventDuration::from_secs(10), - period: EventDuration::from_secs(5), - allowed_delay: EventDuration::from_secs(1), + length: SpringEventDuration::from_secs(10), + period: SpringEventDuration::from_secs(5), + allowed_delay: SpringEventDuration::from_secs(1), }, WindowOperationParameter::GroupAggregation(GroupAggregateParameter { aggr_func: AggregateFunctionParameter::Avg, @@ -197,7 +197,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), "GOOGL", 100, ), @@ -212,7 +212,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:04.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:04.999999999").unwrap(), "ORCL", 100, ), @@ -229,7 +229,7 @@ mod tests { let (mut out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:06.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:06.000000000").unwrap(), "ORCL", 400, ), @@ -255,7 +255,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), "ORCL", 100, ), @@ -273,7 +273,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), "ORCL", 100, ), @@ -289,7 +289,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), "ORCL", 100, ), @@ -306,7 +306,7 @@ mod tests { let (mut out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), "ORCL", 100, ), @@ -334,7 +334,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), "ORCL", 100, ), @@ -389,8 +389,8 @@ mod tests { let mut window = AggrWindow::new( WindowParameter::TimedFixedWindow { - length: EventDuration::from_secs(10), - allowed_delay: EventDuration::from_secs(1), + length: SpringEventDuration::from_secs(10), + allowed_delay: SpringEventDuration::from_secs(1), }, WindowOperationParameter::GroupAggregation(GroupAggregateParameter { aggr_func: AggregateFunctionParameter::Avg, @@ -403,7 +403,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), "GOOGL", 100, ), @@ -417,7 +417,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.000000000").unwrap(), "ORCL", 100, ), @@ -431,7 +431,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), "ORCL", 400, ), @@ -446,7 +446,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), "ORCL", 100, ), @@ -463,7 +463,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), "ORCL", 100, ), @@ -478,7 +478,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.9999999999").unwrap(), "ORCL", 100, ), @@ -494,7 +494,7 @@ mod tests { let (mut out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:11.000000000").unwrap(), "ORCL", 100, ), @@ -520,7 +520,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:21.000000000").unwrap(), "ORCL", 100, ), diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs index 81e71f5b..a510b7bd 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/join_window.rs @@ -72,8 +72,8 @@ mod tests { stream_engine::{ autonomous_executor::task::window::panes::pane::join_pane::JoinDir, time::{ - duration::{event_duration::EventDuration, SpringDuration}, - timestamp::Timestamp, + duration::{event_duration::SpringEventDuration, SpringDuration}, + timestamp::SpringTimestamp, }, SqlValue, Tuple, }, @@ -83,7 +83,7 @@ mod tests { fn t_expect( tuple: Tuple, - expected_timestamp: Timestamp, + expected_timestamp: SpringTimestamp, expected_amount: i32, expected_temperature: Option, ) { @@ -91,7 +91,7 @@ mod tests { .get_value(&ColumnReference::fx_trade_timestamp()) .unwrap() .unwrap(); - assert_eq!(timestamp.unpack::().unwrap(), expected_timestamp); + assert_eq!(timestamp.unpack::().unwrap(), expected_timestamp); let amount = tuple .get_value(&ColumnReference::fx_trade_amount()) @@ -164,8 +164,8 @@ mod tests { let mut window = JoinWindow::new( WindowParameter::TimedFixedWindow { - length: EventDuration::from_secs(10), - allowed_delay: EventDuration::from_secs(1), + length: SpringEventDuration::from_secs(10), + allowed_delay: SpringEventDuration::from_secs(1), }, JoinParameter { join_type: JoinType::LeftOuter, @@ -187,7 +187,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), "", 100, ), @@ -201,7 +201,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_city_temperature( - Timestamp::from_str("2020-01-01 00:00:00.0000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:00.0000000000").unwrap(), "", 10, ), @@ -215,7 +215,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), "", 200, ), @@ -230,7 +230,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:10.999999999").unwrap(), "", 300, ), @@ -247,7 +247,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999998").unwrap(), "", 400, ), @@ -262,7 +262,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), "", 500, ), @@ -278,7 +278,7 @@ mod tests { let (out, window_in_flow) = window.dispatch( &expr_resolver, Tuple::factory_trade( - Timestamp::from_str("2020-01-01 00:00:11.0000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:11.0000000000").unwrap(), "", 600, ), @@ -287,19 +287,19 @@ mod tests { assert_eq!(out.len(), 3); t_expect( out.get(0).cloned().unwrap(), - Timestamp::from_str("2020-01-01 00:00:00.0000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:00.0000000000").unwrap(), 100, Some(10), ); t_expect( out.get(1).cloned().unwrap(), - Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), 200, None, ); t_expect( out.get(2).cloned().unwrap(), - Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap(), 500, None, ); diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs index dd291073..0f50646c 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes.rs @@ -8,7 +8,7 @@ use crate::{ pipeline::pump_model::{ window_operation_parameter::WindowOperationParameter, window_parameter::WindowParameter, }, - stream_engine::time::{duration::SpringDuration, timestamp::Timestamp}, + stream_engine::time::{duration::SpringDuration, timestamp::SpringTimestamp}, }; use self::pane::Pane; @@ -45,7 +45,7 @@ where /// Then, return all panes to get a tuple with the `rowtime`. /// /// Caller must assure rowtime is not smaller than watermark. - pub(super) fn panes_to_dispatch(&mut self, rowtime: Timestamp) -> impl Iterator { + pub(super) fn panes_to_dispatch(&mut self, rowtime: SpringTimestamp) -> impl Iterator { self.generate_panes_if_not_exist(rowtime); self.panes @@ -75,7 +75,7 @@ where self.panes.clear() } - fn generate_panes_if_not_exist(&mut self, rowtime: Timestamp) { + fn generate_panes_if_not_exist(&mut self, rowtime: SpringTimestamp) { // Sort-Merge Join like algorithm let mut pane_idx = 0; for open_at in self.valid_open_at_s(rowtime) { @@ -101,7 +101,7 @@ where } } - fn valid_open_at_s(&self, rowtime: Timestamp) -> Vec { + fn valid_open_at_s(&self, rowtime: SpringTimestamp) -> Vec { let mut ret = vec![]; let leftmost_open_at = { @@ -126,7 +126,7 @@ where ret } - fn generate_pane(&self, open_at: Timestamp) -> P { + fn generate_pane(&self, open_at: SpringTimestamp) -> P { let close_at = open_at + self.window_param.length().to_chrono(); P::new(open_at, close_at, self.op_param.clone()) } @@ -145,7 +145,7 @@ mod tests { sql_processor::sql_parser::syntax::SelectFieldSyntax, stream_engine::{ autonomous_executor::task::window::panes::pane::aggregate_pane::AggrPane, - time::duration::event_duration::EventDuration, + time::duration::event_duration::SpringEventDuration, }, }; @@ -176,42 +176,42 @@ mod tests { #[test] fn test_valid_open_at_s() { - fn sliding_window_panes(length: EventDuration, period: EventDuration) -> Panes { + fn sliding_window_panes(length: SpringEventDuration, period: SpringEventDuration) -> Panes { Panes::new( WindowParameter::TimedSlidingWindow { length, period, - allowed_delay: EventDuration::from_secs(0), + allowed_delay: SpringEventDuration::from_secs(0), }, dont_care_window_operation_parameter(), ) } - let panes = sliding_window_panes(EventDuration::from_secs(10), EventDuration::from_secs(5)); + let panes = sliding_window_panes(SpringEventDuration::from_secs(10), SpringEventDuration::from_secs(5)); assert_eq!( - panes.valid_open_at_s(Timestamp::from_str("2020-01-01 00:00:05.000000000").unwrap()), + panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap()), vec![ - Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), - Timestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() ] ); assert_eq!( - panes.valid_open_at_s(Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), + panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), vec![ - Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), - Timestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() + SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(), + SpringTimestamp::from_str("2020-01-01 00:00:05.000000000").unwrap() ] ); let panes = - sliding_window_panes(EventDuration::from_secs(10), EventDuration::from_secs(10)); + sliding_window_panes(SpringEventDuration::from_secs(10), SpringEventDuration::from_secs(10)); assert_eq!( - panes.valid_open_at_s(Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap()), - vec![Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] + panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap()), + vec![SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] ); assert_eq!( - panes.valid_open_at_s(Timestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), - vec![Timestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] + panes.valid_open_at_s(SpringTimestamp::from_str("2020-01-01 00:00:09.999999999").unwrap()), + vec![SpringTimestamp::from_str("2020-01-01 00:00:00.000000000").unwrap(),] ); } } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane.rs index b9c68c3f..f5d3bfd7 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane.rs @@ -8,7 +8,7 @@ use crate::{ performance_metrics::metrics_update_command::metrics_update_by_task_execution::WindowInFlowByWindowTask, task::window::watermark::Watermark, }, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, Tuple, }, }; @@ -20,12 +20,12 @@ pub(in crate::stream_engine::autonomous_executor) trait Pane { type CloseOut; type DispatchArg: Clone; - fn new(open_at: Timestamp, close_at: Timestamp, param: WindowOperationParameter) -> Self; + fn new(open_at: SpringTimestamp, close_at: SpringTimestamp, param: WindowOperationParameter) -> Self; - fn open_at(&self) -> Timestamp; - fn close_at(&self) -> Timestamp; + fn open_at(&self) -> SpringTimestamp; + fn close_at(&self) -> SpringTimestamp; - fn is_acceptable(&self, rowtime: &Timestamp) -> bool { + fn is_acceptable(&self, rowtime: &SpringTimestamp) -> bool { &self.open_at() <= rowtime && rowtime < &self.close_at() } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs index 1e214af0..cd49faab 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/aggregate_pane.rs @@ -17,7 +17,7 @@ use crate::{ performance_metrics::metrics_update_command::metrics_update_by_task_execution::WindowInFlowByWindowTask, task::{tuple::Tuple, window::aggregate::GroupAggrOut}, }, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, NnSqlValue, SqlValue, }, }; @@ -28,8 +28,8 @@ use super::Pane; #[derive(Debug)] pub(in crate::stream_engine::autonomous_executor) struct AggrPane { - open_at: Timestamp, - close_at: Timestamp, + open_at: SpringTimestamp, + close_at: SpringTimestamp, group_aggregation_parameter: GroupAggregateParameter, @@ -43,7 +43,7 @@ impl Pane for AggrPane { /// # Panics /// /// if `op_param` is not `GroupAggregateParameter` - fn new(open_at: Timestamp, close_at: Timestamp, op_param: WindowOperationParameter) -> Self { + fn new(open_at: SpringTimestamp, close_at: SpringTimestamp, op_param: WindowOperationParameter) -> Self { if let WindowOperationParameter::GroupAggregation(group_aggregation_parameter) = op_param { let inner = match group_aggregation_parameter.aggr_func { AggregateFunctionParameter::Avg => AggrPaneInner::Avg { @@ -62,11 +62,11 @@ impl Pane for AggrPane { } } - fn open_at(&self) -> Timestamp { + fn open_at(&self) -> SpringTimestamp { self.open_at } - fn close_at(&self) -> Timestamp { + fn close_at(&self) -> SpringTimestamp { self.close_at } diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/join_pane.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/join_pane.rs index df5c65a1..1fc41d3b 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/join_pane.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/panes/pane/join_pane.rs @@ -17,7 +17,7 @@ use crate::{ performance_metrics::metrics_update_command::metrics_update_by_task_execution::WindowInFlowByWindowTask, task::tuple::Tuple, }, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, SqlValue, }, }; @@ -32,8 +32,8 @@ pub(in crate::stream_engine::autonomous_executor) enum JoinDir { #[derive(Debug)] pub(in crate::stream_engine::autonomous_executor) struct JoinPane { - open_at: Timestamp, - close_at: Timestamp, + open_at: SpringTimestamp, + close_at: SpringTimestamp, join_parameter: JoinParameter, @@ -48,7 +48,7 @@ impl Pane for JoinPane { /// # Panics /// /// if `op_param` is not `JoinParameter` - fn new(open_at: Timestamp, close_at: Timestamp, op_param: WindowOperationParameter) -> Self { + fn new(open_at: SpringTimestamp, close_at: SpringTimestamp, op_param: WindowOperationParameter) -> Self { let join_parameter = if let WindowOperationParameter::Join(p) = op_param { p } else { @@ -64,11 +64,11 @@ impl Pane for JoinPane { } } - fn open_at(&self) -> Timestamp { + fn open_at(&self) -> SpringTimestamp { self.open_at } - fn close_at(&self) -> Timestamp { + fn close_at(&self) -> SpringTimestamp { self.close_at } @@ -149,7 +149,7 @@ impl JoinPane { fn null_right_tuple(&self) -> Tuple { // unused - let rowtime = Timestamp::from_str("1970-01-01 00:00:00.0000000000").unwrap(); + let rowtime = SpringTimestamp::from_str("1970-01-01 00:00:00.0000000000").unwrap(); let fields = self .join_parameter diff --git a/springql-core/src/stream_engine/autonomous_executor/task/window/watermark.rs b/springql-core/src/stream_engine/autonomous_executor/task/window/watermark.rs index 9241d285..400b1914 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/window/watermark.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/window/watermark.rs @@ -3,8 +3,8 @@ use std::cmp::max; use crate::stream_engine::time::{ - duration::{event_duration::EventDuration, SpringDuration}, - timestamp::{Timestamp, MIN_TIMESTAMP}, + duration::{event_duration::SpringEventDuration, SpringDuration}, + timestamp::{SpringTimestamp, MIN_TIMESTAMP}, }; /// A watermark is held by each window. @@ -14,23 +14,23 @@ use crate::stream_engine::time::{ /// ``` #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Debug)] pub(in crate::stream_engine::autonomous_executor) struct Watermark { - max_rowtime: Timestamp, - allowed_delay: EventDuration, + max_rowtime: SpringTimestamp, + allowed_delay: SpringEventDuration, } impl Watermark { - pub(in crate::stream_engine::autonomous_executor) fn new(allowed_delay: EventDuration) -> Self { + pub(in crate::stream_engine::autonomous_executor) fn new(allowed_delay: SpringEventDuration) -> Self { Self { max_rowtime: MIN_TIMESTAMP + allowed_delay.to_chrono(), // to avoid overflow allowed_delay, } } - pub(in crate::stream_engine::autonomous_executor) fn as_timestamp(&self) -> Timestamp { + pub(in crate::stream_engine::autonomous_executor) fn as_timestamp(&self) -> SpringTimestamp { self.max_rowtime - self.allowed_delay.to_chrono() } - pub(in crate::stream_engine::autonomous_executor) fn update(&mut self, rowtime: Timestamp) { + pub(in crate::stream_engine::autonomous_executor) fn update(&mut self, rowtime: SpringTimestamp) { self.max_rowtime = max(rowtime, self.max_rowtime); } } diff --git a/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs b/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs index 1e91ff85..df2a7e06 100644 --- a/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs +++ b/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs @@ -10,7 +10,7 @@ use crate::pipeline::stream_model::StreamModel; use crate::stream_engine::autonomous_executor::task::source_task::source_reader::net_client::NetClientSourceReader; use crate::stream_engine::autonomous_executor::task::source_task::source_reader::SourceReader; use crate::stream_engine::autonomous_executor::task::tuple::Tuple; -use crate::stream_engine::time::timestamp::Timestamp; +use crate::stream_engine::time::timestamp::SpringTimestamp; use crate::{ pipeline::{ name::ColumnName, option::options_builder::OptionsBuilder, @@ -61,7 +61,7 @@ impl NetClientSourceReader { impl StreamColumns { pub(in crate::stream_engine) fn factory_city_temperature( - timestamp: Timestamp, + timestamp: SpringTimestamp, city: &str, temperature: i32, ) -> Self { @@ -89,7 +89,7 @@ impl StreamColumns { } pub(in crate::stream_engine) fn factory_trade( - timestamp: Timestamp, + timestamp: SpringTimestamp, ticker: &str, amount: i16, ) -> Self { @@ -138,7 +138,7 @@ impl StreamColumns { impl Row { pub(in crate::stream_engine) fn factory_city_temperature( - timestamp: Timestamp, + timestamp: SpringTimestamp, city: &str, temperature: i32, ) -> Self { @@ -149,7 +149,7 @@ impl Row { )) } pub(in crate::stream_engine) fn factory_trade( - timestamp: Timestamp, + timestamp: SpringTimestamp, ticker: &str, amount: i16, ) -> Self { @@ -159,14 +159,14 @@ impl Row { impl Tuple { pub(in crate::stream_engine) fn factory_city_temperature( - timestamp: Timestamp, + timestamp: SpringTimestamp, city: &str, temperature: i32, ) -> Self { Self::from_row(Row::factory_city_temperature(timestamp, city, temperature)) } pub(in crate::stream_engine) fn factory_trade( - timestamp: Timestamp, + timestamp: SpringTimestamp, ticker: &str, amount: i16, ) -> Self { diff --git a/springql-core/src/stream_engine/autonomous_executor/test_support/fixture.rs b/springql-core/src/stream_engine/autonomous_executor/test_support/fixture.rs index d7d93e5a..24b65793 100644 --- a/springql-core/src/stream_engine/autonomous_executor/test_support/fixture.rs +++ b/springql-core/src/stream_engine/autonomous_executor/test_support/fixture.rs @@ -38,12 +38,12 @@ use crate::{ row::foreign_row::source_row::SourceRow, task_graph::TaskGraph, }, - time::timestamp::Timestamp, + time::timestamp::SpringTimestamp, SinkRow, }, }; -impl Timestamp { +impl SpringTimestamp { pub(crate) fn fx_ts1() -> Self { "2021-01-01 13:00:00.000000001".parse().unwrap() } @@ -58,7 +58,7 @@ impl Timestamp { impl JsonObject { pub(in crate::stream_engine) fn fx_city_temperature_tokyo() -> Self { Self::new(json!({ - "ts": Timestamp::fx_ts1().to_string(), + "ts": SpringTimestamp::fx_ts1().to_string(), "city": "Tokyo", "temperature": 21, })) @@ -66,7 +66,7 @@ impl JsonObject { pub(in crate::stream_engine) fn fx_city_temperature_osaka() -> Self { Self::new(json!({ - "ts": Timestamp::fx_ts2().to_string(), + "ts": SpringTimestamp::fx_ts2().to_string(), "city": "Osaka", "temperature": 23, })) @@ -74,7 +74,7 @@ impl JsonObject { pub(in crate::stream_engine) fn fx_city_temperature_london() -> Self { Self::new(json!({ - "ts": Timestamp::fx_ts3().to_string(), + "ts": SpringTimestamp::fx_ts3().to_string(), "city": "London", "temperature": 13, })) @@ -82,7 +82,7 @@ impl JsonObject { pub(in crate::stream_engine) fn fx_trade_oracle() -> Self { Self::new(json!({ - "ts": Timestamp::fx_ts1().to_string(), + "ts": SpringTimestamp::fx_ts1().to_string(), "ticker": "ORCL", "amount": 20, })) @@ -90,7 +90,7 @@ impl JsonObject { pub(in crate::stream_engine) fn fx_trade_ibm() -> Self { Self::new(json!({ - "ts": Timestamp::fx_ts2().to_string(), + "ts": SpringTimestamp::fx_ts2().to_string(), "ticker": "IBM", "amount": 30, })) @@ -98,7 +98,7 @@ impl JsonObject { pub(in crate::stream_engine) fn fx_trade_google() -> Self { Self::new(json!({ - "ts": Timestamp::fx_ts3().to_string(), + "ts": SpringTimestamp::fx_ts3().to_string(), "ticker": "GOOGL", "amount": 100, })) @@ -184,23 +184,23 @@ impl Tuple { impl StreamColumns { pub(in crate::stream_engine) fn fx_city_temperature_tokyo() -> Self { - Self::factory_city_temperature(Timestamp::fx_ts1(), "Tokyo", 21) + Self::factory_city_temperature(SpringTimestamp::fx_ts1(), "Tokyo", 21) } pub(in crate::stream_engine) fn fx_city_temperature_osaka() -> Self { - Self::factory_city_temperature(Timestamp::fx_ts2(), "Osaka", 23) + Self::factory_city_temperature(SpringTimestamp::fx_ts2(), "Osaka", 23) } pub(in crate::stream_engine) fn fx_city_temperature_london() -> Self { - Self::factory_city_temperature(Timestamp::fx_ts3(), "London", 13) + Self::factory_city_temperature(SpringTimestamp::fx_ts3(), "London", 13) } pub(in crate::stream_engine) fn fx_trade_oracle() -> Self { - Self::factory_trade(Timestamp::fx_ts1(), "ORCL", 20) + Self::factory_trade(SpringTimestamp::fx_ts1(), "ORCL", 20) } pub(in crate::stream_engine) fn fx_trade_ibm() -> Self { - Self::factory_trade(Timestamp::fx_ts2(), "IBM", 30) + Self::factory_trade(SpringTimestamp::fx_ts2(), "IBM", 30) } pub(in crate::stream_engine) fn fx_trade_google() -> Self { - Self::factory_trade(Timestamp::fx_ts3(), "GOOGL", 100) + Self::factory_trade(SpringTimestamp::fx_ts3(), "GOOGL", 100) } pub(in crate::stream_engine) fn fx_no_promoted_rowtime() -> Self { diff --git a/springql-core/src/stream_engine/time/duration/event_duration.rs b/springql-core/src/stream_engine/time/duration/event_duration.rs index 9be981bb..3bd7d3ff 100644 --- a/springql-core/src/stream_engine/time/duration/event_duration.rs +++ b/springql-core/src/stream_engine/time/duration/event_duration.rs @@ -13,15 +13,15 @@ use super::SpringDuration; /// Event-time duration. #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug)] -pub(crate) struct EventDuration(Duration); +pub struct SpringEventDuration(Duration); -impl MemSize for EventDuration { +impl MemSize for SpringEventDuration { fn mem_size(&self) -> usize { size_of::() + size_of::() } } -impl SpringDuration for EventDuration { +impl SpringDuration for SpringEventDuration { fn as_std(&self) -> &Duration { &self.0 } @@ -31,41 +31,41 @@ impl SpringDuration for EventDuration { } } -impl Display for EventDuration { +impl Display for SpringEventDuration { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{} secs", self.0.as_secs()) } } -impl Add for EventDuration { +impl Add for SpringEventDuration { type Output = Self; - fn add(self, rhs: EventDuration) -> Self::Output { + fn add(self, rhs: SpringEventDuration) -> Self::Output { Self(self.0 + rhs.0) } } -impl Sub for EventDuration { +impl Sub for SpringEventDuration { type Output = Self; - fn sub(self, rhs: EventDuration) -> Self::Output { + fn sub(self, rhs: SpringEventDuration) -> Self::Output { Self(self.0 - rhs.0) } } -impl Mul for EventDuration { +impl Mul for SpringEventDuration { type Output = Self; fn mul(self, rhs: u32) -> Self::Output { Self(self.0 * rhs) } } -impl Mul for EventDuration { +impl Mul for SpringEventDuration { type Output = Self; fn mul(self, rhs: f32) -> Self::Output { Self(self.0.mul_f32(rhs)) } } -impl Div for EventDuration { +impl Div for SpringEventDuration { type Output = Self; fn div(self, rhs: u32) -> Self::Output { diff --git a/springql-core/src/stream_engine/time/duration/wall_clock_duration/wall_clock_stopwatch.rs b/springql-core/src/stream_engine/time/duration/wall_clock_duration/wall_clock_stopwatch.rs index 71b6b659..a09b6b6a 100644 --- a/springql-core/src/stream_engine/time/duration/wall_clock_duration/wall_clock_stopwatch.rs +++ b/springql-core/src/stream_engine/time/duration/wall_clock_duration/wall_clock_stopwatch.rs @@ -2,7 +2,7 @@ use crate::stream_engine::time::{ duration::SpringDuration, - timestamp::{system_timestamp::SystemTimestamp, Timestamp}, + timestamp::{system_timestamp::SystemTimestamp, SpringTimestamp}, }; use super::WallClockDuration; @@ -10,7 +10,7 @@ use super::WallClockDuration; /// Real-time (wall-clock) stopwatch. #[derive(Debug)] pub(in crate::stream_engine) struct WallClockStopwatch { - start_at: Timestamp, + start_at: SpringTimestamp, } impl WallClockStopwatch { diff --git a/springql-core/src/stream_engine/time/timestamp.rs b/springql-core/src/stream_engine/time/timestamp.rs index df36bc50..d6bb7411 100644 --- a/springql-core/src/stream_engine/time/timestamp.rs +++ b/springql-core/src/stream_engine/time/timestamp.rs @@ -19,23 +19,23 @@ use crate::{ }; /// The minimum possible `Timestamp`. -pub(crate) const MIN_TIMESTAMP: Timestamp = Timestamp(MIN_DATETIME); +pub(crate) const MIN_TIMESTAMP: SpringTimestamp = SpringTimestamp(MIN_DATETIME); const FORMAT: &str = "%Y-%m-%d %H:%M:%S%.9f"; /// Timestamp in UTC. Serializable. #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize, Deserialize, new)] -pub(crate) struct Timestamp(#[serde(with = "datetime_format")] NaiveDateTime); +pub struct SpringTimestamp(#[serde(with = "datetime_format")] NaiveDateTime); -impl MemSize for Timestamp { +impl MemSize for SpringTimestamp { fn mem_size(&self) -> usize { chrono_naive_date_time_overhead_size() } } -impl Timestamp { +impl SpringTimestamp { /// Note: `2262-04-11T23:47:16.854775804` is the maximum possible timestamp because it uses nano-sec unixtime internally. - pub(crate) fn floor(&self, resolution: Duration) -> Timestamp { + pub(crate) fn floor(&self, resolution: Duration) -> SpringTimestamp { let ts_nano = self.0.timestamp_nanos(); let resolution_nano = resolution.num_nanoseconds().expect("no overflow"); assert!(resolution_nano > 0); @@ -48,11 +48,11 @@ impl Timestamp { NaiveDateTime::from_timestamp(floor_ts_secs, floor_ts_nanos as u32) }; - Timestamp(floor_naive_date_time) + SpringTimestamp(floor_naive_date_time) } /// Note: `2262-04-11T23:47:16.854775804` is the maximum possible timestamp because it uses nano-sec unixtime internally. - pub(crate) fn ceil(&self, resolution: Duration) -> Timestamp { + pub(crate) fn ceil(&self, resolution: Duration) -> SpringTimestamp { let floor = self.floor(resolution); if &floor == self { floor @@ -68,7 +68,7 @@ impl Timestamp { s: s.to_string(), source: e, })?; - Ok(Timestamp(ndt)) + Ok(SpringTimestamp(ndt)) } fn try_parse_rfc3339(s: &str) -> Result { let dt = DateTime::parse_from_rfc3339(s) @@ -77,11 +77,11 @@ impl Timestamp { s: s.to_string(), source: e, })?; - Ok(Timestamp(dt.naive_utc())) + Ok(SpringTimestamp(dt.naive_utc())) } } -impl FromStr for Timestamp { +impl FromStr for SpringTimestamp { type Err = SpringError; /// Parse as RFC-3339 or `"%Y-%m-%d %H:%M:%S%.9f"` format. @@ -90,20 +90,20 @@ impl FromStr for Timestamp { } } -impl ToString for Timestamp { +impl ToString for SpringTimestamp { fn to_string(&self) -> String { self.0.format(FORMAT).to_string() } } -impl Add for Timestamp { +impl Add for SpringTimestamp { type Output = Self; fn add(self, rhs: Duration) -> Self::Output { Self(self.0 + rhs) } } -impl Sub for Timestamp { +impl Sub for SpringTimestamp { type Output = Self; fn sub(self, rhs: Duration) -> Self::Output { @@ -111,10 +111,10 @@ impl Sub for Timestamp { } } -impl Sub for Timestamp { +impl Sub for SpringTimestamp { type Output = Duration; - fn sub(self, rhs: Timestamp) -> Self::Output { + fn sub(self, rhs: SpringTimestamp) -> Self::Output { self.0 - rhs.0 } } @@ -151,8 +151,8 @@ mod tests { #[test] fn test_floor() { fn t(ts: &str, resolution: Duration, expected: &str) { - let ts = Timestamp::from_str(ts).unwrap(); - let expected = Timestamp::from_str(expected).unwrap(); + let ts = SpringTimestamp::from_str(ts).unwrap(); + let expected = SpringTimestamp::from_str(expected).unwrap(); let actual = ts.floor(resolution); assert_eq!(actual, expected); @@ -244,8 +244,8 @@ mod tests { #[test] fn test_ceil() { fn t(ts: &str, resolution: Duration, expected: &str) { - let ts = Timestamp::from_str(ts).unwrap(); - let expected = Timestamp::from_str(expected).unwrap(); + let ts = SpringTimestamp::from_str(ts).unwrap(); + let expected = SpringTimestamp::from_str(expected).unwrap(); let actual = ts.ceil(resolution); assert_eq!(actual, expected); @@ -351,7 +351,7 @@ mod tests { for t in ts { let ser = serde_json::to_string(&t).unwrap(); - let de: Timestamp = serde_json::from_str(&ser).unwrap(); + let de: SpringTimestamp = serde_json::from_str(&ser).unwrap(); assert_eq!(de, t); } @@ -360,8 +360,8 @@ mod tests { #[test] fn test_timestamp_parse_rfc3339() -> Result<()> { - let ts_rfc3339: Timestamp = "2020-01-01T09:12:34.56789+09:00".parse()?; - let ts: Timestamp = "2020-01-01 00:12:34.567890000".parse()?; + let ts_rfc3339: SpringTimestamp = "2020-01-01T09:12:34.56789+09:00".parse()?; + let ts: SpringTimestamp = "2020-01-01 00:12:34.567890000".parse()?; assert_eq!(ts_rfc3339, ts); Ok(()) diff --git a/springql-core/src/stream_engine/time/timestamp/system_timestamp.rs b/springql-core/src/stream_engine/time/timestamp/system_timestamp.rs index 5a0564f3..ff118ecc 100644 --- a/springql-core/src/stream_engine/time/timestamp/system_timestamp.rs +++ b/springql-core/src/stream_engine/time/timestamp/system_timestamp.rs @@ -1,14 +1,14 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. -use super::Timestamp; +use super::SpringTimestamp; /// Wall-clock timestamp from system clock. #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, new)] pub(crate) struct SystemTimestamp; impl SystemTimestamp { - pub(crate) fn now() -> Timestamp { + pub(crate) fn now() -> SpringTimestamp { let t = chrono::offset::Utc::now().naive_utc(); - Timestamp::new(t) + SpringTimestamp::new(t) } } From 74563fae028c3a722a12b074779d75efdd503ce0 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Mon, 9 May 2022 07:12:54 +0900 Subject: [PATCH 6/7] test(fix): add type annotation --- springql-core/tests/e2e_high_level_rs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/springql-core/tests/e2e_high_level_rs.rs b/springql-core/tests/e2e_high_level_rs.rs index 099480f2..e168fb51 100644 --- a/springql-core/tests/e2e_high_level_rs.rs +++ b/springql-core/tests/e2e_high_level_rs.rs @@ -247,7 +247,7 @@ fn test_e2e_pop_from_in_memory_queue() { for _ in 0..trade_times { let row = pipeline.pop(queue_name).unwrap(); - assert_eq!(row.get_not_null_by_index(0).unwrap(), ts); - assert_eq!(row.get_not_null_by_index(1).unwrap(), amount); + assert_eq!(row.get_not_null_by_index::(0).unwrap(), ts); + assert_eq!(row.get_not_null_by_index::(1).unwrap(), amount); } } From ed5c55f45e2093524d1d4b0c04d2bc3719c0b5ae Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Mon, 9 May 2022 07:15:42 +0900 Subject: [PATCH 7/7] docs: write CHANGELOG --- CHANGELOG.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 42e7899c..fc34bbea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog][Keep a Changelog] and this project adh ## [Unreleased] +### Added + +- `SpringPipelineHL::pop()` to get `SpringRowHL` from an in-memory queue ([#119](https://github.com/SpringQL/SpringQL/pull/119)). +- `SpringRowHL::get_not_null_by_index()` to get a column value from `SpringRowHL` ([#119](https://github.com/SpringQL/SpringQL/pull/119)). +- Made public ([#119](https://github.com/SpringQL/SpringQL/pull/119)): + - `SpringTimestamp` + - `SpringEventDuration` + - `SpringValue` trait + ## [v0.5.0] ### Fixed