Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: *pop_non_blocking() #134

Merged
merged 4 commits into from May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 23 additions & 1 deletion springql-core/src/api/high_level_rs.rs
Expand Up @@ -4,7 +4,10 @@

use crate::{
error::{Result, SpringError},
low_level_rs::{spring_command, spring_open, spring_pop, SpringConfig, SpringPipeline},
low_level_rs::{
spring_command, spring_open, spring_pop, spring_pop_non_blocking, SpringConfig,
SpringPipeline,
},
stream_engine::{SinkRow, SpringValue, SqlValue},
};

Expand Down Expand Up @@ -35,13 +38,32 @@ impl SpringPipelineHL {

/// Pop a row from an in memory queue. This is a blocking function.
///
/// **Do not call this function from threads.**
/// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`.
/// See: <https://github.com/SpringQL/SpringQL/issues/125>
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop(&self, queue: &str) -> Result<SpringRowHL> {
spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0))
}

/// Pop a row from an in memory queue. This is a non-blocking function.
///
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringRowHL>> {
spring_pop_non_blocking(&self.0, queue).map(|opt_row| opt_row.map(|row| SpringRowHL(row.0)))
}
}

/// Row object from an in memory queue.
Expand Down
40 changes: 37 additions & 3 deletions springql-core/src/api/low_level_rs.rs
Expand Up @@ -9,7 +9,7 @@ mod spring_config;

pub use spring_config::*;

use std::sync::Once;
use std::{sync::Once, thread, time::Duration};

use crate::{
error::{Result, SpringError},
Expand Down Expand Up @@ -89,14 +89,48 @@ pub fn spring_command(pipeline: &SpringPipeline, sql: &str) -> Result<()> {

/// Pop a row from an in memory queue. This is a blocking function.
///
/// **Do not call this function from threads.**
/// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`.
/// See: <https://github.com/SpringQL/SpringQL/issues/125>
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn spring_pop(pipeline: &SpringPipeline, queue: &str) -> Result<SpringRow> {
const SLEEP_MSECS: u64 = 10;

let mut engine = pipeline.engine.get()?;

loop {
if let Some(sink_row) =
engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))?
{
return Ok(SpringRow::from(sink_row));
} else {
thread::sleep(Duration::from_millis(SLEEP_MSECS));
}
}
}

/// Pop a row from an in memory queue. This is a non-blocking function.
///
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn spring_pop_non_blocking(
pipeline: &SpringPipeline,
queue: &str,
) -> Result<Option<SpringRow>> {
let mut engine = pipeline.engine.get()?;
let sink_row = engine.pop_in_memory_queue(QueueName::new(queue.to_string()))?;
Ok(SpringRow::from(sink_row))
let sink_row = engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))?;
Ok(sink_row.map(SpringRow::from))
}

/// Get an integer column.
Expand Down
12 changes: 9 additions & 3 deletions springql-core/src/stream_engine.rs
Expand Up @@ -83,15 +83,21 @@ impl StreamEngine {
self.autonomous_executor.notify_pipeline_update(pipeline)
}

/// Blocking call
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue_name` does not exist.
pub(crate) fn pop_in_memory_queue(&mut self, queue_name: QueueName) -> Result<SinkRow> {
pub(crate) fn pop_in_memory_queue_non_blocking(
&mut self,
queue_name: QueueName,
) -> Result<Option<SinkRow>> {
let q = InMemoryQueueRepository::instance().get(&queue_name)?;
let row = q.pop();
let row = q.pop_non_blocking();
Ok(row)
}
}
Expand Up @@ -3,30 +3,22 @@
use std::{
collections::VecDeque,
sync::{Mutex, MutexGuard},
thread,
time::Duration,
};

use crate::stream_engine::autonomous_executor::row::foreign_row::sink_row::SinkRow;

const SLEEP_MSECS: u64 = 10;

#[derive(Debug, Default)]
pub(in crate::stream_engine) struct InMemoryQueue(
Mutex<VecDeque<SinkRow>>, // TODO faster (lock-free?) queue
);

impl InMemoryQueue {
/// Blocking call
pub(in crate::stream_engine) fn pop(&self) -> SinkRow {
loop {
let r = self.lock().pop_front();
if let Some(r) = r {
return r;
} else {
thread::sleep(Duration::from_millis(SLEEP_MSECS));
}
}
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
pub(in crate::stream_engine) fn pop_non_blocking(&self) -> Option<SinkRow> {
self.lock().pop_front()
}

pub(in crate::stream_engine) fn push(&self, row: SinkRow) {
Expand Down
85 changes: 85 additions & 0 deletions springql-core/tests/e2e_high_level_rs.rs
Expand Up @@ -251,3 +251,88 @@ fn test_e2e_pop_from_in_memory_queue() {
assert_eq!(row.get_not_null_by_index::<i32>(1).unwrap(), amount);
}
}

#[test]
fn test_e2e_pop_non_blocking_from_in_memory_queue() {
setup_test_logger();

let queue_name = "queue_trade_nb"; // FIXME using the same name as in test_e2e_pop_from_in_memory_queue causes panic
let ts = "2021-11-04 23:02:52.123456789";
let ticker = "ORCL";
let amount = 20;

let json_oracle = json!({
"ts": ts,
"ticker": ticker,
"amount": amount,
});
let trade_times = 5;

let test_source = ForeignSource::new().unwrap();

let ddls = vec![
"
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE SINK STREAM sink_trade (
ts TIMESTAMP NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE PUMP pu_projection AS
INSERT INTO sink_trade (ts, amount)
SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade;
"
.to_string(),
format!(
"
CREATE SINK WRITER queue_sink_trade FOR sink_trade
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME '{queue_name}'
);
",
queue_name = queue_name,
),
format!(
"
CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source.host_ip(),
remote_port = test_source.port()
),
];

let pipeline = apply_ddls(&ddls, SpringConfig::default());
test_source.start(ForeignSourceInput::new_fifo_batch(
(0..trade_times)
.into_iter()
.map(|_| json_oracle.clone())
.collect(),
));

for _ in 0..trade_times {
loop {
match pipeline.pop_non_blocking(queue_name).unwrap() {
Some(row) => {
assert_eq!(row.get_not_null_by_index::<String>(0).unwrap(), ts);
assert_eq!(row.get_not_null_by_index::<i32>(1).unwrap(), amount);
break;
}
None => {}
}
}
}
}
85 changes: 85 additions & 0 deletions springql-core/tests/e2e_low_level_rs.rs
Expand Up @@ -252,6 +252,91 @@ fn test_e2e_pop_from_in_memory_queue() {
}
}

#[test]
fn test_e2e_pop_non_blocking_from_in_memory_queue() {
setup_test_logger();

let queue_name = "queue_trade_nb"; // FIXME using the same name as in test_e2e_pop_from_in_memory_queue causes panic
let ts = "2021-11-04 23:02:52.123456789";
let ticker = "ORCL";
let amount = 20;

let json_oracle = json!({
"ts": ts,
"ticker": ticker,
"amount": amount,
});
let trade_times = 5;

let test_source = ForeignSource::new().unwrap();

let ddls = vec![
"
CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
ticker TEXT NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE SINK STREAM sink_trade (
ts TIMESTAMP NOT NULL,
amount INTEGER NOT NULL
);
"
.to_string(),
"
CREATE PUMP pu_projection AS
INSERT INTO sink_trade (ts, amount)
SELECT STREAM source_trade.ts, source_trade.amount FROM source_trade;
"
.to_string(),
format!(
"
CREATE SINK WRITER queue_sink_trade FOR sink_trade
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME '{queue_name}'
);
",
queue_name = queue_name,
),
format!(
"
CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source.host_ip(),
remote_port = test_source.port()
),
];

let pipeline = apply_ddls_low_level(&ddls, spring_config_default());
test_source.start(ForeignSourceInput::new_fifo_batch(
(0..trade_times)
.into_iter()
.map(|_| json_oracle.clone())
.collect(),
));

for _ in 0..trade_times {
loop {
match spring_pop_non_blocking(&pipeline, queue_name).unwrap() {
Some(row) => {
assert_eq!(spring_column_text(&row, 0).unwrap(), ts);
assert_eq!(spring_column_i32(&row, 1).unwrap(), amount);
break;
}
None => {}
}
}
}
}

#[test]
fn test_e2e_spring_column_apis() {
setup_test_logger();
Expand Down