/
high_level_rs.rs
114 lines (103 loc) · 4.05 KB
/
high_level_rs.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
//! High-level Rust API to execute / register SpringQL from Rust.
use crate::{
error::{Result, SpringError},
low_level_rs::{
spring_command, spring_open, spring_pop, spring_pop_non_blocking, SpringConfig,
SpringPipeline,
},
stream_engine::{SinkRow, SpringValue, SqlValue},
};
/// Pipeline.
#[derive(Debug)]
pub struct SpringPipelineHL(SpringPipeline);
impl SpringPipelineHL {
/// Creates and open an in-process stream pipeline.
pub fn new(config: &SpringConfig) -> Result<Self> {
let low_level = spring_open(config)?;
Ok(Self(low_level))
}
/// Execute commands (DDL).
///
/// # Failure
///
/// - [SpringError::Sql](crate::error::SpringError::Sql) when:
/// - Invalid SQL syntax.
/// - Refers to undefined objects (streams, pumps, etc)
/// - Other semantic errors.
/// - [SpringError::InvalidOption](crate::error::SpringError::Sql) when:
/// - `OPTIONS` in `CREATE` statement includes invalid key or value.
pub fn command<S: AsRef<str>>(&self, sql: S) -> Result<()> {
spring_command(&self.0, sql.as_ref())
}
/// Pop a row from an in memory queue. This is a blocking function.
///
/// **Do not call this function from threads.**
/// If you need to pop from multiple in-memory queues using threads, use `spring_pop_non_blocking()`.
/// See: <https://github.com/SpringQL/SpringQL/issues/125>
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop(&self, queue: &str) -> Result<SpringRowHL> {
spring_pop(&self.0, queue).map(|row| SpringRowHL(row.0))
}
/// Pop a row from an in memory queue. This is a non-blocking function.
///
/// # Returns
///
/// - `Ok(Some)` when at least a row is in the queue.
/// - `None` when no row is in the queue.
///
/// # Failure
///
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - queue named `queue` does not exist.
pub fn pop_non_blocking(&self, queue: &str) -> Result<Option<SpringRowHL>> {
spring_pop_non_blocking(&self.0, queue).map(|opt_row| opt_row.map(|row| SpringRowHL(row.0)))
}
}
/// Row object from an in memory queue.
#[derive(Debug)]
pub struct SpringRowHL(SinkRow);
impl SpringRowHL {
/// Get a i-th column value from the row.
///
/// # Failure
///
/// - [SpringError::Sql](crate::error::SpringError::Sql) when:
/// - Column index out of range
/// - [SpringError::Null](crate::error::SpringError::Null) when:
/// - Column value is NULL
pub fn get_not_null_by_index<T>(&self, i_col: usize) -> Result<T>
where
T: SpringValue,
{
let sql_value = self.0.get_by_index(i_col)?;
match sql_value {
SqlValue::Null => Err(SpringError::Null {
stream_name: self.0.stream_name().clone(),
i_col,
}),
SqlValue::NotNull(nn_sql_value) => nn_sql_value.unpack(),
}
}
}
impl SpringConfig {
/// Configuration by TOML format string.
///
/// # Parameters
///
/// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/low_level_rs/spring_config.rs) for full-set default configuration.
///
/// # Failures
///
/// - [SpringError::InvalidConfig](crate::error::SpringError::InvalidConfig) when:
/// - `overwrite_config_toml` includes invalid key and/or value.
/// - [SpringError::InvalidFormat](crate::error::SpringError::InvalidFormat) when:
/// - `overwrite_config_toml` is not valid as TOML.
pub fn from_toml(overwrite_config_toml: &str) -> Result<SpringConfig> {
SpringConfig::new(overwrite_config_toml)
}
}