Skip to content

Commit

Permalink
Merge pull request #128 from SpringQL/fix/select_list-order-with-aggr
Browse files Browse the repository at this point in the history
fix: SELECT with aggregate preserves select_list order
  • Loading branch information
laysakura committed May 11, 2022
2 parents 74ad08b + f124810 commit cccad2d
Show file tree
Hide file tree
Showing 16 changed files with 763 additions and 586 deletions.
110 changes: 56 additions & 54 deletions springql-core/src/expr_resolver.rs
Expand Up @@ -10,7 +10,7 @@ use crate::stream_engine::{SqlValue, Tuple};
use anyhow::anyhow;
use std::collections::HashMap;

use self::expr_label::{AggrExprLabel, ExprLabelGenerator, ValueExprLabel};
use self::expr_label::{AggrExprLabel, ExprLabel, ExprLabelGenerator, ValueExprLabel};

/// ExprResolver is to:
///
Expand All @@ -32,39 +32,35 @@ pub(crate) struct ExprResolver {
impl ExprResolver {
/// # Returns
///
/// `(instance, value expr labels in select_list, aggr expr labels in select_list)`
pub(crate) fn new(
select_list: Vec<SelectFieldSyntax>,
) -> (Self, Vec<ValueExprLabel>, Vec<AggrExprLabel>) {
/// `(instance, value/aggr expr labels in select_list)`
pub(crate) fn new(select_list: Vec<SelectFieldSyntax>) -> (Self, Vec<ExprLabel>) {
let mut label_gen = ExprLabelGenerator::default();
let mut value_expressions = HashMap::new();
let mut value_aliased_labels = HashMap::new();
let mut aggr_expressions = HashMap::new();
let mut aggr_aliased_labels = HashMap::new();

let mut value_expr_labels = Vec::new();
let mut aggr_expr_labels = Vec::new();

select_list
let expr_labels = select_list
.into_iter()
.for_each(|select_field| match select_field {
.map(|select_field| match select_field {
SelectFieldSyntax::ValueExpr { value_expr, alias } => {
let label = label_gen.next_value();
value_expressions.insert(label, value_expr);
if let Some(alias) = alias {
value_aliased_labels.insert(alias, label);
}
value_expr_labels.push(label);
ExprLabel::Value(label)
}
SelectFieldSyntax::AggrExpr { aggr_expr, alias } => {
let label = label_gen.next_aggr();
aggr_expressions.insert(label, aggr_expr);
if let Some(alias) = alias {
aggr_aliased_labels.insert(alias, label);
}
aggr_expr_labels.push(label);
ExprLabel::Aggr(label)
}
});
})
.collect();

(
Self {
Expand All @@ -75,8 +71,7 @@ impl ExprResolver {
aggr_aliased_labels,
aggr_expression_results: HashMap::new(),
},
value_expr_labels,
aggr_expr_labels,
expr_labels,
)
}

Expand Down Expand Up @@ -164,6 +159,8 @@ impl ExprResolver {

/// label -> (internal) value expression inside aggr expr + tuple (for ColumnReference) -> SqlValue.
///
/// _inner_ means: AGGR_FUNC(inner_value_expr)
///
/// # Panics
///
/// - `label` is not found
Expand Down Expand Up @@ -210,48 +207,53 @@ mod tests {
},
];

let (mut resolver, value_labels_select_list, _aggr_labels_select_list) =
ExprResolver::new(select_list);
let (mut resolver, labels_select_list) = ExprResolver::new(select_list);

assert_eq!(
resolver
.resolve_value_alias(ValueAlias::new("a1".to_string()))
.unwrap(),
value_labels_select_list[1]
);
assert!(resolver
.resolve_value_alias(ValueAlias::new("a404".to_string()))
.is_err(),);
if let &[ExprLabel::Value(value_label0), ExprLabel::Value(value_label1)] =
&labels_select_list[..]
{
assert_eq!(
resolver
.resolve_value_alias(ValueAlias::new("a1".to_string()))
.unwrap(),
value_label1
);
assert!(resolver
.resolve_value_alias(ValueAlias::new("a404".to_string()))
.is_err(),);

let label = resolver.register_value_expr(ValueExpr::factory_add(
ValueExpr::factory_integer(3),
ValueExpr::factory_integer(3),
));
let label = resolver.register_value_expr(ValueExpr::factory_add(
ValueExpr::factory_integer(3),
ValueExpr::factory_integer(3),
));

let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]);
let empty_tuple = Tuple::new(SpringTimestamp::fx_ts1(), vec![]);

assert_eq!(
resolver
.eval_value_expr(value_labels_select_list[0], &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2)
);
assert_eq!(
resolver
.eval_value_expr(value_labels_select_list[0], &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2),
"eval twice"
);
assert_eq!(
resolver
.eval_value_expr(value_labels_select_list[1], &empty_tuple)
.unwrap(),
SqlValue::factory_integer(4)
);
assert_eq!(
resolver.eval_value_expr(label, &empty_tuple).unwrap(),
SqlValue::factory_integer(6)
);
assert_eq!(
resolver
.eval_value_expr(value_label0, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2)
);
assert_eq!(
resolver
.eval_value_expr(value_label0, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(2),
"eval twice"
);
assert_eq!(
resolver
.eval_value_expr(value_label1, &empty_tuple)
.unwrap(),
SqlValue::factory_integer(4)
);
assert_eq!(
resolver.eval_value_expr(label, &empty_tuple).unwrap(),
SqlValue::factory_integer(6)
);
} else {
unreachable!()
}
}
}
7 changes: 7 additions & 0 deletions springql-core/src/expr_resolver/expr_label.rs
Expand Up @@ -25,3 +25,10 @@ pub(crate) struct ValueExprLabel(u16);

#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct AggrExprLabel(u16);

/// Label of an expression in a select_list: either a [`ValueExprLabel`] or an [`AggrExprLabel`].
///
/// Wrapping both label kinds in one type lets a single `Vec<ExprLabel>` carry value and
/// aggregate expressions together, in the order they are written in the select_list.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) enum ExprLabel {
/// Label of a value expression.
Value(ValueExprLabel),
/// Label of an aggregate expression.
Aggr(AggrExprLabel),
}
2 changes: 1 addition & 1 deletion springql-core/src/pipeline/field.rs
Expand Up @@ -13,7 +13,7 @@ use crate::{mem_size::MemSize, stream_engine::SqlValue};

use self::field_name::ColumnReference;

/// Field == SqlValue + FieldName
/// Field == SqlValue + ColumnReference
#[derive(Clone, PartialEq, Debug, new)]
pub(crate) struct Field {
name: ColumnReference,
Expand Down
21 changes: 15 additions & 6 deletions springql-core/src/sql_processor/query_planner.rs
Expand Up @@ -59,7 +59,7 @@ mod select_syntax_analyzer;

use crate::{
error::Result,
expr_resolver::ExprResolver,
expr_resolver::{expr_label::ExprLabel, ExprResolver},
pipeline::{
pump_model::{
window_operation_parameter::{
Expand Down Expand Up @@ -92,11 +92,10 @@ impl QueryPlanner {
}

pub(crate) fn plan(self, pipeline: &Pipeline) -> Result<QueryPlan> {
let (mut expr_resolver, value_labels_select_list, aggr_labels_select_list) =
let (mut expr_resolver, labels_select_list) =
ExprResolver::new(self.analyzer.select_list().to_vec());
let projection = ProjectionOp {
value_expr_labels: value_labels_select_list,
aggr_expr_labels: aggr_labels_select_list,
expr_labels: labels_select_list,
};

let group_aggr_window =
Expand Down Expand Up @@ -140,11 +139,21 @@ impl QueryPlanner {
projection_op: &ProjectionOp,
) -> Result<Option<GroupAggregateParameter>> {
let opt_grouping_elem = self.analyzer.grouping_element();
let aggr_labels = &projection_op.aggr_expr_labels;
let aggr_labels = projection_op
.expr_labels
.iter()
.filter_map(|label| {
if let ExprLabel::Aggr(aggr_label) = label {
Some(*aggr_label)
} else {
None
}
})
.collect::<Vec<_>>();

match (opt_grouping_elem, aggr_labels.len()) {
(Some(grouping_elem), 1) => {
let aggr_label = aggr_labels.iter().next().expect("len checked");
let aggr_label = aggr_labels.get(0).expect("len checked");
let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func;

let group_by_label = match grouping_elem {
Expand Down
@@ -1,10 +1,9 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub(super) mod aggr_projection_subtask;
pub(super) mod collect_subtask;
pub(super) mod group_aggregate_window_subtask;
pub(super) mod join_subtask;
pub(super) mod value_projection_subtask;
pub(super) mod projection_subtask;

use std::sync::{Arc, Mutex, MutexGuard};

Expand Down Expand Up @@ -36,26 +35,23 @@ use crate::{
InQueueMetricsUpdateByTask, WindowInFlowByWindowTask,
},
row::{column::stream_column::StreamColumns, column_values::ColumnValues, Row},
task::window::aggregate::GroupAggrOut,
},
command::query_plan::query_plan_operation::JoinOp,
},
};

use self::{
aggr_projection_subtask::AggrProjectionSubtask, collect_subtask::CollectSubtask,
group_aggregate_window_subtask::GroupAggregateWindowSubtask, join_subtask::JoinSubtask,
value_projection_subtask::ValueProjectionSubtask,
collect_subtask::CollectSubtask, group_aggregate_window_subtask::GroupAggregateWindowSubtask,
join_subtask::JoinSubtask, projection_subtask::ProjectionSubtask,
};

/// Process input row 1-by-1.
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct QuerySubtask {
expr_resolver: ExprResolver,

value_projection_subtask: Option<ValueProjectionSubtask>,
projection_subtask: ProjectionSubtask,

aggr_projection_subtask: Option<AggrProjectionSubtask>,
group_aggr_window_subtask: Option<GroupAggregateWindowSubtask>,

// TODO recursive JOIN
Expand Down Expand Up @@ -126,52 +122,20 @@ impl QuerySubtask {

let (left_collect_subtask, join) = Self::subtasks_from_lower_ops(plan.lower_ops);

if plan.upper_ops.projection.aggr_expr_labels.is_empty() {
let value_projection_subtask =
ValueProjectionSubtask::new(plan.upper_ops.projection.value_expr_labels);

Self {
expr_resolver: plan.expr_resolver,
value_projection_subtask: Some(value_projection_subtask),
aggr_projection_subtask: None,
group_aggr_window_subtask: None,
left_collect_subtask,
join,
rng,
}
} else {
assert_eq!(
plan.upper_ops.projection.aggr_expr_labels.len(),
1,
"currently only 1 aggregate in select_list is supported"
);
assert_eq!(
plan.upper_ops.projection.value_expr_labels.len(),
1,
"currently only GROUP BY expression in select_list is supported"
);

let aggr_projection_subtask = AggrProjectionSubtask::new(
plan.upper_ops.projection.value_expr_labels[0],
plan.upper_ops.projection.aggr_expr_labels[0],
);

let op = plan
.upper_ops
.group_aggr_window
.expect("select_list includes aggregate ");
let group_aggr_window_subtask =
GroupAggregateWindowSubtask::new(op.window_param, op.op_param);

Self {
expr_resolver: plan.expr_resolver,
value_projection_subtask: None,
aggr_projection_subtask: Some(aggr_projection_subtask),
group_aggr_window_subtask: Some(group_aggr_window_subtask),
left_collect_subtask,
join,
rng,
}
let group_aggr_window_subtask = plan
.upper_ops
.group_aggr_window
.map(|op| GroupAggregateWindowSubtask::new(op.window_param, op.op_param));

let projection_subtask = ProjectionSubtask::new(plan.upper_ops.projection.expr_labels);

Self {
expr_resolver: plan.expr_resolver,
projection_subtask,
group_aggr_window_subtask,
left_collect_subtask,
join,
rng,
}
}
/// (left collect subtask, Option<(join subtask, right collect subtask)>)
Expand Down Expand Up @@ -248,40 +212,26 @@ impl QuerySubtask {
tuple: Tuple,
) -> Result<(Vec<SqlValues>, WindowInFlowByWindowTask)> {
if let Some(group_aggr_window_subtask) = &self.group_aggr_window_subtask {
let (group_aggr_out_seq, window_in_flow) =
let (aggregated_and_grouping_values_seq, window_in_flow) =
group_aggr_window_subtask.run(&self.expr_resolver, tuple);

let values = self.run_aggr_projection_op(group_aggr_out_seq)?;
let values_seq = aggregated_and_grouping_values_seq
.into_iter()
.map(|aggregated_and_grouping_values| {
self.projection_subtask
.run_with_aggr(aggregated_and_grouping_values)
})
.collect::<Result<Vec<_>>>()?;

Ok((values, window_in_flow))
Ok((values_seq, window_in_flow))
} else {
let values = self.run_projection_op(&tuple)?;
let values = self
.projection_subtask
.run_without_aggr(&self.expr_resolver, &tuple)?;
Ok((vec![values], WindowInFlowByWindowTask::zero()))
}
}

fn run_projection_op(&self, tuple: &Tuple) -> Result<SqlValues> {
self.value_projection_subtask
.as_ref()
.unwrap()
.run(&self.expr_resolver, tuple)
}

fn run_aggr_projection_op(
&self,
group_aggr_out_seq: Vec<GroupAggrOut>,
) -> Result<Vec<SqlValues>> {
group_aggr_out_seq
.into_iter()
.map(|group_agg_out| {
self.aggr_projection_subtask
.as_ref()
.unwrap()
.run(group_agg_out)
})
.collect()
}

/// # Returns
///
/// None when input queue does not exist or is empty or JOIN op does not emit output yet.
Expand Down

0 comments on commit cccad2d

Please sign in to comment.