Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: SourceRow::Raw(SchemalessRow) and Row -> StreamRow #211

Merged
merged 4 commits into from Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions springql-core/src/api/spring_sink_row.rs
Expand Up @@ -2,15 +2,15 @@

use crate::{
api::error::{Result, SpringError},
stream_engine::{Row, SpringValue, SqlValue},
stream_engine::{SpringValue, SqlValue, StreamRow},
};

/// Row object from an in memory sink queue.
#[derive(Debug)]
pub struct SpringSinkRow(Row);
pub struct SpringSinkRow(StreamRow);

impl SpringSinkRow {
pub(crate) fn new(row: Row) -> Self {
pub(crate) fn new(row: StreamRow) -> Self {
SpringSinkRow(row)
}

Expand All @@ -37,7 +37,7 @@ impl SpringSinkRow {
}
}

pub(crate) fn into_row(self) -> Row {
pub(crate) fn into_row(self) -> StreamRow {
self.0
}
}
8 changes: 4 additions & 4 deletions springql-core/src/connection.rs
Expand Up @@ -6,7 +6,7 @@ use crate::{
api::{error::Result, SpringConfig},
pipeline::QueueName,
sql_processor::SqlProcessor,
stream_engine::{command::Command, EngineMutex, Row},
stream_engine::{command::Command, EngineMutex, StreamRow},
};

fn setup_logger() {
Expand Down Expand Up @@ -54,7 +54,7 @@ impl Connection {
}
}

pub fn pop(&self, queue: &str) -> Result<Row> {
pub fn pop(&self, queue: &str) -> Result<StreamRow> {
const SLEEP_MSECS: u64 = 10;

let mut engine = self.engine.get()?;
Expand All @@ -70,14 +70,14 @@ impl Connection {
}
}

pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<Row>> {
pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<StreamRow>> {
let mut engine = self.engine.get()?;
let sink_row =
engine.pop_in_memory_queue_non_blocking(QueueName::new(queue.to_string()))?;
Ok(sink_row)
}

pub fn push(&self, queue: &str, row: Row) -> Result<()> {
pub fn push(&self, queue: &str, row: StreamRow) -> Result<()> {
let mut engine = self.engine.get()?;
engine.push_in_memory_queue(QueueName::new(queue.to_string()), row)
}
Expand Down
6 changes: 3 additions & 3 deletions springql-core/src/stream_engine.rs
Expand Up @@ -13,7 +13,7 @@ use std::sync::{Arc, Mutex, MutexGuard};
use anyhow::anyhow;

pub use crate::stream_engine::autonomous_executor::SpringValue;
pub use autonomous_executor::{NnSqlValue, Row, RowTime, SqlCompareResult, SqlValue, Tuple};
pub use autonomous_executor::{NnSqlValue, RowTime, SqlCompareResult, SqlValue, StreamRow, Tuple};

use crate::{
api::{error::Result, SpringConfig, SpringError},
Expand Down Expand Up @@ -90,7 +90,7 @@ impl StreamEngine {
pub fn pop_in_memory_queue_non_blocking(
&mut self,
queue_name: QueueName,
) -> Result<Option<Row>> {
) -> Result<Option<StreamRow>> {
let q = InMemoryQueueRepository::instance().get(&queue_name)?;
let row = q.pop_non_blocking();
Ok(row)
Expand All @@ -100,7 +100,7 @@ impl StreamEngine {
///
/// - `SpringError::Unavailable` when:
/// - queue named `queue_name` does not exist.
pub fn push_in_memory_queue(&mut self, queue_name: QueueName, row: Row) -> Result<()> {
pub fn push_in_memory_queue(&mut self, queue_name: QueueName, row: StreamRow) -> Result<()> {
let q = InMemoryQueueRepository::instance().get(&queue_name)?;
q.push(row);
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions springql-core/src/stream_engine/autonomous_executor.rs
Expand Up @@ -22,8 +22,8 @@ pub mod test_support;

pub use row::SpringValue;
pub use row::{
ColumnValues, JsonObject, NnSqlValue, Row, RowTime, SourceRow, SqlCompareResult, SqlValue,
SqlValueHashKey, StreamColumns,
ColumnValues, JsonObject, NnSqlValue, RowTime, SourceRow, SqlCompareResult, SqlValue,
SqlValueHashKey, StreamColumns, StreamRow,
};
pub use task::{
NetClientSourceReader, NetServerSourceReader, SinkWriterRepository, SourceReader,
Expand Down
Expand Up @@ -2,7 +2,7 @@

use std::{collections::VecDeque, sync::Mutex};

use crate::stream_engine::autonomous_executor::row::Row;
use crate::stream_engine::autonomous_executor::row::StreamRow;

/// Input queue of row tasks.
///
Expand All @@ -11,18 +11,18 @@ use crate::stream_engine::autonomous_executor::row::Row;
/// ![Row queue](https://raw.githubusercontent.com/SpringQL/SpringQL/main/springql-core/doc/img/row-queue.drawio.svg)
#[derive(Debug, Default)]
pub struct RowQueue {
q: Mutex<VecDeque<Row>>,
q: Mutex<VecDeque<StreamRow>>,
}

impl RowQueue {
pub fn put(&self, row: Row) {
pub fn put(&self, row: StreamRow) {
self.q
.lock()
.expect("mutex in RowQueue is poisoned")
.push_back(row);
}

pub fn use_(&self) -> Option<Row> {
pub fn use_(&self) -> Option<StreamRow> {
self.q
.lock()
.expect("mutex in RowQueue is poisoned")
Expand Down
Expand Up @@ -2,7 +2,7 @@

use std::{collections::VecDeque, sync::Mutex};

use crate::stream_engine::autonomous_executor::row::Row;
use crate::stream_engine::autonomous_executor::row::StreamRow;

/// Input queue of window tasks.
///
Expand All @@ -11,18 +11,18 @@ use crate::stream_engine::autonomous_executor::row::Row;
/// ![Window queue](https://raw.githubusercontent.com/SpringQL/SpringQL/main/springql-core/doc/img/window-queue.drawio.svg)
#[derive(Debug, Default)]
pub struct WindowQueue {
waiting_q: Mutex<VecDeque<Row>>,
waiting_q: Mutex<VecDeque<StreamRow>>,
}

impl WindowQueue {
pub fn put(&self, row: Row) {
pub fn put(&self, row: StreamRow) {
self.waiting_q
.lock()
.expect("mutex in WindowQueue is poisoned")
.push_back(row);
}

pub fn dispatch(&self) -> Option<Row> {
pub fn dispatch(&self) -> Option<StreamRow> {
self.waiting_q
.lock()
.expect("mutex in WindowQueue is poisoned")
Expand Down
180 changes: 4 additions & 176 deletions springql-core/src/stream_engine/autonomous_executor/row.rs
@@ -1,5 +1,8 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

mod schemaless_row;
mod stream_row;

mod column;
mod column_values;
mod foreign_row;
Expand All @@ -10,180 +13,5 @@ pub use column::StreamColumns;
pub use column_values::ColumnValues;
pub use foreign_row::{CANFrameSourceRow, JsonObject, JsonSourceRow, SourceRow, SourceRowFormat};
pub use rowtime::RowTime;
pub use stream_row::StreamRow;
pub use value::{NnSqlValue, SpringValue, SqlCompareResult, SqlValue, SqlValueHashKey};

use std::{sync::Arc, vec};

use crate::{
api::error::Result,
mem_size::MemSize,
pipeline::{ColumnName, StreamModel},
stream_engine::time::{SpringTimestamp, SystemTimestamp},
};

/// - Mandatory `rowtime()`, either from `cols` or `arrival_rowtime`.
/// - PartialEq by all columns (NULL prevents Eq).
/// - PartialOrd by `rowtime()`.
#[derive(Clone, PartialEq, Debug)]
pub struct Row {
/// None if an event time is available (i.e. ROWTIME keyword is supplied)
processing_time: Option<SpringTimestamp>,

/// Columns
cols: StreamColumns,
}

impl Row {
pub fn new(cols: StreamColumns) -> Self {
let processing_time = if cols.event_time().is_some() {
None
} else {
Some(SystemTimestamp::now())
};

Row {
processing_time,
cols,
}
}

pub fn stream_model(&self) -> &StreamModel {
self.cols.stream_model()
}

/// ROWTIME. See: <https://docs.sqlstream.com/glossary/rowtime-gl/>
///
/// ROWTIME is a:
///
/// - (default) Arrival time to a stream.
/// - Promoted from a column in a stream.
pub fn rowtime(&self) -> RowTime {
self.processing_time.map_or_else(
|| {
RowTime::EventTime(
self.cols
.event_time()
.expect("Either processing time or event time must be enabled"),
)
},
RowTime::ProcessingTime,
)
}

/// # Failure
///
/// - `SpringError::Sql` when:
/// - Column index out of range
pub fn get_by_index(&self, i_col: usize) -> Result<&SqlValue> {
self.cols.get_by_index(i_col)
}

/// Creates new row with the different stream model having the same shape.
///
/// # Failure
///
/// - `SpringError::InvalidFormat` when:
/// - `self` and `stream_model` has different shape.
pub fn apply_new_stream_model(&mut self, stream_model: Arc<StreamModel>) -> Result<()> {
self.cols.apply_new_stream_model(stream_model)
}
}

impl PartialOrd for Row {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.rowtime().cmp(&other.rowtime()))
}
}

impl IntoIterator for Row {
type Item = (ColumnName, SqlValue);
type IntoIter = vec::IntoIter<Self::Item>;

fn into_iter(self) -> Self::IntoIter {
let into_iter = self.cols.into_iter();
if let Some(rowtime) = self.processing_time {
into_iter
.chain(vec![(
ColumnName::arrival_rowtime(),
SqlValue::NotNull(NnSqlValue::Timestamp(rowtime)),
)])
.collect::<Vec<Self::Item>>()
.into_iter()
} else {
into_iter
}
}
}

impl MemSize for Row {
fn mem_size(&self) -> usize {
let arrival_rowtime_size = self.processing_time.map_or_else(|| 0, |ts| ts.mem_size());
let cols_size = self.cols.mem_size();
arrival_rowtime_size + cols_size
}
}

impl From<Row> for JsonObject {
fn from(row: Row) -> Self {
let map = row
.into_iter()
.map(|(col, val)| (col.to_string(), serde_json::Value::from(val)))
.collect::<serde_json::Map<String, serde_json::Value>>();
let v = serde_json::Value::from(map);
JsonObject::new(v)
}
}

#[cfg(test)]
mod tests {
use serde_json::json;

use crate::{stream_engine::autonomous_executor::row::foreign_row::JsonObject, time::Duration};

use super::*;

#[test]
fn test_partial_eq() {
assert_eq!(
Row::fx_city_temperature_tokyo(),
Row::fx_city_temperature_tokyo()
);
}

#[test]
fn test_partial_ne() {
assert_ne!(
Row::fx_city_temperature_tokyo(),
Row::fx_city_temperature_osaka()
);
}

#[test]
fn test_into_json() {
let row = Row::fx_city_temperature_tokyo();

let json = JsonObject::new(json!({
"ts": SpringTimestamp::fx_ts1().to_string(),
"city": "Tokyo",
"temperature": 21
}));

assert_eq!(JsonObject::from(row), json);
}

#[test]
fn test_from_row_arrival_rowtime() {
let row = Row::fx_no_promoted_rowtime();
let f_json = JsonObject::from(row);
let mut f_colvals = f_json.into_column_values().unwrap();
let f_rowtime_sql_value = f_colvals.remove(&ColumnName::arrival_rowtime()).unwrap();

if let SqlValue::NotNull(f_rowtime_nn_sql_value) = f_rowtime_sql_value {
let f_rowtime: SpringTimestamp = f_rowtime_nn_sql_value.unpack().unwrap();
assert!(SystemTimestamp::now() - Duration::seconds(1) < f_rowtime);
assert!(f_rowtime < SystemTimestamp::now() + Duration::seconds(1));
} else {
unreachable!()
};
}
}
Expand Up @@ -140,22 +140,6 @@ impl StreamColumns {
}
}
}

pub fn apply_new_stream_model(&mut self, stream_model: Arc<StreamModel>) -> Result<()> {
if self.stream_model.shape() == stream_model.shape() {
self.stream_model = stream_model;
Ok(())
} else {
Err(SpringError::InvalidFormat {
s: format!(
"original stream `{}` and new stream `{}` have different shape",
self.stream_model.name(),
stream_model.name()
),
source: anyhow!(r#"stream model shape mismatch"#),
})
}
}
}

impl IntoIterator for StreamColumns {
Expand All @@ -174,6 +158,19 @@ impl IntoIterator for StreamColumns {
}
}

impl From<StreamColumns> for ColumnValues {
fn from(stream_columns: StreamColumns) -> ColumnValues {
let mut colvals = ColumnValues::default();

for (col_name, sql_value) in stream_columns {
colvals
.insert(col_name, sql_value)
.expect("StreamColumns must not have duplicate column names");
}
colvals
}
}

#[cfg(test)]
mod tests {
use crate::stream_engine::{
Expand Down