Skip to content

Commit

Permalink
Merge pull request #134 from SpringQL/fix/non-blocking-pop
Browse files Browse the repository at this point in the history
feat: *pop_non_blocking()
  • Loading branch information
laysakura committed May 11, 2022
2 parents fd56f7b + a336296 commit 286ab9d
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 21 deletions.
24 changes: 23 additions & 1 deletion springql-core/src/api/high_level_rs.rs
Expand Up @@ -4,7 +4,10 @@

use crate::{
error::{Result, SpringError},
low_level_rs::{spring_command, spring_open, spring_pop, SpringConfig, SpringPipeline},
low_level_rs::{
spring_command, spring_open, spring_pop, spring_pop_non_blocking, SpringConfig,
SpringPipeline,
},
stream_engine::{SinkRow, SpringValue, SqlValue},
};

Expand Down Expand Up @@ -35,13 +38,32 @@ impl SpringPipelineHL {

/// Pop a row from an in memory queue. This is a blocking function.
///
/// **Do not call this function from threads.**
/// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`.
/// See: <https://github.com/SpringQL/SpringQL/issues/125>
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop(&self, queue: &str) -> Result<SpringRowHL> {
spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0))
}

/// Pop a row from an in memory queue. This is a non-blocking function.
///
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringRowHL>> {
spring_pop_non_blocking(&self.0, queue).map(|opt_row| opt_row.map(|row| SpringRowHL(row.0)))
}
}

/// Row object from an in memory queue.
Expand Down
40 changes: 37 additions & 3 deletions springql-core/src/api/low_level_rs.rs
Expand Up @@ -9,7 +9,7 @@ mod spring_config;

pub use spring_config::*;

use std::sync::Once;
use std::{sync::Once, thread, time::Duration};

use crate::{
error::{Result, SpringError},
Expand Down Expand Up @@ -89,14 +89,48 @@ pub fn spring_command(pipeline: &SpringPipeline, sql: &str) -> Result<()> {

/// Pop a row from an in memory queue. This is a blocking function.
///
/// **Do not call this function from threads.**
/// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`.
/// See: <https://github.com/SpringQL/SpringQL/issues/125>
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn spring_pop(pipeline: &SpringPipeline, queue: &str) -> Result<SpringRow> {
const SLEEP_MSECS: u64 = 10;

let mut engine = pipeline.engine.get()?;

loop {
if let Some(sink_row) =
engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))?
{
return Ok(SpringRow::from(sink_row));
} else {
thread::sleep(Duration::from_millis(SLEEP_MSECS));
}
}
}

/// Pop a row from an in memory queue. This is a non-blocking function.
///
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn spring_pop_non_blocking(
pipeline: &SpringPipeline,
queue: &str,
) -> Result<Option<SpringRow>> {
let mut engine = pipeline.engine.get()?;
let sink_row = engine.pop_in_memory_queue(QueueName::new(queue.to_string()))?;
Ok(SpringRow::from(sink_row))
let sink_row = engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))?;
Ok(sink_row.map(SpringRow::from))
}

/// Get an integer column.
Expand Down
12 changes: 9 additions & 3 deletions springql-core/src/stream_engine.rs
Expand Up @@ -83,15 +83,21 @@ impl StreamEngine {
self.autonomous_executor.notify_pipeline_update(pipeline)
}

/// Blocking call
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue_name` does not exist.
pub(crate) fn pop_in_memory_queue(&mut self, queue_name: QueueName) -> Result<SinkRow> {
pub(crate) fn pop_in_memory_queue_non_blocking(
&mut self,
queue_name: QueueName,
) -> Result<Option<SinkRow>> {
let q = InMemoryQueueRepository::instance().get(&queue_name)?;
let row = q.pop();
let row = q.pop_non_blocking();
Ok(row)
}
}
Expand Up @@ -3,30 +3,22 @@
use std::{
collections::VecDeque,
sync::{Mutex, MutexGuard},
thread,
time::Duration,
};

use crate::stream_engine::autonomous_executor::row::foreign_row::sink_row::SinkRow;

const SLEEP_MSECS: u64 = 10;

#[derive(Debug, Default)]
pub(in crate::stream_engine) struct InMemoryQueue(
Mutex<VecDeque<SinkRow>>, // TODO faster (lock-free?) queue
);

impl InMemoryQueue {
/// Blocking call
pub(in crate::stream_engine) fn pop(&self) -> SinkRow {
loop {
let r = self.lock().pop_front();
if let Some(r) = r {
return r;
} else {
thread::sleep(Duration::from_millis(SLEEP_MSECS));
}
}
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
pub(in crate::stream_engine) fn pop_non_blocking(&self) -> Option<SinkRow> {
self.lock().pop_front()
}

pub(in crate::stream_engine) fn push(&self, row: SinkRow) {
Expand Down
85 changes: 85 additions & 0 deletions springql-core/tests/e2e_high_level_rs.rs
Expand Up @@ -251,3 +251,88 @@ fn test_e2e_pop_from_in_memory_queue() {
assert_eq!(row.get_not_null_by_index::<i32>(1).unwrap(), amount);
}
}

#[test]
fn test_e2e_pop_non_blocking_from_in_memory_queue() {
setup_test_logger();

let queue_name = "queue_trade_nb"; // FIXME using the same name as in test_e2e_pop_from_in_memory_queue causes panic
let ts = "2021-11-04 23:02:52.123456789";
let ticker = "ORCL";
let amount = 20;

let json_oracle = json!({
"ts": ts,
"ticker": ticker,
"amount": amount,
});
let trade_times = 5;

let test_source = ForeignSource::new().unwrap();

let ddls = vec![
"
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE SINK STREAM sink_trade (
ts TIMESTAMP NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE PUMP pu_projection AS
INSERT INTO sink_trade (ts, amount)
SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade;
"
.to_string(),
format!(
"
CREATE SINK WRITER queue_sink_trade FOR sink_trade
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME '{queue_name}'
);
",
queue_name = queue_name,
),
format!(
"
CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source.host_ip(),
remote_port = test_source.port()
),
];

let pipeline = apply_ddls(&ddls, SpringConfig::default());
test_source.start(ForeignSourceInput::new_fifo_batch(
(0..trade_times)
.into_iter()
.map(|_| json_oracle.clone())
.collect(),
));

for _ in 0..trade_times {
loop {
match pipeline.pop_non_blocking(queue_name).unwrap() {
Some(row) => {
assert_eq!(row.get_not_null_by_index::<String>(0).unwrap(), ts);
assert_eq!(row.get_not_null_by_index::<i32>(1).unwrap(), amount);
break;
}
None => {}
}
}
}
}
85 changes: 85 additions & 0 deletions springql-core/tests/e2e_low_level_rs.rs
Expand Up @@ -252,6 +252,91 @@ fn test_e2e_pop_from_in_memory_queue() {
}
}

#[test]
fn test_e2e_pop_non_blocking_from_in_memory_queue() {
setup_test_logger();

let queue_name = "queue_trade_nb"; // FIXME using the same name as in test_e2e_pop_from_in_memory_queue causes panic
let ts = "2021-11-04 23:02:52.123456789";
let ticker = "ORCL";
let amount = 20;

let json_oracle = json!({
"ts": ts,
"ticker": ticker,
"amount": amount,
});
let trade_times = 5;

let test_source = ForeignSource::new().unwrap();

let ddls = vec![
"
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE SINK STREAM sink_trade (
ts TIMESTAMP NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE PUMP pu_projection AS
INSERT INTO sink_trade (ts, amount)
SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade;
"
.to_string(),
format!(
"
CREATE SINK WRITER queue_sink_trade FOR sink_trade
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME '{queue_name}'
);
",
queue_name = queue_name,
),
format!(
"
CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source.host_ip(),
remote_port = test_source.port()
),
];

let pipeline = apply_ddls_low_level(&ddls, spring_config_default());
test_source.start(ForeignSourceInput::new_fifo_batch(
(0..trade_times)
.into_iter()
.map(|_| json_oracle.clone())
.collect(),
));

for _ in 0..trade_times {
loop {
match spring_pop_non_blocking(&pipeline, queue_name).unwrap() {
Some(row) => {
assert_eq!(spring_column_text(&row, 0).unwrap(), ts);
assert_eq!(spring_column_i32(&row, 1).unwrap(), amount);
break;
}
None => {}
}
}
}
}

#[test]
fn test_e2e_spring_column_apis() {
setup_test_logger();
Expand Down

0 comments on commit 286ab9d

Please sign in to comment.