/
query_planner.rs
186 lines (166 loc) · 5.57 KB
/
query_planner.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
//! Translates `SelectSyntax` into `ExprResolver` and `QueryPlan`.
//!
//! `ExprResolver` is to resolve aliases in select_list. Each operation only has `ExprLabel` returned from `ExprResolver`.
//!
//! Query plan is represented by binary tree of operator nodes; most nodes have only a child but JOIN node has two children.
//!
//! Query plan firstly collect `Row`s from streams and they are converted into simplified `Tuple`s who have `Map<ColumnReference, SqlValue>` structure.
//! A tuple may be firstly dropped by single stream selection.
//! Then tuples may be joined.
//! And finally tuples may be dropped again by multi stream selection.
//!
//! ```text
//! (root)
//!
//! ^
//! | Tuple (0~)
//! |
//! multi stream selection
//! ^
//! | Tuple (0~)
//! |
//! join (window) <--- Option<Tuple> --- ....
//! ^
//! | Tuple (0/1)
//! |
//! single stream selection
//! ^
//! | Tuple
//! |
//! row to tuple
//! ^
//! | Row
//! |
//! collect
//!
//! (leaf)
//! ```
//!
//! Ascendant operators of multi stream selection operator do not modify tuples' structure but just reference them to resolve ColumnReference in expressions.
//!
//! ```text
//! (root)
//!
//! projection
//! ^
//! |
//! group aggregation (window)
//!
//! Tuple
//!
//! (leaf)
//! ```
//!
//! Projection operator emits a `SqlValues` by evaluating its expressions (via `ExprLabel`) using `Tuple` for column references.
mod select_syntax_analyzer;
use crate::{
error::Result,
expr_resolver::{expr_label::ExprLabel, ExprResolver},
pipeline::{
pump_model::{
window_operation_parameter::{
aggregate::GroupAggregateParameter, WindowOperationParameter,
},
window_parameter::WindowParameter,
},
Pipeline,
},
stream_engine::command::query_plan::{
query_plan_operation::{GroupAggregateWindowOp, JoinOp, LowerOps, ProjectionOp, UpperOps},
QueryPlan,
},
};
use self::select_syntax_analyzer::SelectSyntaxAnalyzer;
use super::sql_parser::syntax::{GroupingElementSyntax, SelectStreamSyntax};
#[derive(Debug)]
pub(crate) struct QueryPlanner {
analyzer: SelectSyntaxAnalyzer,
}
impl QueryPlanner {
pub(in crate::sql_processor) fn new(select_stream_syntax: SelectStreamSyntax) -> Self {
Self {
analyzer: SelectSyntaxAnalyzer::new(select_stream_syntax),
}
}
pub(crate) fn plan(self, pipeline: &Pipeline) -> Result<QueryPlan> {
let (mut expr_resolver, labels_select_list) =
ExprResolver::new(self.analyzer.select_list().to_vec());
let projection = ProjectionOp {
expr_labels: labels_select_list,
};
let group_aggr_window =
self.create_group_aggr_window_op(&projection, &mut expr_resolver)?;
let upper_ops = UpperOps {
projection,
group_aggr_window,
};
let join = self.create_join_op(&mut expr_resolver, pipeline)?;
let lower_ops = LowerOps { join };
Ok(QueryPlan::new(upper_ops, lower_ops, expr_resolver))
}
fn create_group_aggr_window_op(
&self,
projection_op: &ProjectionOp,
expr_resolver: &mut ExprResolver,
) -> Result<Option<GroupAggregateWindowOp>> {
let window_param = self.create_window_param();
let group_aggr_param = self.create_group_aggr_param(expr_resolver, projection_op)?;
match (window_param, group_aggr_param) {
(Some(window_param), Some(group_aggr_param)) => Ok(Some(GroupAggregateWindowOp {
window_param,
op_param: WindowOperationParameter::GroupAggregation(group_aggr_param),
})),
_ => Ok(None),
}
}
fn create_window_param(&self) -> Option<WindowParameter> {
self.analyzer.window_parameter()
}
fn create_group_aggr_param(
&self,
expr_resolver: &mut ExprResolver,
projection_op: &ProjectionOp,
) -> Result<Option<GroupAggregateParameter>> {
let opt_grouping_elem = self.analyzer.grouping_element();
let aggr_labels = projection_op
.expr_labels
.iter()
.filter_map(|label| {
if let ExprLabel::Aggr(aggr_label) = label {
Some(*aggr_label)
} else {
None
}
})
.collect::<Vec<_>>();
match (opt_grouping_elem, aggr_labels.len()) {
(Some(grouping_elem), 1) => {
let aggr_label = aggr_labels.get(0).expect("len checked");
let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func;
let group_by_label = match grouping_elem {
GroupingElementSyntax::ValueExpr(expr) => {
expr_resolver.register_value_expr(expr)
}
GroupingElementSyntax::ValueAlias(alias) => {
expr_resolver.resolve_value_alias(alias)?
}
};
Ok(Some(GroupAggregateParameter::new(
aggr_func,
*aggr_label,
group_by_label,
)))
}
(None, 0) => Ok(None),
_ => unimplemented!(),
}
}
fn create_join_op(
&self,
expr_resolver: &mut ExprResolver,
pipeline: &Pipeline,
) -> Result<JoinOp> {
self.analyzer.join_op(expr_resolver, pipeline)
}
}