Skip to content

Commit

Permalink
Merge pull request #121 from SpringQL/feat/add-spring_column-apis
Browse files Browse the repository at this point in the history
feat: add spring_column_{bool, f32, i16, i64}() APIs
  • Loading branch information
laysakura committed May 9, 2022
2 parents 38f6ff1 + cc82c46 commit 47bee92
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 1 deletion.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,16 @@ The format is based on [Keep a Changelog][Keep a Changelog] and this project adh

## [Unreleased]

## [v0.7.0]

### Added

- Low-level APIs ([#121](https://github.com/SpringQL/SpringQL/pull/121)).
- `spring_column_bool`
- `spring_column_f32`
- `spring_column_i16`
- `spring_column_i64`

## [v0.6.0]

### Added
Expand Down
40 changes: 39 additions & 1 deletion springql-core/src/api/low_level_rs.rs
Expand Up @@ -106,11 +106,13 @@ pub fn spring_pop(pipeline: &SpringPipeline, queue: &str) -> Result<SpringRow> {
/// - [SpringError::Unavailable](crate::error::SpringError::Unavailable) when:
/// - `i_col` already fetched.
/// - `i_col` out of range.
/// - [SpringError::Null](crate::error::SpringError::Null) when:
/// - Column value is NULL
pub fn spring_column_i32(row: &SpringRow, i_col: usize) -> Result<i32> {
spring_column_not_null(row, i_col)
}

/// Get an text column.
/// Get a text column.
///
/// # Failure
///
Expand All @@ -119,6 +121,42 @@ pub fn spring_column_text(row: &SpringRow, i_col: usize) -> Result<String> {
spring_column_not_null(row, i_col)
}

/// Get a boolean column.
///
/// # Failure
///
/// Same as [spring_column_i32()](spring_column_i32)
pub fn spring_column_bool(row: &SpringRow, i_col: usize) -> Result<bool> {
spring_column_not_null(row, i_col)
}

/// Get a float column.
///
/// # Failure
///
/// Same as [spring_column_i32()](spring_column_i32)
pub fn spring_column_f32(row: &SpringRow, i_col: usize) -> Result<f32> {
spring_column_not_null(row, i_col)
}

/// Get a 2-byte integer column.
///
/// # Failure
///
/// Same as [spring_column_i32()](spring_column_i32)
pub fn spring_column_i16(row: &SpringRow, i_col: usize) -> Result<i16> {
spring_column_not_null(row, i_col)
}

/// Get a 8-byte integer column.
///
/// # Failure
///
/// Same as [spring_column_i32()](spring_column_i32)
pub fn spring_column_i64(row: &SpringRow, i_col: usize) -> Result<i64> {
spring_column_not_null(row, i_col)
}

fn spring_column_not_null<T: SpringValue>(row: &SpringRow, i_col: usize) -> Result<T> {
let v = row.0.get_by_index(i_col)?;
if let SqlValue::NotNull(v) = v {
Expand Down
83 changes: 83 additions & 0 deletions springql-core/tests/e2e_low_level_rs.rs
Expand Up @@ -251,3 +251,86 @@ fn test_e2e_pop_from_in_memory_queue() {
assert_eq!(spring_column_i32(&row, 1).unwrap(), amount);
}
}

#[test]
fn test_e2e_spring_column_apis() {
setup_test_logger();

let queue_name = "queue_many_types";
let c_timestamp = "2021-11-04 23:02:52.123456789";
let c_integer = i32::MAX;
let c_text = "HELLO";
let c_boolean = true;
let c_float = f32::MAX;

let json_many_types = json!({
"c_timestamp": c_timestamp,
"c_integer": c_integer,
"c_text": c_text,
"c_boolean": c_boolean,
"c_float": c_float,
});

let test_source = ForeignSource::new().unwrap();

let ddls = vec![
"
CREATE SOURCE STREAM source_many_types (
c_timestamp TIMESTAMP NOT NULL ROWTIME,
c_integer INTEGER NOT NULL,
c_text TEXT NOT NULL,
c_boolean BOOLEAN NOT NULL,
c_float FLOAT NOT NULL
);
"
.to_string(),
"
CREATE SINK STREAM sink_many_types (
c_timestamp TIMESTAMP NOT NULL ROWTIME,
c_integer INTEGER NOT NULL,
c_text TEXT NOT NULL,
c_boolean BOOLEAN NOT NULL,
c_float FLOAT NOT NULL
);
"
.to_string(),
"
CREATE PUMP pu_through AS
INSERT INTO sink_many_types (c_timestamp, c_integer, c_text, c_boolean, c_float)
SELECT STREAM source_many_types.c_timestamp, source_many_types.c_integer, source_many_types.c_text, source_many_types.c_boolean, source_many_types.c_float
FROM source_many_types;
"
.to_string(),
format!(
"
CREATE SINK WRITER queue_sink_many_types FOR sink_many_types
TYPE IN_MEMORY_QUEUE OPTIONS (
NAME '{queue_name}'
);
",
queue_name = queue_name,
),
format!(
"
CREATE SOURCE READER tcp_many_types FOR source_many_types
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = test_source.host_ip(),
remote_port = test_source.port()
),
];

let pipeline = apply_ddls_low_level(&ddls, spring_config_default());
test_source.start(ForeignSourceInput::new_fifo_batch(vec![json_many_types]));

let row = spring_pop(&pipeline, queue_name).unwrap();
assert_eq!(spring_column_text(&row, 0).unwrap(), c_timestamp); // TIMESTAMP type is usually converted into string in low-level API.
assert_eq!(spring_column_i32(&row, 1).unwrap(), c_integer);
assert_eq!(spring_column_text(&row, 2).unwrap(), c_text);
assert_eq!(spring_column_bool(&row, 3).unwrap(), c_boolean);
assert_eq!(spring_column_f32(&row, 4).unwrap(), c_float);
}

0 comments on commit 47bee92

Please sign in to comment.