Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: High-level API to get rows and column values from in-memory queue #119

Merged
merged 8 commits into from May 8, 2022
9 changes: 9 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog][Keep a Changelog] and this project adh

## [Unreleased]

### Added

- `SpringPipelineHL::pop()` to get `SpringRowHL` from an in-memory queue ([#119](https://github.com/SpringQL/SpringQL/pull/119)).
- `SpringRowHL::get_not_null_by_index()` to get a column value from `SpringRowHL` ([#119](https://github.com/SpringQL/SpringQL/pull/119)).
- Made public ([#119](https://github.com/SpringQL/SpringQL/pull/119)):
- `SpringTimestamp`
- `SpringEventDuration`
- `SpringValue` trait

## [v0.5.0]

### Fixed
Expand Down
4 changes: 4 additions & 0 deletions springql-core/src/api.rs
Expand Up @@ -3,3 +3,7 @@
pub mod error;
pub mod high_level_rs;
pub mod low_level_rs;

pub use crate::stream_engine::time::duration::event_duration::SpringEventDuration;
pub use crate::stream_engine::time::timestamp::SpringTimestamp;
pub use crate::stream_engine::SpringValue;
10 changes: 10 additions & 0 deletions springql-core/src/api/error.rs
Expand Up @@ -6,6 +6,8 @@ pub mod foreign_info;

use thiserror::Error;

use crate::pipeline::name::StreamName;

use self::foreign_info::ForeignInfo;

/// Result type
Expand Down Expand Up @@ -60,4 +62,12 @@ pub enum SpringError {

#[error("SQL error")]
Sql(anyhow::Error),

/// Occurs only when a value is fetched from a SpringRow.
#[error("unexpectedly got NULL")]
Null {
stream_name: StreamName,
/// Column index
i_col: usize,
},
}
44 changes: 42 additions & 2 deletions springql-core/src/api/high_level_rs.rs
Expand Up @@ -3,8 +3,9 @@
//! High-level Rust API to execute / register SpringQL from Rust.

use crate::{
error::Result,
low_level_rs::{spring_command, spring_open, SpringConfig, SpringPipeline},
error::{Result, SpringError},
low_level_rs::{spring_command, spring_open, spring_pop, SpringConfig, SpringPipeline},
stream_engine::{SinkRow, SpringValue, SqlValue},
};

/// Pipeline.
Expand All @@ -31,6 +32,45 @@ impl SpringPipelineHL {
pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
spring_command(&self.0, sql.as_ref())
}

/// Pop a row from an in memory queue. This is a blocking function.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop(&self, queue: &str) -> Result<SpringRowHL> {
spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0))
}
}

/// Row object from an in memory queue.
#[derive(Debug)]
pub struct SpringRowHL(SinkRow);

impl SpringRowHL {
/// Get a i-th column value from the row.
///
/// # Failure
///
/// - [SpringError::Sql](crate::error::SpringError::Sql) when:
/// - Column index out of range
/// - [SpringError::Null](crate::error::SpringError::Null) when:
/// - Column value is NULL
pub fn get_not_null_by_index<T>(&self, i_col: usize) -> Result<T>
where
T: SpringValue, // TODO use this function in spring_column_*()
{
let sql_value = self.0.get_by_index(i_col)?;

match sql_value {
SqlValue::Null => Err(SpringError::Null {
stream_name: self.0.stream_name().clone(),
i_col,
}),
SqlValue::NotNull(nn_sql_value) => nn_sql_value.unpack(),
}
}
}

impl SpringConfig {
Expand Down
2 changes: 1 addition & 1 deletion springql-core/src/api/low_level_rs.rs
Expand Up @@ -44,7 +44,7 @@ pub struct SpringPipeline {

/// Row object from an in memory queue.
#[derive(Debug)]
pub struct SpringRow(SinkRow);
pub struct SpringRow(pub(in crate::api) SinkRow);

impl From<SinkRow> for SpringRow {
fn from(sink_row: SinkRow) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions springql-core/src/expr_resolver.rs
Expand Up @@ -187,7 +187,7 @@ impl ExprResolver {

#[cfg(test)]
mod tests {
use crate::{expression::ValueExpr, stream_engine::time::timestamp::Timestamp};
use crate::{expression::ValueExpr, stream_engine::time::timestamp::SpringTimestamp};

use super::*;

Expand Down Expand Up @@ -228,7 +228,7 @@ mod tests {
ValueExpr::factory_integer(3),
));

let empty_tuple = Tuple::new(Timestamp::fx_ts1(), vec![]);
let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]);

assert_eq!(
resolver
Expand Down
6 changes: 3 additions & 3 deletions springql-core/src/expression.rs
Expand Up @@ -21,7 +21,7 @@ use crate::{
pump_model::window_operation_parameter::aggregate::AggregateFunctionParameter,
},
stream_engine::{
time::duration::{event_duration::EventDuration, SpringDuration},
time::duration::{event_duration::SpringEventDuration, SpringDuration},
NnSqlValue, SqlCompareResult, SqlValue, Tuple,
},
};
Expand Down Expand Up @@ -261,7 +261,7 @@ impl ValueExprPh2 {
let duration_value = duration_millis.eval()?;
let duration_millis = duration_value.to_i64()?;
if duration_millis >= 0 {
let duration = EventDuration::from_millis(duration_millis as u64);
let duration = SpringEventDuration::from_millis(duration_millis as u64);
Ok(SqlValue::NotNull(NnSqlValue::Duration(duration)))
} else {
Err(SpringError::Sql(anyhow!(
Expand All @@ -274,7 +274,7 @@ impl ValueExprPh2 {
let duration_value = duration_secs.eval()?;
let duration_secs = duration_value.to_i64()?;
if duration_secs >= 0 {
let duration = EventDuration::from_secs(duration_secs as u64);
let duration = SpringEventDuration::from_secs(duration_secs as u64);
Ok(SqlValue::NotNull(NnSqlValue::Duration(duration)))
} else {
Err(SpringError::Sql(anyhow!(
Expand Down
18 changes: 9 additions & 9 deletions springql-core/src/pipeline/pump_model/window_parameter.rs
@@ -1,6 +1,6 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::stream_engine::time::duration::event_duration::EventDuration;
use crate::stream_engine::time::duration::event_duration::SpringEventDuration;

/// Window parameters
#[derive(Clone, Eq, PartialEq, Debug)]
Expand All @@ -18,9 +18,9 @@ pub(crate) enum WindowParameter {
/// :00 :05 :10 :15 :20
/// ```
TimedSlidingWindow {
length: EventDuration,
period: EventDuration,
allowed_delay: EventDuration,
length: SpringEventDuration,
period: SpringEventDuration,
allowed_delay: SpringEventDuration,
},

/// Time-based fixed window
Expand All @@ -36,27 +36,27 @@ pub(crate) enum WindowParameter {
/// :00 :05 :10 :15 :20
/// ```
TimedFixedWindow {
length: EventDuration,
allowed_delay: EventDuration,
length: SpringEventDuration,
allowed_delay: SpringEventDuration,
},
}

impl WindowParameter {
pub(crate) fn length(&self) -> EventDuration {
pub(crate) fn length(&self) -> SpringEventDuration {
match self {
WindowParameter::TimedSlidingWindow { length, .. } => *length,
WindowParameter::TimedFixedWindow { length, .. } => *length,
}
}

pub(crate) fn period(&self) -> EventDuration {
pub(crate) fn period(&self) -> SpringEventDuration {
match self {
WindowParameter::TimedSlidingWindow { period, .. } => *period,
WindowParameter::TimedFixedWindow { length, .. } => *length,
}
}

pub(crate) fn allowed_delay(&self) -> EventDuration {
pub(crate) fn allowed_delay(&self) -> SpringEventDuration {
match self {
WindowParameter::TimedSlidingWindow { allowed_delay, .. } => *allowed_delay,
WindowParameter::TimedFixedWindow { allowed_delay, .. } => *allowed_delay,
Expand Down
Expand Up @@ -34,7 +34,7 @@ use crate::sql_processor::sql_parser::syntax::{
ColumnConstraintSyntax, OptionSyntax, SelectStreamSyntax,
};
use crate::stream_engine::command::insert_plan::InsertPlan;
use crate::stream_engine::time::duration::event_duration::EventDuration;
use crate::stream_engine::time::duration::event_duration::SpringEventDuration;
use crate::stream_engine::time::duration::SpringDuration;
use crate::stream_engine::{NnSqlValue, SqlValue};
use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -211,10 +211,10 @@ impl PestParserImpl {

let event_duration = match duration_function {
DurationFunction::Millis => {
Ok(EventDuration::from_millis(integer_constant.to_i64()? as u64))
Ok(SpringEventDuration::from_millis(integer_constant.to_i64()? as u64))
}
DurationFunction::Secs => {
Ok(EventDuration::from_secs(integer_constant.to_i64()? as u64))
Ok(SpringEventDuration::from_secs(integer_constant.to_i64()? as u64))
}
}?;

Expand Down
7 changes: 4 additions & 3 deletions springql-core/src/stream_engine.rs
Expand Up @@ -26,6 +26,8 @@
//!
//! ![Communication between entities](https://raw.githubusercontent.com/SpringQL/SpringQL.github.io/main/static/img/stream-engine-architecture-communication.svg)

pub use autonomous_executor::SpringValue;

pub(crate) mod command;
pub(crate) mod time;

Expand All @@ -34,9 +36,8 @@ mod in_memory_queue_repository;
mod sql_executor;

pub(crate) use autonomous_executor::{
row::value::{
sql_convertible::SpringValue,
sql_value::{nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult, SqlValue},
row::value::sql_value::{
nn_sql_value::NnSqlValue, sql_compare_result::SqlCompareResult, SqlValue,
},
task::tuple::Tuple,
SinkRow,
Expand Down
3 changes: 3 additions & 0 deletions springql-core/src/stream_engine/autonomous_executor.rs
@@ -1,5 +1,7 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub use row::SpringValue;

pub(crate) mod row;
pub(crate) mod task;

Expand Down Expand Up @@ -160,6 +162,7 @@ impl AutonomousExecutor {
| SpringError::ThreadPoisoned(_) => log::error!("{:?}", e),

SpringError::InvalidConfig { .. } => unreachable!("must be handled on startup"),
SpringError::Null { .. } => unreachable!("must be handled on startup"),
}
}
}
8 changes: 5 additions & 3 deletions springql-core/src/stream_engine/autonomous_executor/row.rs
@@ -1,5 +1,7 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub use value::SpringValue;

pub(crate) mod value;

pub(in crate::stream_engine::autonomous_executor) mod column;
Expand All @@ -17,14 +19,14 @@ use crate::pipeline::name::ColumnName;
use crate::pipeline::stream_model::StreamModel;
use crate::stream_engine::autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue;
use crate::stream_engine::time::timestamp::system_timestamp::SystemTimestamp;
use crate::stream_engine::time::timestamp::Timestamp;
use crate::stream_engine::time::timestamp::SpringTimestamp;

/// - Mandatory `rowtime()`, either from `cols` or `arrival_rowtime`.
/// - PartialEq by all columns (NULL prevents Eq).
/// - PartialOrd by timestamp.
#[derive(Clone, PartialEq, Debug)]
pub(crate) struct Row {
arrival_rowtime: Option<Timestamp>,
arrival_rowtime: Option<SpringTimestamp>,

/// Columns
cols: StreamColumns,
Expand Down Expand Up @@ -54,7 +56,7 @@ impl Row {
///
/// - (default) Arrival time to a stream.
/// - Promoted from a column in a stream.
pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> Timestamp {
pub(in crate::stream_engine::autonomous_executor) fn rowtime(&self) -> SpringTimestamp {
self.arrival_rowtime.unwrap_or_else(|| {
self.cols
.promoted_rowtime()
Expand Down
Expand Up @@ -9,7 +9,7 @@ use crate::{
pipeline::{relation::column::column_definition::ColumnDefinition, stream_model::StreamModel},
stream_engine::{
autonomous_executor::row::{column_values::ColumnValues, value::sql_value::SqlValue},
time::timestamp::Timestamp,
time::timestamp::SpringTimestamp,
},
};
use std::{sync::Arc, vec};
Expand Down Expand Up @@ -67,7 +67,7 @@ impl StreamColumns {

pub(in crate::stream_engine::autonomous_executor) fn promoted_rowtime(
&self,
) -> Option<Timestamp> {
) -> Option<SpringTimestamp> {
let rowtime_col = self.stream_model.shape().promoted_rowtime()?;
let rowtime_sql_value = self
.get_by_column_name(rowtime_col)
Expand Down Expand Up @@ -173,7 +173,7 @@ impl IntoIterator for StreamColumns {
mod tests {
use crate::stream_engine::{
autonomous_executor::row::value::sql_value::nn_sql_value::NnSqlValue,
time::timestamp::Timestamp,
time::timestamp::SpringTimestamp,
};

use super::*;
Expand All @@ -184,7 +184,7 @@ mod tests {
column_values
.insert(
ColumnName::fx_timestamp(),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand All @@ -210,7 +210,7 @@ mod tests {
column_values
.insert(
ColumnName::new("timestamp".to_string()),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand All @@ -234,7 +234,7 @@ mod tests {
column_values
.insert(
ColumnName::new("timestamp".to_string()),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand Down Expand Up @@ -263,7 +263,7 @@ mod tests {
column_values
.insert(
ColumnName::new("timestamp".to_string()),
SqlValue::NotNull(NnSqlValue::Timestamp(Timestamp::fx_ts1())),
SqlValue::NotNull(NnSqlValue::Timestamp(SpringTimestamp::fx_ts1())),
)
.unwrap();
column_values
Expand Down