Merge pull request #133 from SpringQL/feat/split-from-source-stream
feat: Split from a source stream
laysakura committed May 11, 2022
2 parents 80352b1 + d5710e7 commit beabf96
Showing 2 changed files with 114 additions and 18 deletions.
@@ -48,42 +48,39 @@ impl SourceTask {
     ) -> Result<MetricsUpdateByTaskExecution> {
         let stopwatch = WallClockStopwatch::start();
 
-        assert_eq!(
-            context.output_queues().len(),
-            1,
-            "source task must have 1 output queue"
-        );
-
-        let out_qid = context
-            .output_queues()
-            .first()
-            .expect("source task must have 1 output queue")
-            .clone();
-        let out_queue_metrics = self.put_row_into(out_qid, context);
+        let out_queue_metrics_seq = match self.collect_next(context) {
+            Some(row) => {
+                context
+                    .output_queues()
+                    .into_iter()
+                    .map(|out_qid| self.put_row_into(out_qid, row.clone(), context)) // remove None metrics
+                    .collect::<Vec<OutQueueMetricsUpdateByTask>>()
+            }
+            None => vec![],
+        };
 
         let execution_time = stopwatch.stop();
 
         let task_metrics = TaskMetricsUpdateByTask::new(context.task(), execution_time);
         Ok(MetricsUpdateByTaskExecution::new(
             task_metrics,
             vec![],
-            out_queue_metrics.map_or_else(Vec::new, |o| vec![o]),
+            out_queue_metrics_seq,
         ))
     }
 
     fn put_row_into(
         &self,
         queue_id: QueueId,
+        row: Row,
         context: &TaskContext,
-    ) -> Option<OutQueueMetricsUpdateByTask> {
-        let row = self.collect_next(context)?;
+    ) -> OutQueueMetricsUpdateByTask {
         let repos = context.repos();
 
-        let out_queue_metrics = match queue_id {
+        match queue_id {
             QueueId::Row(queue_id) => self.put_row_into_row_queue(row, queue_id, repos),
             QueueId::Window(queue_id) => self.put_row_into_window_queue(row, queue_id, repos),
-        };
-        Some(out_queue_metrics)
+        }
     }
     fn put_row_into_row_queue(
         &self,
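In short, `SourceTask::run` no longer assumes a single output queue: it collects the next row once and writes a clone of it into every output queue, which is what lets several pumps read from one source stream. Below is a minimal, self-contained sketch of that fan-out-by-clone pattern; the `Row` and queue types are simplified stand-ins for illustration, not SpringQL's actual types.

use std::collections::VecDeque;

// Simplified stand-ins for illustration only (not SpringQL's real Row/queue types).
#[derive(Clone, Debug, PartialEq)]
struct Row(i32);

type OutQueue = VecDeque<Row>;

/// Write a clone of the collected row into every downstream queue.
fn fan_out(row: Row, out_queues: &mut [OutQueue]) {
    for queue in out_queues.iter_mut() {
        queue.push_back(row.clone());
    }
}

fn main() {
    let mut queues = vec![OutQueue::new(), OutQueue::new()];
    fan_out(Row(42), &mut queues);
    // Every downstream queue observes the same source row.
    assert_eq!(queues[0].front(), Some(&Row(42)));
    assert_eq!(queues[1].front(), Some(&Row(42)));
}

Because each queue gets its own clone, downstream tasks (such as the two pumps in the test below) can consume and transform the row independently.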
springql-core/tests/feat_pipeline.rs (new file: 99 additions, 0 deletions)
@@ -0,0 +1,99 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

mod test_support;

use pretty_assertions::assert_eq;
use serde_json::json;
use springql_core::low_level_rs::*;
use springql_foreign_service::source::source_input::ForeignSourceInput;
use springql_foreign_service::source::ForeignSource;
use springql_test_logger::setup_test_logger;

use crate::test_support::apply_ddls;

/// See: <https://github.com/SpringQL/SpringQL/issues/132>
#[test]
fn test_feat_split_from_source() {
    setup_test_logger();

    let json1 = json!({
        "ts": "2020-01-01 00:00:00.000000000",
        "c": 42,
    });
    let source_input = vec![json1];

    let test_source = ForeignSource::new().unwrap();

    let ddls = vec![
        "
        CREATE SOURCE STREAM source_1 (
          ts TIMESTAMP NOT NULL ROWTIME,
          c INTEGER NOT NULL
        );
        "
        .to_string(),
        "
        CREATE SINK STREAM sink_1 (
          ts TIMESTAMP NOT NULL ROWTIME,
          c_mul_10 INTEGER NOT NULL
        );
        "
        .to_string(),
        "
        CREATE SINK STREAM sink_2 (
          ts TIMESTAMP NOT NULL ROWTIME,
          c_add_1 INTEGER NOT NULL
        );
        "
        .to_string(),
        "
        CREATE PUMP pu_mul AS
          INSERT INTO sink_1 (ts, c_mul_10)
          SELECT STREAM source_1.ts, source_1.c * 10
          FROM source_1;
        "
        .to_string(),
        "
        CREATE PUMP pu_add AS
          INSERT INTO sink_2 (ts, c_add_1)
          SELECT STREAM source_1.ts, source_1.c + 1
          FROM source_1;
        "
        .to_string(),
        "
        CREATE SINK WRITER q_sink_1 FOR sink_1
          TYPE IN_MEMORY_QUEUE OPTIONS (
            NAME 'q1'
          );
        "
        .to_string(),
        "
        CREATE SINK WRITER q_sink_2 FOR sink_2
          TYPE IN_MEMORY_QUEUE OPTIONS (
            NAME 'q2'
          );
        "
        .to_string(),
        format!(
            "
            CREATE SOURCE READER tcp_1 FOR source_1
              TYPE NET_CLIENT OPTIONS (
                PROTOCOL 'TCP',
                REMOTE_HOST '{remote_host}',
                REMOTE_PORT '{remote_port}'
              );
            ",
            remote_host = test_source.host_ip(),
            remote_port = test_source.port()
        ),
    ];

    let pipeline = apply_ddls(&ddls, SpringConfig::default());
    test_source.start(ForeignSourceInput::new_fifo_batch(source_input));

    // Both sink queues receive a row derived from the same source row,
    // because the source task now writes into every output queue.
    let row = pipeline.pop("q1").unwrap();
    assert_eq!(row.get_not_null_by_index::<i32>(1).unwrap(), 42 * 10);

    let row = pipeline.pop("q2").unwrap();
    assert_eq!(row.get_not_null_by_index::<i32>(1).unwrap(), 42 + 1);
}
