diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4e38d917..2a766597 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -61,7 +61,7 @@ jobs: - make: task: test os: ubuntu-latest - rust: 1.63.0 + rust: 1.65.0 - make: task: test os: macos-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ecefae2..87b6a14a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ All other sections are for end-users. - (Breaking Change) Remove `TimedStream` from foreign-service ([#250](https://github.com/SpringQL/SpringQL/pull/250) + +- Bump up Minimum Support Rust Version (MSRV) to 1.65 ([#262](https://github.com/SpringQL/SpringQL/pull/262)) + ## [v0.18.1] - 2022-10-07 ### For developers diff --git a/Cargo.toml b/Cargo.toml index 8c9f85c8..d3f21271 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "springql", "springql-core", + "springql-config", "foreign-service", "test-logger", ] diff --git a/springql-config/Cargo.toml b/springql-config/Cargo.toml new file mode 100644 index 00000000..f78cbab9 --- /dev/null +++ b/springql-config/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "springql-config" +version = "0.18.1" + +authors = ["Sho Nakatani "] +license = "MIT OR Apache-2.0" + +edition = "2021" +rust-version = "1.65.0" + +categories = ["embedded"] +description = "SpringQL-config: configuration for SpringQL" +documentation = "https://springql.github.io/" +keywords = ["springql", "stream-processing"] # up to 5 keywords, each keyword should have <= 20 chars +readme = "../README.md" +repository = "https://github.com/SpringQL/SpringQL" + +[dependencies] +serde = { version = "1.0", features = ["derive"], default-features = false, optional = true } +config = { version = "0.13", features = ["toml"], default-features = false, optional = true } +thiserror = "1.0" + +[features] +default=[] +toml=["serde", "config"] diff --git a/springql-config/src/lib.rs b/springql-config/src/lib.rs new file mode 100644 index 00000000..eef819b4 --- /dev/null +++ b/springql-config/src/lib.rs @@ -0,0 +1,218 @@ +// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. + +#[cfg(feature = "toml")] +use serde::Deserialize; + +pub type Result = std::result::Result; +type BaseError = Box; + +/// Error type +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +pub enum SpringConfigError { + #[error("invalid config")] + InvalidConfig { source: BaseError }, + + #[error(r#"invalid format ("{s}")"#)] + InvalidFormat { s: String, source: BaseError }, +} + +/// Default configuration. +/// +/// Default key-values are overwritten by `overwrite_config_toml` parameter in `SpringConfig::new()`. +#[cfg(feature = "toml")] +const SPRING_CONFIG_DEFAULT: &str = r#" +[worker] +# Number of generic worker threads. Generic worker threads deal with internal and sink tasks. +# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. +n_generic_worker_threads = 1 + +# Number of source worker threads. Source worker threads collect rows from foreign source. +# Too many number may may cause row fraud in runtime. +# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. +n_source_worker_threads = 1 + +# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream. +# Small number will improve the initial row's E2E latency but increase the CPU usage. +sleep_msec_no_row = 100 + +[memory] +# How much memory is allowed to be used in SpringQL streaming runtime. +upper_limit_bytes = 10_000_000 + +# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe. +# In Severe state, internal scheduler is changed to exhibit memory-resilience. +moderate_to_severe_percent = 60 + +# Percentage over `upper_limit_bytes` to transit from Severe state to Critical. +# In Critical state, all intermediate rows are purged to release memory. +severe_to_critical_percent = 95 + +critical_to_severe_percent = 80 +severe_to_moderate_percent = 40 + +# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event. +memory_state_transition_interval_msec = 10 + +# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event. +performance_metrics_summary_report_interval_msec = 10 + +[web_console] +# Whether to enable POST API request to web console. +enable_report_post = false + +report_interval_msec = 3_000 + +host = "127.0.0.1" +port = 8050 + +timeout_msec = 3_000 + +[source_reader] +net_connect_timeout_msec = 1_000 +net_read_timeout_msec = 100 + +can_read_timeout_msec = 100 + +[sink_writer] +net_connect_timeout_msec = 1_000 +net_write_timeout_msec = 100 + +http_connect_timeout_msec = 1_000 +http_timeout_msec = 100 +"#; + +/// Top-level config. +#[allow(missing_docs)] +#[derive(Clone, Eq, PartialEq, Debug)] +#[cfg_attr(feature = "toml", derive(Deserialize))] +pub struct SpringConfig { + pub worker: SpringWorkerConfig, + pub memory: SpringMemoryConfig, + pub web_console: SpringWebConsoleConfig, + pub source_reader: SpringSourceReaderConfig, + pub sink_writer: SpringSinkWriterConfig, +} + +#[cfg(feature = "toml")] +impl Default for SpringConfig { + fn default() -> Self { + Self::new("").expect("default configuration must be valid") + } +} + +impl SpringConfig { + /// # Failures + /// + /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: + /// - `overwrite_config_toml` includes invalid key and/or value. + /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: + /// - `overwrite_config_toml` is not valid as TOML. + #[cfg(feature = "toml")] + pub fn new(overwrite_config_toml: &str) -> Result { + let default_conf = config::Config::builder() + .add_source(config::File::from_str( + SPRING_CONFIG_DEFAULT, + config::FileFormat::Toml, + )) + .build() + .expect("SPRING_CONFIG_DEFAULT is in wrong format"); + + let c = config::Config::builder() + .add_source(default_conf) + .add_source(config::File::from_str( + overwrite_config_toml, + config::FileFormat::Toml, + )) + .build() + .map_err(|e| SpringConfigError::InvalidFormat { + s: overwrite_config_toml.to_string(), + source: e.into(), + })?; + + c.try_deserialize() + .map_err(|e| SpringConfigError::InvalidConfig { source: e.into() }) + } + + /// Configuration by TOML format string. + /// + /// # Parameters + /// + /// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/spring_config.rs) for full-set default configuration. + /// + /// # Failures + /// + /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: + /// - `overwrite_config_toml` includes invalid key and/or value. + /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: + /// - `overwrite_config_toml` is not valid as TOML. + #[cfg(feature = "toml")] + pub fn from_toml(overwrite_config_toml: &str) -> Result { + SpringConfig::new(overwrite_config_toml) + } +} + +/// Config related to worker threads. +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[cfg_attr(feature = "toml", derive(Deserialize))] +pub struct SpringWorkerConfig { + pub n_generic_worker_threads: u16, + pub n_source_worker_threads: u16, + pub sleep_msec_no_row: u64, +} + +/// Config related to memory management. +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[cfg_attr(feature = "toml", derive(Deserialize))] +pub struct SpringMemoryConfig { + pub upper_limit_bytes: u64, + + pub moderate_to_severe_percent: u8, + pub severe_to_critical_percent: u8, + + pub critical_to_severe_percent: u8, + pub severe_to_moderate_percent: u8, + + pub memory_state_transition_interval_msec: u32, + pub performance_metrics_summary_report_interval_msec: u32, +} + +/// Config related to web console. +#[allow(missing_docs)] +#[derive(Clone, Eq, PartialEq, Debug)] +#[cfg_attr(feature = "toml", derive(Deserialize))] +pub struct SpringWebConsoleConfig { + pub enable_report_post: bool, + + pub report_interval_msec: u32, + + pub host: String, + pub port: u16, + + pub timeout_msec: u32, +} + +/// Config related to source reader +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[cfg_attr(feature = "toml", derive(Deserialize))] +pub struct SpringSourceReaderConfig { + pub net_connect_timeout_msec: u32, + pub net_read_timeout_msec: u32, + + pub can_read_timeout_msec: u32, +} + +/// Config related to sink writer. +#[allow(missing_docs)] +#[derive(Copy, Clone, Eq, PartialEq, Debug)] +#[cfg_attr(feature = "toml", derive(Deserialize))] +pub struct SpringSinkWriterConfig { + pub net_connect_timeout_msec: u32, + pub net_write_timeout_msec: u32, + + pub http_timeout_msec: u32, + pub http_connect_timeout_msec: u32, +} diff --git a/springql-core/Cargo.toml b/springql-core/Cargo.toml index 276614b1..fa1ddfb6 100644 --- a/springql-core/Cargo.toml +++ b/springql-core/Cargo.toml @@ -6,7 +6,7 @@ authors = ["Sho Nakatani "] license = "MIT OR Apache-2.0" edition = "2021" -rust-version = "1.63.0" +rust-version = "1.65.0" categories = ["embedded"] description = "SpringQL: Open-source stream processor for IoT devices and in-vehicle computers" @@ -19,11 +19,11 @@ repository = "https://github.com/SpringQL/SpringQL" stub_web_console=[] [dependencies] +springql-config = {version="0.18.0", features= ["default"], path="../springql-config"} anyhow = "1.0" thiserror = "1.0" serde = {version = "1.0", features = ["derive"], default-features = false} serde_json = "1.0" -config = {version = "0.13", features = ["toml"], default-features = false} derive-new = "0.5" ordered-float = "3.0" fastrand = "1.5" @@ -38,10 +38,10 @@ reqwest = {version = "0.11", features = ["json", "blocking"], default-features = once_cell = "1.8" parking_lot = "0.12" time = {version="0.3.9", features = ["formatting", "parsing", "macros"]} - socketcan = "1.7" [dev-dependencies] +springql-config = {version="0.18.0", features= ["toml"], path="../springql-config"} springql-foreign-service = {path = "../foreign-service"} springql-test-logger = {path = "../test-logger"} pretty_assertions = "1.0" diff --git a/springql-core/src/api/spring_config.rs b/springql-core/src/api/spring_config.rs index 77b5da17..ec64d8df 100644 --- a/springql-core/src/api/spring_config.rs +++ b/springql-core/src/api/spring_config.rs @@ -1,195 +1,6 @@ // This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details. -use serde::Deserialize; - -use crate::api::error::{Result, SpringError}; - -/// Default configuration. -/// -/// Default key-values are overwritten by `overwrite_config_toml` parameter in `SpringConfig::new()`. -const SPRING_CONFIG_DEFAULT: &str = r#" -[worker] -# Number of generic worker threads. Generic worker threads deal with internal and sink tasks. -# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. -n_generic_worker_threads = 1 - -# Number of source worker threads. Source worker threads collect rows from foreign source. -# Too many number may may cause row fraud in runtime. -# Setting this to > 1 may improve throughput but lead to out-of-order stream processing. -n_source_worker_threads = 1 - -# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream. -# Small number will improve the initial row's E2E latency but increase the CPU usage. -sleep_msec_no_row = 100 - -[memory] -# How much memory is allowed to be used in SpringQL streaming runtime. -upper_limit_bytes = 10_000_000 - -# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe. -# In Severe state, internal scheduler is changed to exhibit memory-resilience. -moderate_to_severe_percent = 60 - -# Percentage over `upper_limit_bytes` to transit from Severe state to Critical. -# In Critical state, all intermediate rows are purged to release memory. -severe_to_critical_percent = 95 - -critical_to_severe_percent = 80 -severe_to_moderate_percent = 40 - -# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event. -memory_state_transition_interval_msec = 10 - -# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event. -performance_metrics_summary_report_interval_msec = 10 - -[web_console] -# Whether to enable POST API request to web console. -enable_report_post = false - -report_interval_msec = 3_000 - -host = "127.0.0.1" -port = 8050 - -timeout_msec = 3_000 - -[source_reader] -net_connect_timeout_msec = 1_000 -net_read_timeout_msec = 100 - -can_read_timeout_msec = 100 - -[sink_writer] -net_connect_timeout_msec = 1_000 -net_write_timeout_msec = 100 - -http_connect_timeout_msec = 1_000 -http_timeout_msec = 100 -"#; - -/// Top-level config. -#[allow(missing_docs)] -#[derive(Clone, Eq, PartialEq, Debug, Deserialize)] -pub struct SpringConfig { - pub worker: SpringWorkerConfig, - pub memory: SpringMemoryConfig, - pub web_console: SpringWebConsoleConfig, - pub source_reader: SpringSourceReaderConfig, - pub sink_writer: SpringSinkWriterConfig, -} - -impl Default for SpringConfig { - fn default() -> Self { - Self::new("").expect("default configuration must be valid") - } -} - -impl SpringConfig { - /// # Failures - /// - /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: - /// - `overwrite_config_toml` includes invalid key and/or value. - /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: - /// - `overwrite_config_toml` is not valid as TOML. - pub fn new(overwrite_config_toml: &str) -> Result { - let default_conf = config::Config::builder() - .add_source(config::File::from_str( - SPRING_CONFIG_DEFAULT, - config::FileFormat::Toml, - )) - .build() - .expect("SPRING_CONFIG_DEFAULT is in wrong format"); - - let c = config::Config::builder() - .add_source(default_conf) - .add_source(config::File::from_str( - overwrite_config_toml, - config::FileFormat::Toml, - )) - .build() - .map_err(|e| SpringError::InvalidFormat { - s: overwrite_config_toml.to_string(), - source: e.into(), - })?; - - c.try_deserialize() - .map_err(|e| SpringError::InvalidConfig { source: e.into() }) - } - - /// Configuration by TOML format string. - /// - /// # Parameters - /// - /// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/spring_config.rs) for full-set default configuration. - /// - /// # Failures - /// - /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when: - /// - `overwrite_config_toml` includes invalid key and/or value. - /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when: - /// - `overwrite_config_toml` is not valid as TOML. - pub fn from_toml(overwrite_config_toml: &str) -> Result { - SpringConfig::new(overwrite_config_toml) - } -} - -/// Config related to worker threads. -#[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] -pub struct SpringWorkerConfig { - pub n_generic_worker_threads: u16, - pub n_source_worker_threads: u16, - pub sleep_msec_no_row: u64, -} - -/// Config related to memory management. -#[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] -pub struct SpringMemoryConfig { - pub upper_limit_bytes: u64, - - pub moderate_to_severe_percent: u8, - pub severe_to_critical_percent: u8, - - pub critical_to_severe_percent: u8, - pub severe_to_moderate_percent: u8, - - pub memory_state_transition_interval_msec: u32, - pub performance_metrics_summary_report_interval_msec: u32, -} - -/// Config related to web console. -#[allow(missing_docs)] -#[derive(Clone, Eq, PartialEq, Debug, Deserialize)] -pub struct SpringWebConsoleConfig { - pub enable_report_post: bool, - - pub report_interval_msec: u32, - - pub host: String, - pub port: u16, - - pub timeout_msec: u32, -} - -/// Config related to source reader -#[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] -pub struct SpringSourceReaderConfig { - pub net_connect_timeout_msec: u32, - pub net_read_timeout_msec: u32, - - pub can_read_timeout_msec: u32, -} - -/// Config related to sink writer. -#[allow(missing_docs)] -#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)] -pub struct SpringSinkWriterConfig { - pub net_connect_timeout_msec: u32, - pub net_write_timeout_msec: u32, - - pub http_timeout_msec: u32, - pub http_connect_timeout_msec: u32, -} +pub use springql_config::{ + SpringConfig, SpringMemoryConfig, SpringSinkWriterConfig, SpringSourceReaderConfig, + SpringWebConsoleConfig, SpringWorkerConfig, +}; diff --git a/springql-core/src/pipeline/test_support/fixture.rs b/springql-core/src/pipeline/test_support/fixture.rs index 521ae038..ae5f0228 100644 --- a/springql-core/src/pipeline/test_support/fixture.rs +++ b/springql-core/src/pipeline/test_support/fixture.rs @@ -17,24 +17,16 @@ use crate::{ }, }; -impl SpringConfig { - pub fn fx_default() -> Self { - Self::new("").unwrap() - } +pub fn default_root_config() -> SpringConfig { + SpringConfig::default() } -impl SpringSourceReaderConfig { - pub fn fx_default() -> Self { - let c = SpringConfig::fx_default(); - c.source_reader - } +pub fn default_source_reader_config() -> SpringSourceReaderConfig { + default_root_config().source_reader } -impl SpringSinkWriterConfig { - pub fn fx_default() -> Self { - let c = SpringConfig::fx_default(); - c.sink_writer - } +pub fn default_sink_writer_config() -> SpringSinkWriterConfig { + default_root_config().sink_writer } impl Pipeline { diff --git a/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer/net.rs b/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer/net.rs index 4be0257d..c8cc6fd3 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer/net.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/sink_task/sink_writer/net.rs @@ -97,7 +97,10 @@ mod tests { use springql_foreign_service::sink::ForeignSink; use super::*; - use crate::{pipeline::OptionsBuilder, stream_engine::autonomous_executor::row::JsonObject}; + use crate::{ + pipeline::{test_support::fixture::default_sink_writer_config, OptionsBuilder}, + stream_engine::autonomous_executor::row::JsonObject, + }; #[test] fn test_sink_writer_tcp() { @@ -110,7 +113,7 @@ mod tests { .build(); let mut sink_writer = - NetSinkWriter::start(&options, &SpringSinkWriterConfig::fx_default()).unwrap(); + NetSinkWriter::start(&options, &default_sink_writer_config()).unwrap(); sink_writer .send_row(SchemalessRow::fx_city_temperature_tokyo()) diff --git a/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_client.rs b/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_client.rs index cdb67510..3882fd0a 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_client.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_client.rs @@ -99,6 +99,7 @@ mod tests { use springql_foreign_service::source::ForeignSourceInput; use super::*; + use crate::pipeline::test_support::fixture::default_source_reader_config; use crate::pipeline::OptionsBuilder; use crate::stream_engine::autonomous_executor::row::JsonObject; use crate::stream_engine::autonomous_executor::row::JsonSourceRow; @@ -123,8 +124,7 @@ mod tests { serde_json::Value::from(j1.clone()), ])); - let mut subtask = - NetClientSourceReader::start(&options, &SpringSourceReaderConfig::fx_default())?; + let mut subtask = NetClientSourceReader::start(&options, &default_source_reader_config())?; assert_eq!( subtask.next_row()?, diff --git a/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_server.rs b/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_server.rs index ba99c9bb..1502c4ae 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_server.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task/source_task/source_reader/net_server.rs @@ -130,8 +130,10 @@ impl NetServerSourceReader { mod tests { use super::*; use crate::{ - api::SpringSinkWriterConfig, - pipeline::OptionsBuilder, + pipeline::{ + test_support::fixture::{default_sink_writer_config, default_source_reader_config}, + OptionsBuilder, + }, stream_engine::autonomous_executor::{ task::sink_task::{NetSinkWriter, SinkWriter}, SchemalessRow, @@ -148,7 +150,7 @@ mod tests { .add("REMOTE_HOST", "127.0.0.1") .add("REMOTE_PORT", remote_port.to_string()) .build(); - NetSinkWriter::start(&options, &SpringSinkWriterConfig::fx_default()).unwrap() + NetSinkWriter::start(&options, &default_sink_writer_config()).unwrap() } #[test] @@ -158,8 +160,7 @@ mod tests { .add("PROTOCOL", "TCP") .add("PORT", port.to_string()) .build(); - let mut reader = - NetServerSourceReader::start(&options, &SpringSourceReaderConfig::fx_default())?; + let mut reader = NetServerSourceReader::start(&options, &default_source_reader_config())?; let mut writer = tcp_writer(port); diff --git a/springql-core/src/stream_engine/autonomous_executor/task_graph.rs b/springql-core/src/stream_engine/autonomous_executor/task_graph.rs index 0f390c17..aab7202a 100644 --- a/springql-core/src/stream_engine/autonomous_executor/task_graph.rs +++ b/springql-core/src/stream_engine/autonomous_executor/task_graph.rs @@ -78,7 +78,6 @@ impl TaskGraph { let i = self.find_node(task_id); self.g .edges_directed(i, petgraph::EdgeDirection::Incoming) - .into_iter() .map(|e| &e.weight().queue_id) .cloned() .collect() @@ -87,7 +86,6 @@ impl TaskGraph { let i = self.find_node(task_id); self.g .edges_directed(i, petgraph::EdgeDirection::Outgoing) - .into_iter() .map(|e| &e.weight().queue_id) .cloned() .collect() @@ -101,7 +99,6 @@ impl TaskGraph { let i = self.find_node(task_id); self.g .edges_directed(i, petgraph::EdgeDirection::Incoming) - .into_iter() .find_map(|e| { let queue_id_with_upstream = e.weight(); (&queue_id_with_upstream.upstream == upstream) diff --git a/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs b/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs index 45713dc2..76cabaed 100644 --- a/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs +++ b/springql-core/src/stream_engine/autonomous_executor/test_support/factory.rs @@ -5,8 +5,10 @@ use std::sync::Arc; use springql_foreign_service::source::{ForeignSource, ForeignSourceInput}; use crate::{ - api::SpringSourceReaderConfig, - pipeline::{ColumnName, OptionsBuilder, StreamModel, StreamName, StreamShape}, + pipeline::{ + test_support::fixture::default_source_reader_config, ColumnName, OptionsBuilder, + StreamModel, StreamName, StreamShape, + }, stream_engine::{ autonomous_executor::{ row::{ColumnValues, NnSqlValue, SqlValue, StreamColumns, StreamRow}, @@ -47,7 +49,7 @@ impl NetClientSourceReader { .build(); source.start(input); - NetClientSourceReader::start(&options, &SpringSourceReaderConfig::fx_default()).unwrap() + NetClientSourceReader::start(&options, &default_source_reader_config()).unwrap() } } diff --git a/springql/Cargo.toml b/springql/Cargo.toml index 7e4b1b5c..ae33d0e3 100644 --- a/springql/Cargo.toml +++ b/springql/Cargo.toml @@ -15,7 +15,8 @@ readme = "../README.md" repository = "https://github.com/SpringQL/SpringQL" [dependencies] -springql-core = { version = "0.18.1", path="../springql-core"} +springql-config = {version="0.18.0", features= ["toml"], path="../springql-config"} +springql-core = { version = "0.18.0", path="../springql-core"} [dev-dependencies] springql-foreign-service = {path = "../foreign-service"} diff --git a/springql/tests/e2e_high_level_rs.rs b/springql/tests/e2e_high_level_rs.rs index ded11991..32808d7e 100644 --- a/springql/tests/e2e_high_level_rs.rs +++ b/springql/tests/e2e_high_level_rs.rs @@ -240,10 +240,7 @@ fn test_e2e_pop_from_in_memory_queue() { let pipeline = apply_ddls(&ddls, SpringConfig::default()); test_source.start(ForeignSourceInput::new_fifo_batch( - (0..trade_times) - .into_iter() - .map(|_| json_oracle.clone()) - .collect(), + (0..trade_times).map(|_| json_oracle.clone()).collect(), )); for _ in 0..trade_times { @@ -318,10 +315,7 @@ fn test_e2e_pop_non_blocking_from_in_memory_queue() { let pipeline = apply_ddls(&ddls, SpringConfig::default()); test_source.start(ForeignSourceInput::new_fifo_batch( - (0..trade_times) - .into_iter() - .map(|_| json_oracle.clone()) - .collect(), + (0..trade_times).map(|_| json_oracle.clone()).collect(), )); for _ in 0..trade_times {