-
Notifications
You must be signed in to change notification settings - Fork 7
/
demo_pipeline.rs
282 lines (259 loc) · 8.38 KB
/
demo_pipeline.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
// Copyright (c) 2022 TOYOTA MOTOR CORPORATION. Licensed under MIT OR Apache-2.0.
use std::{
io::copy,
path::Path,
process::{Child, Command},
time::Duration,
};
use springql_core::{high_level_rs::SpringPipelineHL, low_level_rs::SpringConfig};
use springql_foreign_service::sink::ForeignSink;
use tempfile::NamedTempFile;
const LOCALHOST: &str = "127.0.0.1";
const ENGINE_REPLAYMAN_PORT: u16 = 19870;
const WHEEL_SPEED_REPLAYMAN_PORT: u16 = 19871;
const INITIAL_TIMESTAMP: &str = "2020-10-21T10:37:56.000+09:00";
fn setup_logger() {
let _ = env_logger::builder().is_test(false).try_init();
}
fn http_download_file_to_tempdir(url: &str) -> NamedTempFile {
log::info!("Downloading file from {}", url);
let fname = url.split('/').last().unwrap();
let mut tempfile = tempfile::Builder::new()
.suffix(&format!("-{}", fname))
.tempfile()
.unwrap();
let response = reqwest::blocking::get(url).unwrap();
let content = response.text().unwrap();
copy(&mut content.as_bytes(), &mut tempfile).unwrap();
log::info!("Finish downloading into {}", tempfile.path().display());
tempfile
}
fn spawn_engine_replayman(engine_dataset: &Path) -> Child {
// prerequisite: `cargo install replayman`
Command::new("replayman")
.arg("--timed-by")
.arg("Time")
.arg("--initial-timestamp")
.arg(INITIAL_TIMESTAMP)
.arg("--dest-tcp")
.arg(format!("{}:{}", LOCALHOST, ENGINE_REPLAYMAN_PORT))
.arg(engine_dataset)
.spawn()
.expect("failed to execute replayman command as child process")
}
fn spawn_wheel_speed_replayman(wheel_speed_dataset: &Path) -> Child {
// prerequisite: `cargo install replayman`
Command::new("replayman")
.arg("--timed-by")
.arg("Time")
.arg("--initial-timestamp")
.arg(INITIAL_TIMESTAMP)
.arg("--dest-tcp")
.arg(format!("{}:{}", LOCALHOST, WHEEL_SPEED_REPLAYMAN_PORT))
.arg(wheel_speed_dataset)
.spawn()
.expect("failed to execute replayman command as child process")
}
fn main() {
setup_logger();
let engine_dataset = http_download_file_to_tempdir("https://raw.githubusercontent.com/SpringQL/dataset/main/pseudo-in-vehicle/Engine-30sec.tsv");
let wheel_speed_dataset = http_download_file_to_tempdir("https://raw.githubusercontent.com/SpringQL/dataset/main/pseudo-in-vehicle/VehicleControl-30sec.tsv");
// start servers to listen to sinks' output
let sink_engine_wheel_speed = ForeignSink::start().unwrap();
let sink_vehicle_speed = ForeignSink::start().unwrap();
let pipeline = SpringPipelineHL::new(&SpringConfig::default()).unwrap();
pipeline
.command(
"
CREATE SOURCE STREAM source_engine (
Time TIMESTAMP NOT NULL ROWTIME,
Rpm FLOAT NOT NULL
);
",
)
.unwrap();
pipeline
.command(
"
CREATE SOURCE STREAM source_wheel_speed (
Time TIMESTAMP NOT NULL ROWTIME,
SpeedSignalFreq INTEGER NOT NULL,
Speed FLOAT NOT NULL
);
",
)
.unwrap();
pipeline
.command(
"
CREATE STREAM sampled_engine (
ts TIMESTAMP NOT NULL ROWTIME,
rpm FLOAT NOT NULL
);
",
)
.unwrap();
pipeline
.command(
"
CREATE STREAM sampled_wheel_speed (
ts TIMESTAMP NOT NULL ROWTIME,
speed FLOAT NOT NULL
);
",
)
.unwrap();
pipeline
.command(
"
CREATE SINK STREAM sink_vehicle_speed (
ts TIMESTAMP NOT NULL ROWTIME,
speed FLOAT
);
",
)
.unwrap();
pipeline
.command(
"
CREATE SINK STREAM sink_engine_wheel_speed (
ts TIMESTAMP NOT NULL ROWTIME,
rpm FLOAT NOT NULL,
speed FLOAT
);
",
)
.unwrap();
pipeline
.command(
"
CREATE PUMP pu_sample_engine AS
INSERT INTO sampled_engine (ts, rpm)
SELECT STREAM
FLOOR_TIME(source_engine.Time, DURATION_MILLIS(100)) AS sampled_ts,
AVG(source_engine.Rpm) AS avg_rpm
FROM source_engine
GROUP BY sampled_ts
FIXED WINDOW DURATION_MILLIS(100), DURATION_MILLIS(100);
",
)
.unwrap();
pipeline
.command(
"
CREATE PUMP pu_sample_wheel_speed AS
INSERT INTO sampled_wheel_speed (ts, speed)
SELECT STREAM
FLOOR_TIME(source_wheel_speed.Time, DURATION_MILLIS(100)) AS sampled_ts,
AVG(source_wheel_speed.Speed) AS avg_speed
FROM source_wheel_speed
GROUP BY sampled_ts
FIXED WINDOW DURATION_MILLIS(100), DURATION_MILLIS(100);
",
)
.unwrap();
pipeline
.command(
"
CREATE PUMP pu_join AS
INSERT INTO sink_engine_wheel_speed (ts, rpm, speed)
SELECT STREAM
sampled_engine.ts,
sampled_engine.rpm,
sampled_wheel_speed.speed
FROM sampled_engine
LEFT OUTER JOIN sampled_wheel_speed
ON sampled_engine.ts = sampled_wheel_speed.ts
FIXED WINDOW DURATION_MILLIS(1000), DURATION_MILLIS(500);
",
)
.unwrap();
pipeline
.command(
"
CREATE PUMP pu_phy_conversion AS
INSERT INTO sink_vehicle_speed (ts, speed)
SELECT STREAM
sampled_wheel_speed.ts,
sampled_wheel_speed.speed * 0.3
FROM sampled_wheel_speed;
",
)
.unwrap();
pipeline
.command(format!(
"
CREATE SINK WRITER tcp_sink_engine_wheel_speed FOR sink_engine_wheel_speed
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = sink_engine_wheel_speed.host_ip(),
remote_port = sink_engine_wheel_speed.port(),
))
.unwrap();
pipeline
.command(format!(
"
CREATE SINK WRITER tcp_sink_vehicle_speed FOR sink_vehicle_speed
TYPE NET_CLIENT OPTIONS (
PROTOCOL 'TCP',
REMOTE_HOST '{remote_host}',
REMOTE_PORT '{remote_port}'
);
",
remote_host = sink_vehicle_speed.host_ip(),
remote_port = sink_vehicle_speed.port()
))
.unwrap();
// start external source streams
let mut engine_replayman = spawn_engine_replayman(engine_dataset.path());
let mut wheel_speed_replayman = spawn_wheel_speed_replayman(wheel_speed_dataset.path());
// start source readers
pipeline
.command(format!(
"
CREATE SOURCE READER tcp_source_engine FOR source_engine
TYPE NET_SERVER OPTIONS (
PROTOCOL 'TCP',
PORT '{port}'
);
",
port = ENGINE_REPLAYMAN_PORT
))
.unwrap();
pipeline
.command(format!(
"
CREATE SOURCE READER tcp_source_wheel_speed FOR source_wheel_speed
TYPE NET_SERVER OPTIONS (
PROTOCOL 'TCP',
PORT '{port}'
);
",
port = WHEEL_SPEED_REPLAYMAN_PORT
))
.unwrap();
// print sinks' outputs while waiting for replayman processes to finish
let mut ecode_engine = None;
let mut ecode_wheel_speed = None;
while ecode_engine.is_none() || ecode_wheel_speed.is_none() {
if let Some(v) = sink_engine_wheel_speed.try_receive(Duration::from_millis(100)) {
println!("sink_engine_wheel_speed\t{}", v);
}
if let Some(v) = sink_vehicle_speed.try_receive(Duration::from_millis(100)) {
println!("sink_vehicle_speed\t{:?}", v);
}
ecode_engine = engine_replayman
.try_wait()
.expect("failed to wait on engine replayman");
ecode_wheel_speed = wheel_speed_replayman
.try_wait()
.expect("failed to wait on vehicle control replayman");
}
// check exit statuses of replayman processes
assert!(ecode_engine.unwrap().success());
assert!(ecode_wheel_speed.unwrap().success());
}