docs: add copy of examples to lib.md #156

Merged
merged 2 commits on May 30, 2022
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@ All other sections are for end-users.
### For Developers

- add `cargo-make` task for running [actionlint](https://github.com/rhysd/actionlint)
- docs: add code examples to crate documentation ([#156](https://github.com/SpringQL/SpringQL/pull/156))

## [v0.9.0]

186 changes: 186 additions & 0 deletions springql-core/src/lib.md
@@ -4,3 +4,189 @@
## High-level architecture diagram

![High-level architecture diagram](https://raw.githubusercontent.com/SpringQL/SpringQL.github.io/main/static/img/springql-architecture.svg)

## Examples

### Simple pipeline example

- create a pipeline instance: [SpringPipelineHL::new](crate::high_level_rs::SpringPipelineHL::new)
- execute DDLs: [SpringPipelineHL::command](crate::high_level_rs::SpringPipelineHL::command)
- fetch rows from the pipeline: [SpringPipelineHL::pop](crate::high_level_rs::SpringPipelineHL::pop)

```rust
use springql_core::{high_level_rs::SpringPipelineHL, low_level_rs::SpringConfig};

fn main() {
    const SOURCE_PORT: u16 = 54300;

    // create a pipeline instance
    let pipeline = SpringPipelineHL::new(&SpringConfig::default()).unwrap();

    // execute DDLs to build the pipeline

    // source stream: rows enter the SpringQL pipeline here
    pipeline.command(
        "CREATE SOURCE STREAM source_temperature_celsius (
            ts TIMESTAMP NOT NULL ROWTIME,
            temperature FLOAT NOT NULL
        );",
    ).unwrap();

    // sink stream: rows leave the SpringQL pipeline here
    pipeline.command(
        "CREATE SINK STREAM sink_temperature_fahrenheit (
            ts TIMESTAMP NOT NULL ROWTIME,
            temperature FLOAT NOT NULL
        );",
    ).unwrap();

    // create a pump that converts Celsius rows from the source stream
    // into Fahrenheit rows for the sink stream
    pipeline.command(
        "CREATE PUMP c_to_f AS
            INSERT INTO sink_temperature_fahrenheit (ts, temperature)
            SELECT STREAM
                source_temperature_celsius.ts,
                32.0 + source_temperature_celsius.temperature * 1.8
            FROM source_temperature_celsius;",
    ).unwrap();

    // create a sink writer accessible by the name "q";
    // you can fetch rows via `pipeline.pop("q")`
    pipeline.command(
        "CREATE SINK WRITER queue_temperature_fahrenheit FOR sink_temperature_fahrenheit
            TYPE IN_MEMORY_QUEUE OPTIONS (
                NAME 'q'
            );",
    ).unwrap();

    // create a source reader; input comes from a TCP server (NET_SERVER)
    pipeline.command(format!(
        "CREATE SOURCE READER tcp_temperature_celsius FOR source_temperature_celsius
            TYPE NET_SERVER OPTIONS (
                PROTOCOL 'TCP',
                PORT '{}'
            );",
        SOURCE_PORT
    )).unwrap();

    eprintln!("waiting for JSON records on TCP port {}...", SOURCE_PORT);
# return ();
    // fetch rows from "q", the sink writer defined above
    while let Ok(row) = pipeline.pop("q") {
        // get field values from the row by column index
        let ts: String = row.get_not_null_by_index(0).unwrap();
        let temperature_fahrenheit: f32 = row.get_not_null_by_index(1).unwrap();

        // print to stderr
        eprintln!("{}\t{}", ts, temperature_fahrenheit);
    }
}
```

Run this shell command to send an input record:

```bash
echo '{"ts": "2022-01-01 13:00:00.000000000", "temperature": 5.3}' | nc localhost 54300
```
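
If `nc` is not available, the same record can be sent with a short Rust program using only the standard library. This is a minimal test-client sketch, not part of the SpringQL API; it assumes the pipeline above is running and listening on port 54300. For this input, the pump emits 32.0 + 5.3 × 1.8 = 41.54 as the Fahrenheit temperature.

```rust
use std::io::Write;
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    // Connect to the NET_SERVER source reader created by the pipeline above.
    let mut stream = TcpStream::connect("127.0.0.1:54300")?;

    // One JSON record per line; field names match the source stream definition.
    let record = r#"{"ts": "2022-01-01 13:00:00.000000000", "temperature": 5.3}"#;
    stream.write_all(record.as_bytes())?;
    stream.write_all(b"\n")?;

    Ok(())
}
```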

### Using windows and sharing a pipeline across threads

- To share a pipeline between threads, use [std::sync::Arc](std::sync::Arc)
- Fetch rows from a sink without blocking: [SpringPipelineHL::pop_non_blocking](crate::high_level_rs::SpringPipelineHL::pop_non_blocking)

```rust
use std::{sync::Arc, thread, time::Duration};
use springql_core::{high_level_rs::SpringPipelineHL, low_level_rs::SpringConfig};

fn main() {
    const SOURCE_PORT: u16 = 54300;

    // Wrap the pipeline in an Arc so it can be shared between threads that consume sink rows.
    let pipeline = Arc::new(SpringPipelineHL::new(&SpringConfig::default()).unwrap());

    pipeline.command(
        "CREATE SOURCE STREAM source_trade (
            ts TIMESTAMP NOT NULL ROWTIME,
            symbol TEXT NOT NULL,
            amount INTEGER NOT NULL
        );",
    ).unwrap();

    pipeline.command(
        "CREATE SINK STREAM sink_avg_all (
            ts TIMESTAMP NOT NULL ROWTIME,
            avg_amount FLOAT NOT NULL
        );",
    ).unwrap();

    pipeline.command(
        "CREATE SINK STREAM sink_avg_by_symbol (
            ts TIMESTAMP NOT NULL ROWTIME,
            symbol TEXT NOT NULL,
            avg_amount FLOAT NOT NULL
        );",
    ).unwrap();

    // Create 10-second windows ([:00, :10), [:10, :20), ...)
    // and calculate the average amount over the rows inside each window.
    //
    // The second parameter `DURATION_SECS(0)` is the allowed latency for late data;
    // you can ignore it here.
    pipeline.command(
        "CREATE PUMP avg_all AS
            INSERT INTO sink_avg_all (ts, avg_amount)
            SELECT STREAM
                FLOOR_TIME(source_trade.ts, DURATION_SECS(10)) AS min_ts,
                AVG(source_trade.amount) AS avg_amount
            FROM source_trade
            GROUP BY min_ts
            FIXED WINDOW DURATION_SECS(10), DURATION_SECS(0);",
    ).unwrap();

    // Create 2-second windows ([:00, :02), [:02, :04), ...),
    // group the rows inside each window by symbol,
    // and calculate the average amount for each group.
    pipeline.command(
        "CREATE PUMP avg_by_symbol AS
            INSERT INTO sink_avg_by_symbol (ts, symbol, avg_amount)
            SELECT STREAM
                FLOOR_TIME(source_trade.ts, DURATION_SECS(2)) AS min_ts,
                source_trade.symbol AS symbol,
                AVG(source_trade.amount) AS avg_amount
            FROM source_trade
            GROUP BY min_ts, symbol
            FIXED WINDOW DURATION_SECS(2), DURATION_SECS(0);",
    ).unwrap();

    pipeline.command(
        "CREATE SINK WRITER queue_avg_all FOR sink_avg_all
            TYPE IN_MEMORY_QUEUE OPTIONS (
                NAME 'q_avg_all'
            );",
    ).unwrap();

    pipeline.command(
        "CREATE SINK WRITER queue_avg_by_symbol FOR sink_avg_by_symbol
            TYPE IN_MEMORY_QUEUE OPTIONS (
                NAME 'q_avg_by_symbol'
            );",
    ).unwrap();

    pipeline.command(format!(
        "CREATE SOURCE READER tcp_trade FOR source_trade
            TYPE NET_SERVER OPTIONS (
                PROTOCOL 'TCP',
                PORT '{}'
            );",
        SOURCE_PORT
    )).unwrap();

    eprintln!("waiting for JSON records on TCP port {}...", SOURCE_PORT);
# return ();
    loop {
        // fetch rows from q_avg_all (non-blocking)
        if let Some(row) = pipeline.pop_non_blocking("q_avg_all").unwrap() {
            let ts: String = row.get_not_null_by_index(0).unwrap();
            let avg_amount: f32 = row.get_not_null_by_index(1).unwrap();
            eprintln!("[q_avg_all] {}\t{}", ts, avg_amount);
        }

        // fetch rows from q_avg_by_symbol (non-blocking)
        if let Some(row) = pipeline.pop_non_blocking("q_avg_by_symbol").unwrap() {
            let ts: String = row.get_not_null_by_index(0).unwrap();
            let symbol: String = row.get_not_null_by_index(1).unwrap();
            let avg_amount: f32 = row.get_not_null_by_index(2).unwrap();
            eprintln!("[q_avg_by_symbol] {}\t{}\t{}", ts, symbol, avg_amount);
        }

        // avoid a busy loop
        thread::sleep(Duration::from_millis(10));
    }
}
```
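
Because the pipeline is wrapped in an `Arc`, the polling loop above can also be split so that each queue is drained by its own thread. The following is a minimal sketch of that idea, assuming the pipeline and DDLs from the example above are already set up and that `SpringPipelineHL` can be shared across threads as the `Arc` usage suggests; `spawn_queue_readers` is a hypothetical helper, not part of the SpringQL API.

```rust
use std::{sync::Arc, thread};

use springql_core::high_level_rs::SpringPipelineHL;

/// Hypothetical helper: spawn one reader thread per in-memory queue.
/// Each thread blocks on `pop` for its own queue and prints the row's timestamp.
fn spawn_queue_readers(pipeline: Arc<SpringPipelineHL>) -> Vec<thread::JoinHandle<()>> {
    vec!["q_avg_all", "q_avg_by_symbol"]
        .into_iter()
        .map(|queue| {
            // Each thread owns its own clone of the Arc (a cheap reference-count increment).
            let pipeline = Arc::clone(&pipeline);
            thread::spawn(move || {
                while let Ok(row) = pipeline.pop(queue) {
                    let ts: String = row.get_not_null_by_index(0).unwrap();
                    eprintln!("[{}] {}", queue, ts);
                }
            })
        })
        .collect()
}
```

Input records for this pipeline follow the `source_trade` definition, i.e. JSON objects with `ts`, `symbol`, and `amount` fields, sent over TCP in the same way as in the first example.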