Skip to content

Commit

Permalink
Merge pull request #119 from SpringQL/feat/SpringPipelineHL-pop
Browse files Browse the repository at this point in the history
feat: High-level API to get rows and column values from in-memory queue
  • Loading branch information
laysakura committed May 8, 2022
2 parents 5ae6009 + ed5c55f commit 2055691
Show file tree
Hide file tree
Showing 37 changed files with 544 additions and 215 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog][Keep a Changelog] and this project adh

## [Unreleased]

### Added

- `SpringPipelineHL::pop()` to get `SpringRowHL` from an in-memory queue ([#119](https://github.com/SpringQL/SpringQL/pull/119)).
- `SpringRowHL::get_not_null_by_index()` to get a column value from `SpringRowHL` ([#119](https://github.com/SpringQL/SpringQL/pull/119)).
- Made public ([#119](https://github.com/SpringQL/SpringQL/pull/119)):
- `SpringTimestamp`
- `SpringEventDuration`
- `SpringValue` trait

## [v0.5.0]

### Fixed
Expand Down
4 changes: 4 additions & 0 deletions springql-core/src/api.rs
Expand Up @@ -3,3 +3,7 @@
pub mod error;
pub mod high_level_rs;
pub mod low_level_rs;

pub use crate::stream_engine::time::duration::event_duration::SpringEventDuration;
pub use crate::stream_engine::time::timestamp::SpringTimestamp;
pub use crate::stream_engine::SpringValue;
10 changes: 10 additions & 0 deletions springql-core/src/api/error.rs
Expand Up @@ -6,6 +6,8 @@ pub mod foreign_info;

use thiserror::Error;

use crate::pipeline::name::StreamName;

use self::foreign_info::ForeignInfo;

/// Result type
Expand Down Expand Up @@ -60,4 +62,12 @@ pub enum SpringError {

#[error("SQL error")]
Sql(anyhow::Error),

/// Occurs only when a value is fetched from a SpringRow.
#[error("unexpectedly got NULL")]
Null {
stream_name: StreamName,
/// Column index
i_col: usize,
},
}
44 changes: 42 additions & 2 deletions springql-core/src/api/high_level_rs.rs
Expand Up @@ -3,8 +3,9 @@
//! High-level Rust API to execute / register SpringQL from Rust.

use crate::{
error::Result,
low_level_rs::{spring_command, spring_open, SpringConfig, SpringPipeline},
error::{Result, SpringError},
low_level_rs::{spring_command, spring_open, spring_pop, SpringConfig, SpringPipeline},
stream_engine::{SinkRow, SpringValue, SqlValue},
};

/// Pipeline.
Expand All @@ -31,6 +32,45 @@ impl SpringPipelineHL {
pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
spring_command(&self.0, sql.as_ref())
}

/// Pop a row from an in memory queue. This is a blocking function.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop(&self, queue: &str) -> Result<SpringRowHL> {
spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0))
}
}

/// Row object from an in memory queue.
#[derive(Debug)]
pub struct SpringRowHL(SinkRow);

impl SpringRowHL {
/// Get a i-th column value from the row.
///
/// # Failure
///
/// - [SpringError::Sql](crate::error::SpringError::Sql) when:
/// - Column index out of range
/// - [SpringError::Null](crate::error::SpringError::Null) when:
/// - Column value is NULL
pub fn get_not_null_by_index<T>(&self, i_col: usize) -> Result<T>
where
T: SpringValue, // TODO use this function in spring_column_*()
{
let sql_value = self.0.get_by_index(i_col)?;

match sql_value {
SqlValue::Null => Err(SpringError::Null {
stream_name: self.0.stream_name().clone(),
i_col,
}),
SqlValue::NotNull(nn_sql_value) => nn_sql_value.unpack(),
}
}
}

impl SpringConfig {
Expand Down
2 changes: 1 addition & 1 deletion springql-core/src/api/low_level_rs.rs
Expand Up @@ -44,7 +44,7 @@ pub struct SpringPipeline {

/// Row object from an in memory queue.
#[derive(Debug)]
pub struct SpringRow(SinkRow);
pub struct SpringRow(pub(in crate::api) SinkRow);

impl From<SinkRow> for SpringRow {
fn from(sink_row: SinkRow) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions springql-core/src/expr_resolver.rs
Expand Up @@ -187,7 +187,7 @@ impl ExprResolver {

#[cfg(test)]
mod tests {
use crate::{expression::ValueExpr, stream_engine::time::timestamp::Timestamp};
use crate::{expression::ValueExpr, stream_engine::time::timestamp::SpringTimestamp};

use super::*;

Expand Down Expand Up @@ -228,7 +228,7 @@ mod tests {
ValueExpr::factory_integer(3),
));

let empty_tuple = Tuple::new(Timestamp::fx_ts1(), vec![]);
let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]);

assert_eq!(
resolver
Expand Down
6 changes: 3 additions & 3 deletions springql-core/src/expression.rs
Expand Up @@ -21,7 +21,7 @@ use crate::{
pump_model::window_operation_parameter::aggregate::AggregateFunctionParameter,
},
stream_engine::{
time::duration::{event_duration::EventDuration, SpringDuration},
time::duration::{event_duration::SpringEventDuration, SpringDuration},
NnSqlValue, SqlCompareResult, SqlValue, Tuple,
},
};
Expand Down Expand Up @@ -261,7 +261,7 @@ impl ValueExprPh2 {
let duration_value = duration_millis.eval()?;
let duration_millis = duration_value.to_i64()?;
if duration_millis >= 0 {
let duration = EventDuration::from_millis(duration_millis as u64);
let duration = SpringEventDuration::from_millis(duration_millis as u64);
Ok(SqlValue::NotNull(NnSqlValue::Duration(duration)))
} else {
Err(SpringError::Sql(anyhow!(
Expand All @@ -274,7 +274,7 @@ impl ValueExprPh2 {
let duration_value = duration_secs.eval()?;
let duration_secs = duration_value.to_i64()?;
if duration_secs >= 0 {
let duration = EventDuration::from_secs(duration_secs as u64);
let duration = SpringEventDuration::from_secs(duration_secs as u64);
Ok(SqlValue::NotNull(NnSqlValue::Duration(duration)))
} else {
Err(SpringError::Sql(anyhow!(
Expand Down
18 changes: 9 additions & 9 deletions springql-core/src/pipeline/pump_model/window_parameter.rs
@@ -1,6 +1,6 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::stream_engine::time::duration::event_duration::EventDuration;
use crate::stream_engine::time::duration::event_duration::SpringEventDuration;

/// Window parameters
#[derive(Clone, Eq, PartialEq, Debug)]
Expand All @@ -18,9 +18,9 @@ pub(crate) enum WindowParameter {
/// :00 :05 :10 :15 :20
/// ```
TimedSlidingWindow {
length: EventDuration,
period: EventDuration,
allowed_delay: EventDuration,
length: SpringEventDuration,
period: SpringEventDuration,
allowed_delay: SpringEventDuration,
},

/// Time-based fixed window
Expand All @@ -36,27 +36,27 @@ pub(crate) enum WindowParameter {
/// :00 :05 :10 :15 :20
/// ```
TimedFixedWindow {
length: EventDuration,
allowed_delay: EventDuration,
length: SpringEventDuration,
allowed_delay: SpringEventDuration,
},
}

impl WindowParameter {
pub(crate) fn length(&self) -> EventDuration {
pub(crate) fn length(&self) -> SpringEventDuration {
match self {
WindowParameter::TimedSlidingWindow { length, .. } => *length,
WindowParameter::TimedFixedWindow { length, .. } => *length,
}
}

pub(crate) fn period(&self) -> EventDuration {
pub(crate) fn period(&self) -> SpringEventDuration {
match self {
WindowParameter::TimedSlidingWindow { period, .. } => *period,
WindowParameter::TimedFixedWindow { length, .. } => *length,
}
}

pub(crate) fn allowed_delay(&self) -> EventDuration {
pub(crate) fn allowed_delay(&self) -> SpringEventDuration {
match self {
WindowParameter::TimedSlidingWindow { allowed_delay, .. } => *allowed_delay,
WindowParameter::TimedFixedWindow { allowed_delay, .. } => *allowed_delay,
Expand Down
Expand Up @@ -34,7 +34,7 @@ use crate::sql_processor::sql_parser::syntax::{
ColumnConstraintSyntax, OptionSyntax, SelectStreamSyntax,
};
use crate::stream_engine::command::insert_plan::InsertPlan;
use crate::stream_engine::time::duration::event_duration::EventDuration;
use crate::stream_engine::time::duration::event_duration::SpringEventDuration;
use crate::stream_engine::time::duration::SpringDuration;
use crate::stream_engine::{NnSqlValue, SqlValue};
use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -211,10 +211,10 @@ impl PestParserImpl {

let event_duration = match duration_function {
DurationFunction::Millis => {
Ok(EventDuration::from_millis(integer_constant.to_i64()? as u64))
Ok(SpringEventDuration::from_millis(integer_constant.to_i64()? as u64))
}
DurationFunction::Secs => {
Ok(EventDuration::from_secs(integer_constant.to_i64()? as u64))
Ok(SpringEventDuration::from_secs(integer_constant.to_i64()? as u64))
}
}?;

Expand Down
7 changes: 4 additions & 3 deletions springql-core/src/stream_engine.rs
Expand Up @@ -26,6 +26,8 @@
//!
//! ![Communication between entities](https://raw.githubusercontent.com/SpringQL/SpringQL.github.io/main/static/img/stream-engine-architecture-communication.svg)

pub use autonomous_executor::SpringValue;

pub(crate) mod command;
pub(crate) mod time;

Expand All @@ -34,9 +36,8 @@ mod in_memory_queue_repository;
mod sql_executor;

pub(crate) use autonomous_executor::{
row::value::{
sql_convertible::SpringValue,
sql_value::{nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult, SqlValue},
row::value::sql_value::{
nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult, SqlValue,
},
task::tuple::Tuple,
SinkRow,
Expand Down
3 changes: 3 additions & 0 deletions springql-core/src/stream_engine/autonomous_executor.rs
@@ -1,5 +1,7 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub use row::SpringValue;

pub(crate) mod row;
pub(crate) mod task;

Expand Down Expand Up @@ -160,6 +162,7 @@ impl AutonomousExecutor {
| SpringError::ThreadPoisoned(_) => log::error!("{:?}", e),

SpringError::InvalidConfig { .. } => unreachable!("must be handled on startup"),
SpringError::Null { .. } => unreachable!("must be handled on startup"),
}
}
}
8 changes: 5 additions & 3 deletions springql-core/src/stream_engine/autonomous_executor/row.rs
@@ -1,5 +1,7 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub use value::SpringValue;

pub(crate) mod value;

pub(in crate::stream_engine::autonomous_executor) mod column;
Expand All @@ -17,14 +19,14 @@ use crate::pipeline::name::ColumnName;
use crate::pipeline::stream_model::StreamModel;
use crate::stream_engine::autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue;
use crate::stream_engine::time::timestamp::system_timestamp::SystemTimestamp;
use crate::stream_engine::time::timestamp::Timestamp;
use crate::stream_engine::time::timestamp::SpringTimestamp;

/// - Mandatory `rowtime()`, either from `cols` or `arrival_rowtime`.
/// - PartialEq by all columns (NULL prevents Eq).
/// - PartialOrd by timestamp.
#[derive(Clone, PartialEq, Debug)]
pub(crate) struct Row {
arrival_rowtime: Option<Timestamp>,
arrival_rowtime: Option<SpringTimestamp>,

/// Columns
cols: StreamColumns,
Expand Down Expand Up @@ -54,7 +56,7 @@ impl Row {
///
/// - (default) Arrival time to a stream.
/// - Promoted from a column in a stream.
pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> Timestamp {
pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> SpringTimestamp {
self.arrival_rowtime.unwrap_or_else(|| {
self.cols
.promoted_rowtime()
Expand Down
Expand Up @@ -9,7 +9,7 @@ use crate::{
pipeline::{relation::column::column_definition::ColumnDefinition, stream_model::StreamModel},
stream_engine::{
autonomous_executor::row::{column_values::ColumnValues, value::sql_value::SqlValue},
time::timestamp::Timestamp,
time::timestamp::SpringTimestamp,
},
};
use std::{sync::Arc, vec};
Expand Down Expand Up @@ -67,7 +67,7 @@ impl StreamColumns {

pub(in crate::stream_engine::autonomous_executor) fn promoted_rowtime(
&self,
) -> Option<Timestamp> {
) -> Option<SpringTimestamp> {
let rowtime_col = self.stream_model.shape().promoted_rowtime()?;
let rowtime_sql_value = self
.get_by_column_name(rowtime_col)
Expand Down Expand Up @@ -173,7 +173,7 @@ impl IntoIterator for StreamColumns {
mod tests {
use crate::stream_engine::{
autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue,
time::timestamp::Timestamp,
time::timestamp::SpringTimestamp,
};

use super::*;
Expand All @@ -184,7 +184,7 @@ mod tests {
column_values
.insert(
ColumnName::fx_timestamp(),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand All @@ -210,7 +210,7 @@ mod tests {
column_values
.insert(
ColumnName::new("timestamp".to_string()),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand All @@ -234,7 +234,7 @@ mod tests {
column_values
.insert(
ColumnName::new("timestamp".to_string()),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand Down Expand Up @@ -263,7 +263,7 @@ mod tests {
column_values
.insert(
ColumnName::new("timestamp".to_string()),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand Down

0 comments on commit 2055691

Please sign in to comment.