Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: SELECT with aggregate preserves select_list order #128

Merged
merged 7 commits into from May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
110 changes: 56 additions & 54 deletions springql-core/src/expr_resolver.rs
Expand Up @@ -10,7 +10,7 @@ use crate::stream_engine::{SqlValue, Tuple};
use anyhow::anyhow;
use std::collections::HashMap;

use self::expr_label::{AggrExprLabel, ExprLabelGenerator, ValueExprLabel};
use self::expr_label::{AggrExprLabel, ExprLabel, ExprLabelGenerator, ValueExprLabel};

/// ExprResolver is to:
///
Expand All @@ -32,39 +32,35 @@ pub(crate) struct ExprResolver {
impl ExprResolver {
/// # Returns
///
/// `(instance, value expr labels in select_list, aggr expr labels in select_list)`
pub(crate) fn new(
select_list: Vec<SelectFieldSyntax>,
) -> (Self, Vec<ValueExprLabel>, Vec<AggrExprLabel>) {
/// `(instance, value/aggr expr labels in select_list)`
pub(crate) fn new(select_list: Vec<SelectFieldSyntax>) -> (Self, Vec<ExprLabel>) {
let mut label_gen = ExprLabelGenerator::default();
let mut value_expressions = HashMap::new();
let mut value_aliased_labels = HashMap::new();
let mut aggr_expressions = HashMap::new();
let mut aggr_aliased_labels = HashMap::new();

let mut value_expr_labels = Vec::new();
let mut aggr_expr_labels = Vec::new();

select_list
let expr_labels = select_list
.into_iter()
.for_each(|select_field| match select_field {
.map(|select_field| match select_field {
SelectFieldSyntax::ValueExpr { value_expr, alias } => {
let label = label_gen.next_value();
value_expressions.insert(label, value_expr);
if let Some(alias) = alias {
value_aliased_labels.insert(alias, label);
}
value_expr_labels.push(label);
ExprLabel::Value(label)
}
SelectFieldSyntax::AggrExpr { aggr_expr, alias } => {
let label = label_gen.next_aggr();
aggr_expressions.insert(label, aggr_expr);
if let Some(alias) = alias {
aggr_aliased_labels.insert(alias, label);
}
aggr_expr_labels.push(label);
ExprLabel::Aggr(label)
}
});
})
.collect();

(
Self {
Expand All @@ -75,8 +71,7 @@ impl ExprResolver {
aggr_aliased_labels,
aggr_expression_results: HashMap::new(),
},
value_expr_labels,
aggr_expr_labels,
expr_labels,
)
}

Expand Down Expand Up @@ -164,6 +159,8 @@ impl ExprResolver {

/// label -> (internal) value expression inside aggr expr + tuple (for ColumnReference) -> SqlValue.
///
/// _inner_ means: AGGR_FUNC(inner_value_expr)
///
/// # Panics
///
/// - `label` is not found
Expand Down Expand Up @@ -210,48 +207,53 @@ mod tests {
},
];

let (mut resolver, value_labels_select_list, _aggr_labels_select_list) =
ExprResolver::new(select_list);
let (mut resolver, labels_select_list) = ExprResolver::new(select_list);

assert_eq!(
resolver
.resolve_value_alias(ValueAlias::new("a1".to_string()))
.unwrap(),
value_labels_select_list[1]
);
assert!(resolver
.resolve_value_alias(ValueAlias::new("a404".to_string()))
.is_err(),);
if let &[ExprLabel::Value(value_label0), ExprLabel::Value(value_label1)] =
&labels_select_list[..]
{
assert_eq!(
resolver
.resolve_value_alias(ValueAlias::new("a1".to_string()))
.unwrap(),
value_label1
);
assert!(resolver
.resolve_value_alias(ValueAlias::new("a404".to_string()))
.is_err(),);

let label = resolver.register_value_expr(ValueExpr::factory_add(
ValueExpr::factory_integer(3),
ValueExpr::factory_integer(3),
));
let label = resolver.register_value_expr(ValueExpr::factory_add(
ValueExpr::factory_integer(3),
ValueExpr::factory_integer(3),
));

let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]);
let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]);

assert_eq!(
resolver
.eval_value_expr(value_labels_select_list[0], &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2)
);
assert_eq!(
resolver
.eval_value_expr(value_labels_select_list[0], &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2),
"eval twice"
);
assert_eq!(
resolver
.eval_value_expr(value_labels_select_list[1], &empty_tuple)
.unwrap(),
SqlValue::factory_integer(4)
);
assert_eq!(
resolver.eval_value_expr(label, &empty_tuple).unwrap(),
SqlValue::factory_integer(6)
);
assert_eq!(
resolver
.eval_value_expr(value_label0, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2)
);
assert_eq!(
resolver
.eval_value_expr(value_label0, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2),
"eval twice"
);
assert_eq!(
resolver
.eval_value_expr(value_label1, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(4)
);
assert_eq!(
resolver.eval_value_expr(label, &empty_tuple).unwrap(),
SqlValue::factory_integer(6)
);
} else {
unreachable!()
}
}
}
7 changes: 7 additions & 0 deletions springql-core/src/expr_resolver/expr_label.rs
Expand Up @@ -25,3 +25,10 @@ pub(crate) struct ValueExprLabel(u16);

#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct AggrExprLabel(u16);

/// Either ValueExprLabel or AggrExprLabel.
///
/// `ExprResolver::new` returns one `ExprLabel` per `select_list` field, in the
/// same order as the fields appear, so value and aggregate expressions keep
/// their relative positions in a single list.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) enum ExprLabel {
/// Label generated for a value expression (`SelectFieldSyntax::ValueExpr`).
Value(ValueExprLabel),
/// Label generated for an aggregate expression (`SelectFieldSyntax::AggrExpr`).
Aggr(AggrExprLabel),
}
2 changes: 1 addition & 1 deletion springql-core/src/pipeline/field.rs
Expand Up @@ -13,7 +13,7 @@ use crate::{mem_size::MemSize, stream_engine::SqlValue};

use self::field_name::ColumnReference;

/// Field == SqlValue + FieldName
/// Field == SqlValue + ColumnReference
#[derive(Clone, PartialEq, Debug, new)]
pub(crate) struct Field {
name: ColumnReference,
Expand Down
21 changes: 15 additions & 6 deletions springql-core/src/sql_processor/query_planner.rs
Expand Up @@ -59,7 +59,7 @@ mod select_syntax_analyzer;

use crate::{
error::Result,
expr_resolver::ExprResolver,
expr_resolver::{expr_label::ExprLabel, ExprResolver},
pipeline::{
pump_model::{
window_operation_parameter::{
Expand Down Expand Up @@ -92,11 +92,10 @@ impl QueryPlanner {
}

pub(crate) fn plan(self, pipeline: &Pipeline) -> Result<QueryPlan> {
let (mut expr_resolver, value_labels_select_list, aggr_labels_select_list) =
let (mut expr_resolver, labels_select_list) =
ExprResolver::new(self.analyzer.select_list().to_vec());
let projection = ProjectionOp {
value_expr_labels: value_labels_select_list,
aggr_expr_labels: aggr_labels_select_list,
expr_labels: labels_select_list,
};

let group_aggr_window =
Expand Down Expand Up @@ -140,11 +139,21 @@ impl QueryPlanner {
projection_op: &ProjectionOp,
) -> Result<Option<GroupAggregateParameter>> {
let opt_grouping_elem = self.analyzer.grouping_element();
let aggr_labels = &projection_op.aggr_expr_labels;
let aggr_labels = projection_op
.expr_labels
.iter()
.filter_map(|label| {
if let ExprLabel::Aggr(aggr_label) = label {
Some(*aggr_label)
} else {
None
}
})
.collect::<Vec<_>>();

match (opt_grouping_elem, aggr_labels.len()) {
(Some(grouping_elem), 1) => {
let aggr_label = aggr_labels.iter().next().expect("len checked");
let aggr_label = aggr_labels.get(0).expect("len checked");
let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func;

let group_by_label = match grouping_elem {
Expand Down
@@ -1,10 +1,9 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub(super) mod aggr_projection_subtask;
pub(super) mod collect_subtask;
pub(super) mod group_aggregate_window_subtask;
pub(super) mod join_subtask;
pub(super) mod value_projection_subtask;
pub(super) mod projection_subtask;

use std::sync::{Arc, Mutex, MutexGuard};

Expand Down Expand Up @@ -36,26 +35,23 @@ use crate::{
InQueueMetricsUpdateByTask, WindowInFlowByWindowTask,
},
row::{column::stream_column::StreamColumns, column_values::ColumnValues, Row},
task::window::aggregate::GroupAggrOut,
},
command::query_plan::query_plan_operation::JoinOp,
},
};

use self::{
aggr_projection_subtask::AggrProjectionSubtask, collect_subtask::CollectSubtask,
group_aggregate_window_subtask::GroupAggregateWindowSubtask, join_subtask::JoinSubtask,
value_projection_subtask::ValueProjectionSubtask,
collect_subtask::CollectSubtask, group_aggregate_window_subtask::GroupAggregateWindowSubtask,
join_subtask::JoinSubtask, projection_subtask::ProjectionSubtask,
};

/// Process input row 1-by-1.
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct QuerySubtask {
expr_resolver: ExprResolver,

value_projection_subtask: Option<ValueProjectionSubtask>,
projection_subtask: ProjectionSubtask,

aggr_projection_subtask: Option<AggrProjectionSubtask>,
group_aggr_window_subtask: Option<GroupAggregateWindowSubtask>,

// TODO recursive JOIN
Expand Down Expand Up @@ -126,52 +122,20 @@ impl QuerySubtask {

let (left_collect_subtask, join) = Self::subtasks_from_lower_ops(plan.lower_ops);

if plan.upper_ops.projection.aggr_expr_labels.is_empty() {
let value_projection_subtask =
ValueProjectionSubtask::new(plan.upper_ops.projection.value_expr_labels);

Self {
expr_resolver: plan.expr_resolver,
value_projection_subtask: Some(value_projection_subtask),
aggr_projection_subtask: None,
group_aggr_window_subtask: None,
left_collect_subtask,
join,
rng,
}
} else {
assert_eq!(
plan.upper_ops.projection.aggr_expr_labels.len(),
1,
"currently only 1 aggregate in select_list is supported"
);
assert_eq!(
plan.upper_ops.projection.value_expr_labels.len(),
1,
"currently only GROUP BY expression in select_list is supported"
);

let aggr_projection_subtask = AggrProjectionSubtask::new(
plan.upper_ops.projection.value_expr_labels[0],
plan.upper_ops.projection.aggr_expr_labels[0],
);

let op = plan
.upper_ops
.group_aggr_window
.expect("select_list includes aggregate ");
let group_aggr_window_subtask =
GroupAggregateWindowSubtask::new(op.window_param, op.op_param);

Self {
expr_resolver: plan.expr_resolver,
value_projection_subtask: None,
aggr_projection_subtask: Some(aggr_projection_subtask),
group_aggr_window_subtask: Some(group_aggr_window_subtask),
left_collect_subtask,
join,
rng,
}
let group_aggr_window_subtask = plan
.upper_ops
.group_aggr_window
.map(|op| GroupAggregateWindowSubtask::new(op.window_param, op.op_param));

let projection_subtask = ProjectionSubtask::new(plan.upper_ops.projection.expr_labels);

Self {
expr_resolver: plan.expr_resolver,
projection_subtask,
group_aggr_window_subtask,
left_collect_subtask,
join,
rng,
}
}
/// (left collect subtask, Option<(join subtask, right collect subtask)>)
Expand Down Expand Up @@ -248,40 +212,26 @@ impl QuerySubtask {
tuple: Tuple,
) -> Result<(Vec<SqlValues>, WindowInFlowByWindowTask)> {
if let Some(group_aggr_window_subtask) = &self.group_aggr_window_subtask {
let (group_aggr_out_seq, window_in_flow) =
let (aggregated_and_grouping_values_seq, window_in_flow) =
group_aggr_window_subtask.run(&self.expr_resolver, tuple);

let values = self.run_aggr_projection_op(group_aggr_out_seq)?;
let values_seq = aggregated_and_grouping_values_seq
.into_iter()
.map(|aggregated_and_grouping_values| {
self.projection_subtask
.run_with_aggr(aggregated_and_grouping_values)
})
.collect::<Result<Vec<_>>>()?;

Ok((values, window_in_flow))
Ok((values_seq, window_in_flow))
} else {
let values = self.run_projection_op(&tuple)?;
let values = self
.projection_subtask
.run_without_aggr(&self.expr_resolver, &tuple)?;
Ok((vec![values], WindowInFlowByWindowTask::zero()))
}
}

fn run_projection_op(&self, tuple: &Tuple) -> Result<SqlValues> {
self.value_projection_subtask
.as_ref()
.unwrap()
.run(&self.expr_resolver, tuple)
}

fn run_aggr_projection_op(
&self,
group_aggr_out_seq: Vec<GroupAggrOut>,
) -> Result<Vec<SqlValues>> {
group_aggr_out_seq
.into_iter()
.map(|group_agg_out| {
self.aggr_projection_subtask
.as_ref()
.unwrap()
.run(group_agg_out)
})
.collect()
}

/// # Returns
///
/// None when input queue does not exist or is empty or JOIN op does not emit output yet.
Expand Down