From 48b0e29d3d781aef6fdc49cecf1121356402a96f Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 16:59:09 +0900 Subject: [PATCH 1/4] feat: add spring_pop_non_blocking() and SpringPipelineHL::pop_non_blocking() --- springql-core/src/api/high_level_rs.rs | 24 ++++++++++- springql-core/src/api/low_level_rs.rs | 40 +++++++++++++++++-- springql-core/src/stream_engine.rs | 12 ++++-- .../in_memory_queue.rs | 20 +++------- 4 files changed, 75 insertions(+), 21 deletions(-) diff --git a/springql-core/src/api/high_level_rs.rs b/springql-core/src/api/high_level_rs.rs index d76977f4..7fd5940a 100644 --- a/springql-core/src/api/high_level_rs.rs +++ b/springql-core/src/api/high_level_rs.rs @@ -4,7 +4,10 @@ use crate::{ error::{Result, SpringError}, - low_level_rs::{spring_command, spring_open, spring_pop, SpringConfig, SpringPipeline}, + low_level_rs::{ + spring_command, spring_open, spring_pop, spring_pop_non_blocking, SpringConfig, + SpringPipeline, + }, stream_engine::{SinkRow, SpringValue, SqlValue}, }; @@ -35,6 +38,10 @@ impl SpringPipelineHL { /// Pop a row from an in memory queue. This is a blocking function. /// + /// **Do not call this function from threads.** + /// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`. + /// See: + /// /// # Failure /// /// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when: @@ -42,6 +49,21 @@ impl SpringPipelineHL { pub fn pop(&self, queue: &str) -> Result { spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0)) } + + /// Pop a row from an in memory queue. This is a non-blocking function. + /// + /// # Returns + /// + /// - `Ok(Some)` when at least a row is in the queue. + /// - `None` when no row is in the queue. + /// + /// # Failure + /// + /// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when: + /// - queue named `queue` does not exist. + pub fn pop_non_blocking(&self, queue: &str) -> Result> { + spring_pop_non_blocking(&self.0, queue).map(|opt_row| opt_row.map(|row| SpringRowHL(row.0))) + } } /// Row object from an in memory queue. diff --git a/springql-core/src/api/low_level_rs.rs b/springql-core/src/api/low_level_rs.rs index 29706b19..5cb88ae5 100644 --- a/springql-core/src/api/low_level_rs.rs +++ b/springql-core/src/api/low_level_rs.rs @@ -9,7 +9,7 @@ mod spring_config; pub use spring_config::*; -use std::sync::Once; +use std::{sync::Once, thread, time::Duration}; use crate::{ error::{Result, SpringError}, @@ -89,14 +89,48 @@ pub fn spring_command(pipeline: &SpringPipeline, sql: &str) -> Result<()> { /// Pop a row from an in memory queue. This is a blocking function. /// +/// **Do not call this function from threads.** +/// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`. +/// See: +/// /// # Failure /// /// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when: /// - queue named `queue` does not exist. pub fn spring_pop(pipeline: &SpringPipeline, queue: &str) -> Result { + const SLEEP_MSECS: u64 = 10; + + let mut engine = pipeline.engine.get()?; + + loop { + if let Some(sink_row) = + engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))? + { + return Ok(SpringRow::from(sink_row)); + } else { + thread::sleep(Duration::from_millis(SLEEP_MSECS)); + } + } +} + +/// Pop a row from an in memory queue. This is a non-blocking function. +/// +/// # Returns +/// +/// - `Ok(Some)` when at least a row is in the queue. +/// - `None` when no row is in the queue. +/// +/// # Failure +/// +/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when: +/// - queue named `queue` does not exist. +pub fn spring_pop_non_blocking( + pipeline: &SpringPipeline, + queue: &str, +) -> Result> { let mut engine = pipeline.engine.get()?; - let sink_row = engine.pop_in_memory_queue(QueueName::new(queue.to_string()))?; - Ok(SpringRow::from(sink_row)) + let sink_row = engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))?; + Ok(sink_row.map(SpringRow::from)) } /// Get an integer column. diff --git a/springql-core/src/stream_engine.rs b/springql-core/src/stream_engine.rs index 17dbb471..0e616a59 100644 --- a/springql-core/src/stream_engine.rs +++ b/springql-core/src/stream_engine.rs @@ -83,15 +83,21 @@ impl StreamEngine { self.autonomous_executor.notify_pipeline_update(pipeline) } - /// Blocking call + /// # Returns + /// + /// - `Ok(Some)` when at least a row is in the queue. + /// - `None` when no row is in the queue. /// /// # Failure /// /// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when: /// - queue named `queue_name` does not exist. - pub(crate) fn pop_in_memory_queue(&mut self, queue_name: QueueName) -> Result { + pub(crate) fn pop_in_memory_queue_non_blocking( + &mut self, + queue_name: QueueName, + ) -> Result> { let q = InMemoryQueueRepository::instance().get(&queue_name)?; - let row = q.pop(); + let row = q.pop_non_blocking(); Ok(row) } } diff --git a/springql-core/src/stream_engine/in_memory_queue_repository/in_memory_queue.rs b/springql-core/src/stream_engine/in_memory_queue_repository/in_memory_queue.rs index 54360423..05b3fdf0 100644 --- a/springql-core/src/stream_engine/in_memory_queue_repository/in_memory_queue.rs +++ b/springql-core/src/stream_engine/in_memory_queue_repository/in_memory_queue.rs @@ -3,30 +3,22 @@ use std::{ collections::VecDeque, sync::{Mutex, MutexGuard}, - thread, - time::Duration, }; use crate::stream_engine::autonomous_executor::row::foreign_row::sink_row::SinkRow; -const SLEEP_MSECS: u64 = 10; - #[derive(Debug, Default)] pub(in crate::stream_engine) struct InMemoryQueue( Mutex>, // TODO faster (lock-free?) queue ); impl InMemoryQueue { - /// Blocking call - pub(in crate::stream_engine) fn pop(&self) -> SinkRow { - loop { - let r = self.lock().pop_front(); - if let Some(r) = r { - return r; - } else { - thread::sleep(Duration::from_millis(SLEEP_MSECS)); - } - } + /// # Returns + /// + /// - `Ok(Some)` when at least a row is in the queue. + /// - `None` when no row is in the queue. + pub(in crate::stream_engine) fn pop_non_blocking(&self) -> Option { + self.lock().pop_front() } pub(in crate::stream_engine) fn push(&self, row: SinkRow) { From cd0c85b627e208257e547e6ab19baa7316508478 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 17:02:06 +0900 Subject: [PATCH 2/4] test: add E2E test for spring_pop_non_blocking() --- springql-core/tests/e2e_low_level_rs.rs | 85 +++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/springql-core/tests/e2e_low_level_rs.rs b/springql-core/tests/e2e_low_level_rs.rs index 4e348663..68ee4a22 100644 --- a/springql-core/tests/e2e_low_level_rs.rs +++ b/springql-core/tests/e2e_low_level_rs.rs @@ -252,6 +252,91 @@ fn test_e2e_pop_from_in_memory_queue() { } } +#[test] +fn test_e2e_pop_non_blocking_from_in_memory_queue() { + setup_test_logger(); + + let queue_name = "queue_trade"; + let ts = "2021-11-04 23:02:52.123456789"; + let ticker = "ORCL"; + let amount = 20; + + let json_oracle = json!({ + "ts": ts, + "ticker": ticker, + "amount": amount, + }); + let trade_times = 5; + + let test_source = ForeignSource::new().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_trade ( + ts TIMESTAMP NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE PUMP pu_projection AS + INSERT INTO sink_trade (ts, amount) + SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade; + " + .to_string(), + format!( + " + CREATE SINK WRITER queue_sink_trade FOR sink_trade + TYPE IN_MEMORY_QUEUE OPTIONS ( + NAME '{queue_name}' + ); + ", + queue_name = queue_name, + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let pipeline = apply_ddls_low_level(&ddls, spring_config_default()); + test_source.start(ForeignSourceInput::new_fifo_batch( + (0..trade_times) + .into_iter() + .map(|_| json_oracle.clone()) + .collect(), + )); + + for _ in 0..trade_times { + loop { + match spring_pop_non_blocking(&pipeline, queue_name).unwrap() { + Some(row) => { + assert_eq!(spring_column_text(&row, 0).unwrap(), ts); + assert_eq!(spring_column_i32(&row, 1).unwrap(), amount); + break; + } + None => {} + } + } + } +} + #[test] fn test_e2e_spring_column_apis() { setup_test_logger(); From ea955c474fc5cbc0c53a3b79e14d20f65bcd5ed4 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 17:03:27 +0900 Subject: [PATCH 3/4] test: add E2E test for SpringPipelineHL::pop_non_blocking --- springql-core/tests/e2e_high_level_rs.rs | 85 ++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/springql-core/tests/e2e_high_level_rs.rs b/springql-core/tests/e2e_high_level_rs.rs index e168fb51..3ec61e30 100644 --- a/springql-core/tests/e2e_high_level_rs.rs +++ b/springql-core/tests/e2e_high_level_rs.rs @@ -251,3 +251,88 @@ fn test_e2e_pop_from_in_memory_queue() { assert_eq!(row.get_not_null_by_index::(1).unwrap(), amount); } } + +#[test] +fn test_e2e_pop_non_blocking_from_in_memory_queue() { + setup_test_logger(); + + let queue_name = "queue_trade"; + let ts = "2021-11-04 23:02:52.123456789"; + let ticker = "ORCL"; + let amount = 20; + + let json_oracle = json!({ + "ts": ts, + "ticker": ticker, + "amount": amount, + }); + let trade_times = 5; + + let test_source = ForeignSource::new().unwrap(); + + let ddls = vec![ + " + CREATE SOURCE STREAM source_trade ( + ts TIMESTAMP NOT NULL ROWTIME, + ticker TEXT NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE SINK STREAM sink_trade ( + ts TIMESTAMP NOT NULL, + amount INTEGER NOT NULL + ); + " + .to_string(), + " + CREATE PUMP pu_projection AS + INSERT INTO sink_trade (ts, amount) + SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade; + " + .to_string(), + format!( + " + CREATE SINK WRITER queue_sink_trade FOR sink_trade + TYPE IN_MEMORY_QUEUE OPTIONS ( + NAME '{queue_name}' + ); + ", + queue_name = queue_name, + ), + format!( + " + CREATE SOURCE READER tcp_trade FOR source_trade + TYPE NET_CLIENT OPTIONS ( + PROTOCOL 'TCP', + REMOTE_HOST '{remote_host}', + REMOTE_PORT '{remote_port}' + ); + ", + remote_host = test_source.host_ip(), + remote_port = test_source.port() + ), + ]; + + let pipeline = apply_ddls(&ddls, SpringConfig::default()); + test_source.start(ForeignSourceInput::new_fifo_batch( + (0..trade_times) + .into_iter() + .map(|_| json_oracle.clone()) + .collect(), + )); + + for _ in 0..trade_times { + loop { + match pipeline.pop_non_blocking(queue_name).unwrap() { + Some(row) => { + assert_eq!(row.get_not_null_by_index::(0).unwrap(), ts); + assert_eq!(row.get_not_null_by_index::(1).unwrap(), amount); + break; + } + None => {} + } + } + } +} From a33629603beccc007e9057108fb10ce3c4bf65b5 Mon Sep 17 00:00:00 2001 From: Sho Nakatani Date: Wed, 11 May 2022 17:18:47 +0900 Subject: [PATCH 4/4] fix: change queue name to pass tests --- springql-core/tests/e2e_high_level_rs.rs | 2 +- springql-core/tests/e2e_low_level_rs.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/springql-core/tests/e2e_high_level_rs.rs b/springql-core/tests/e2e_high_level_rs.rs index 3ec61e30..01af0a69 100644 --- a/springql-core/tests/e2e_high_level_rs.rs +++ b/springql-core/tests/e2e_high_level_rs.rs @@ -256,7 +256,7 @@ fn test_e2e_pop_from_in_memory_queue() { fn test_e2e_pop_non_blocking_from_in_memory_queue() { setup_test_logger(); - let queue_name = "queue_trade"; + let queue_name = "queue_trade_nb"; // FIXME using the same name as in test_e2e_pop_from_in_memory_queue causes panic let ts = "2021-11-04 23:02:52.123456789"; let ticker = "ORCL"; let amount = 20; diff --git a/springql-core/tests/e2e_low_level_rs.rs b/springql-core/tests/e2e_low_level_rs.rs index 68ee4a22..a81cf7b0 100644 --- a/springql-core/tests/e2e_low_level_rs.rs +++ b/springql-core/tests/e2e_low_level_rs.rs @@ -256,7 +256,7 @@ fn test_e2e_pop_from_in_memory_queue() { fn test_e2e_pop_non_blocking_from_in_memory_queue() { setup_test_logger(); - let queue_name = "queue_trade"; + let queue_name = "queue_trade_nb"; // FIXME using the same name as in test_e2e_pop_from_in_memory_queue causes panic let ts = "2021-11-04 23:02:52.123456789"; let ticker = "ORCL"; let amount = 20;