Skip to content

Commit

Permalink
Merge pull request #156 from SpringQL/add_code_example_to_docs
Browse files Browse the repository at this point in the history
docs: add copy of examples to lib.md
  • Loading branch information
laysakura committed May 30, 2022
2 parents 8b2cbf8 + 5a24bd2 commit e1d63b7
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@ All other sections are for end-users.
### For Developers

- add `cargo-make` task for runs [actionlint](https://github.com/rhysd/actionlint)
- docs: add code example to crate document([#156](https://github.com/SpringQL/SpringQL/pull/156))

## [v0.9.0]

Expand Down
186 changes: 186 additions & 0 deletions springql-core/src/lib.md
Expand Up @@ -4,3 +4,189 @@
## High-level architecture diagram

![High-level architecture diagram](https://raw.githubusercontent.com/SpringQL/SpringQL.github.io/main/static/img/springql-architecture.svg)

## Examples

### Simple pipeline example

- create pipeline instance: [SpringPipelineHL::new](crate::high_level_rs::SpringPipelineHL::new)
- execute DDLs: [SpringPipelineHL::command](crate::high_level_rs::SpringPipelineHL::command)
- fetch row from pipeline: [SpringPipelineHL::pop](crate::high_level_rs::SpringPipelineHL::pop)

```rust
use springql_core::{high_level_rs::SpringPipelineHL, low_level_rs::SpringConfig};

fn main() {
const SOURCE_PORT: u16 = 54300;

// create pipeline instans
let pipeline = SpringPipelineHL::new(&SpringConfig::default()).unwrap();

// execute DDLs for build pipeline

// source stream inputs to SpringQL pipeline
pipeline.command(
"CREATE SOURCE STREAM source_temperature_celsius (
ts TIMESTAMP NOT NULL ROWTIME,
temperature FLOAT NOT NULL
);", ).unwrap();

// sink stream output from SpringQL pipeline
pipeline.command(
"CREATE SINK STREAM sink_temperature_fahrenheit (
ts TIMESTAMP NOT NULL ROWTIME,
temperature FLOAT NOT NULL
);", ).unwrap();

// create pump for fetching rows from source
pipeline.command(
"CREATE PUMP c_to_f AS
INSERT INTO sink_temperature_fahrenheit (ts, temperature)
SELECT STREAM
source_temperature_celsius.ts,
32.0 + source_temperature_celsius.temperature * 1.8
FROM source_temperature_celsius;", ).unwrap();

// create sink writer, accessible by name "q", You can fetch row from `pipeline.pop("q")`
pipeline.command(
"CREATE SINK WRITER queue_temperature_fahrenheit FOR sink_temperature_fahrenheit
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q'
);", ).unwrap();

// create source reader, input come from net_server
pipeline.command(format!(
"CREATE SOURCE READER tcp_temperature_celsius FOR source_temperature_celsius
TYPE NET_SERVER OPTIONS (
PROTOCOL 'TCP',
PORT '{}'
);", SOURCE_PORT)).unwrap();

eprintln!("waiting JSON records in tcp/{} port...", SOURCE_PORT);
# return ();
// fetch row from "q" , "q" is a sink writer defined above.
while let Ok(row) = pipeline.pop("q") {
// get field value from row by field index
let ts: String = row.get_not_null_by_index(0).unwrap();
let temperature_fahrenheit: f32 = row.get_not_null_by_index(1).unwrap();
// show in STDERR
eprintln!("{}\t{}", ts, temperature_fahrenheit);
}
}
```

Run this shell script to input data.

```bash
echo '{"ts": "2022-01-01 13:00:00.000000000", "temperature": 5.3}' | nc localhost 54300
```

### Using Window and share pipeline for many threads

- To share pipeline for threads, use [std::sync::Arc](std::sync::Arc)
- non blocking fetch rom for sink [pop_non_blocking](high_level_rs::SpringPipelineHL::pop_non_blocking)

```rust
use std::{sync::Arc, thread, time::Duration};
use springql_core::{high_level_rs::SpringPipelineHL, low_level_rs::SpringConfig};

fn main() {
const SOURCE_PORT: u16 = 54300;

// Using Arc to share the reference between threads feeding sink rows.
let pipeline = Arc::new(SpringPipelineHL::new(&SpringConfig::default()).unwrap());

pipeline.command(
"CREATE SOURCE STREAM source_trade (
ts TIMESTAMP NOT NULL ROWTIME,
symbol TEXT NOT NULL,
amount INTEGER NOT NULL
);", ).unwrap();

pipeline.command(
"CREATE SINK STREAM sink_avg_all (
ts TIMESTAMP NOT NULL ROWTIME,
avg_amount FLOAT NOT NULL
);", ).unwrap();

pipeline.command(
"CREATE SINK STREAM sink_avg_by_symbol (
ts TIMESTAMP NOT NULL ROWTIME,
symbol TEXT NOT NULL,
avg_amount FLOAT NOT NULL
);", ).unwrap();

// Creates windows per 10 seconds ([:00, :10), [:10, :20), ...),
// and calculate the average amount over the rows inside each window.
//
// Second parameter `DURATION_SECS(0)` means allowed latency for late data.
//
// You can ignore here.
pipeline.command(
"CREATE PUMP avg_all AS
INSERT INTO sink_avg_all (ts, avg_amount)
SELECT STREAM
FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS min_ts,
AVG(source_trade.amount) AS avg_amount
FROM source_trade
GROUP BY min_ts
FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0);
", ).unwrap();

// Creates windows per 2 seconds ([:00, :02), [:02, :04), ...),
// and then group the rows inside each window having the same symbol.
// Calculate the average amount for each group.
pipeline.command(
"CREATE PUMP avg_by_symbol AS
INSERT INTO sink_avg_by_symbol (ts, symbol, avg_amount)
SELECT STREAM
FLOOR_TIME(source_trade.ts, DURATION_SECS(2)) AS min_ts,
source_trade.symbol AS symbol,
AVG(source_trade.amount) AS avg_amount
FROM source_trade
GROUP BY min_ts, symbol
FIXED WINDOW DURATION_SECS(2), DURATION_SECS(0);
", ).unwrap();

pipeline.command(
"CREATE SINK WRITER queue_avg_all FOR sink_avg_all
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_avg_all'
);", ).unwrap();

pipeline.command(
"CREATE SINK WRITER queue_avg_by_symbol FOR sink_avg_by_symbol
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME 'q_avg_by_symbol'
);", ).unwrap();

pipeline.command(format!(
"CREATE SOURCE READER tcp_trade FOR source_trade
TYPE NET_SERVER OPTIONS (
PROTOCOL 'TCP',
PORT '{}'
);", SOURCE_PORT)).unwrap();

eprintln!("waiting JSON records in tcp/{} port...", SOURCE_PORT);
# return ();
loop {
// Fetching rows from q_avg_all.
if let Some(row) = pipeline.pop_non_blocking("q_avg_all").unwrap() {
let ts: String = row.get_not_null_by_index(0).unwrap();
let avg_amount: f32 = row.get_not_null_by_index(1).unwrap();
eprintln!("[q_avg_all] {}\t{}", ts, avg_amount);
}

// Fetching rows from q_avg_by_symbol
if let Some(row) = pipeline.pop_non_blocking("q_avg_by_symbol").unwrap() {
let ts: String = row.get_not_null_by_index(0).unwrap();
let symbol: String = row.get_not_null_by_index(1).unwrap();
let avg_amount: f32 = row.get_not_null_by_index(2).unwrap();
eprintln!("[q_avg_by_symbol] {}\t{}\t{}", ts, symbol, avg_amount);
}

// Avoid busy loop
thread::sleep(Duration::from_millis(10))
}
}
```

0 comments on commit e1d63b7

Please sign in to comment.