Skip to content

Commit

Permalink
fix(test): avoid bug #219
Browse files Browse the repository at this point in the history
  • Loading branch information
laysakura committed Jun 29, 2022
1 parent e335b74 commit 771ddae
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions springql/tests/feat_source_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use springql::{

use crate::test_support::*;

fn pipeline() -> SpringPipeline {
// Requires queue name parameters for: <https://github.com/SpringQL/SpringQL/issues/219>
fn pipeline(source_queue_name: &str, sink_queue_name: &str) -> SpringPipeline {
let ddls = vec![
"
CREATE SOURCE STREAM source_1 (
Expand All @@ -37,17 +38,19 @@ fn pipeline() -> SpringPipeline {
"
CREATE SINK WRITER q_sink_1 FOR sink_1
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_sink_1'
NAME '{}'
);
",
sink_queue_name
),
format!(
"
CREATE SOURCE READER q_source_1 FOR source_1
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_source_1'
NAME '{}'
);
",
source_queue_name
),
];

Expand All @@ -56,19 +59,19 @@ fn pipeline() -> SpringPipeline {

#[test]
fn test_source_row_from_json() {
let pipeline = pipeline();
let pipeline = pipeline("q_source_from_json", "q_sink_from_json");

let source_rows = vec![
SpringSourceRow::from_json(r#"{"ts": "2022-01-01 13:00:00.000000000", "n": 42}"#).unwrap(),
SpringSourceRow::from_json(r#"{"ts": "2022-01-01 14:00:00.000000000", "n": 43}"#).unwrap(),
];

for row in source_rows {
pipeline.push("q_source_1", row).unwrap();
pipeline.push("q_source_from_json", row).unwrap();
}

let sink_row1 = pipeline.pop("q_sink_1").unwrap();
let sink_row2 = pipeline.pop("q_sink_1").unwrap();
let sink_row1 = pipeline.pop("q_sink_from_json").unwrap();
let sink_row2 = pipeline.pop("q_sink_from_json").unwrap();

assert_eq!(
sink_row1.get_not_null_by_index::<String>(0).unwrap(),
Expand All @@ -85,7 +88,7 @@ fn test_source_row_from_json() {

#[test]
fn test_source_row_from_builder() -> Result<(), SpringError> {
let pipeline = pipeline();
let pipeline = pipeline("q_source_from_builder", "q_sink_from_builder");

let source_rows = vec![
SpringSourceRowBuilder::default()
Expand All @@ -105,11 +108,11 @@ fn test_source_row_from_builder() -> Result<(), SpringError> {
];

for row in source_rows {
pipeline.push("q_source_1", row).unwrap();
pipeline.push("q_source_from_builder", row).unwrap();
}

let sink_row1 = pipeline.pop("q_sink_1").unwrap();
let sink_row2 = pipeline.pop("q_sink_1").unwrap();
let sink_row1 = pipeline.pop("q_sink_from_builder").unwrap();
let sink_row2 = pipeline.pop("q_sink_from_builder").unwrap();

assert_eq!(
sink_row1.get_not_null_by_index::<String>(0).unwrap(),
Expand Down

0 comments on commit 771ddae

Please sign in to comment.