Skip to content

Commit

Permalink
fix: SELECT with aggragate preserves select_list order
Browse files Browse the repository at this point in the history
  • Loading branch information
laysakura committed May 11, 2022
1 parent 8d6f620 commit 71dddbe
Show file tree
Hide file tree
Showing 13 changed files with 602 additions and 533 deletions.
4 changes: 1 addition & 3 deletions springql-core/src/expr_resolver.rs
Expand Up @@ -158,7 +158,7 @@ impl ExprResolver {
}

/// label -> (internal) value expression inside aggr expr + tuple (for ColumnReference) -> SqlValue.
///
///
/// _inner_ means: AGGR_FUNC(inner_value_expr)
///
/// # Panics
Expand All @@ -184,8 +184,6 @@ impl ExprResolver {

#[cfg(test)]
mod tests {
use std::thread::panicking;

use crate::{expression::ValueExpr, stream_engine::time::timestamp::SpringTimestamp};

use super::*;
Expand Down
7 changes: 7 additions & 0 deletions springql-core/src/expr_resolver/expr_label.rs
Expand Up @@ -25,3 +25,10 @@ pub(crate) struct ValueExprLabel(u16);

#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct AggrExprLabel(u16);

/// Either ValueExprLabel or AggrExprLabel.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) enum ExprLabel {
Value(ValueExprLabel),
Aggr(AggrExprLabel),
}
21 changes: 15 additions & 6 deletions springql-core/src/sql_processor/query_planner.rs
Expand Up @@ -59,7 +59,7 @@ mod select_syntax_analyzer;

use crate::{
error::Result,
expr_resolver::ExprResolver,
expr_resolver::{expr_label::ExprLabel, ExprResolver},
pipeline::{
pump_model::{
window_operation_parameter::{
Expand Down Expand Up @@ -92,11 +92,10 @@ impl QueryPlanner {
}

pub(crate) fn plan(self, pipeline: &Pipeline) -> Result<QueryPlan> {
let (mut expr_resolver, value_labels_select_list, aggr_labels_select_list) =
let (mut expr_resolver, labels_select_list) =
ExprResolver::new(self.analyzer.select_list().to_vec());
let projection = ProjectionOp {
value_expr_labels: value_labels_select_list,
aggr_expr_labels: aggr_labels_select_list,
expr_labels: labels_select_list,
};

let group_aggr_window =
Expand Down Expand Up @@ -140,11 +139,21 @@ impl QueryPlanner {
projection_op: &ProjectionOp,
) -> Result<Option<GroupAggregateParameter>> {
let opt_grouping_elem = self.analyzer.grouping_element();
let aggr_labels = &projection_op.aggr_expr_labels;
let aggr_labels = projection_op
.expr_labels
.iter()
.filter_map(|label| {
if let ExprLabel::Aggr(aggr_label) = label {
Some(*aggr_label)
} else {
None
}
})
.collect::<Vec<_>>();

match (opt_grouping_elem, aggr_labels.len()) {
(Some(grouping_elem), 1) => {
let aggr_label = aggr_labels.iter().next().expect("len checked");
let aggr_label = aggr_labels.get(0).expect("len checked");
let aggr_func = expr_resolver.resolve_aggr_expr(*aggr_label).func;

let group_by_label = match grouping_elem {
Expand Down
@@ -1,10 +1,9 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

pub(super) mod aggr_projection_subtask;
pub(super) mod collect_subtask;
pub(super) mod group_aggregate_window_subtask;
pub(super) mod join_subtask;
pub(super) mod value_projection_subtask;
pub(super) mod projection_subtask;

use std::sync::{Arc, Mutex, MutexGuard};

Expand Down Expand Up @@ -36,26 +35,23 @@ use crate::{
InQueueMetricsUpdateByTask, WindowInFlowByWindowTask,
},
row::{column::stream_column::StreamColumns, column_values::ColumnValues, Row},
task::window::aggregate::GroupAggrOut,
},
command::query_plan::query_plan_operation::JoinOp,
},
};

use self::{
aggr_projection_subtask::AggrProjectionSubtask, collect_subtask::CollectSubtask,
group_aggregate_window_subtask::GroupAggregateWindowSubtask, join_subtask::JoinSubtask,
value_projection_subtask::ValueProjectionSubtask,
collect_subtask::CollectSubtask, group_aggregate_window_subtask::GroupAggregateWindowSubtask,
join_subtask::JoinSubtask, projection_subtask::ProjectionSubtask,
};

/// Process input row 1-by-1.
#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct QuerySubtask {
expr_resolver: ExprResolver,

value_projection_subtask: Option<ValueProjectionSubtask>,
projection_subtask: ProjectionSubtask,

aggr_projection_subtask: Option<AggrProjectionSubtask>,
group_aggr_window_subtask: Option<GroupAggregateWindowSubtask>,

// TODO recursive JOIN
Expand Down Expand Up @@ -126,52 +122,20 @@ impl QuerySubtask {

let (left_collect_subtask, join) = Self::subtasks_from_lower_ops(plan.lower_ops);

if plan.upper_ops.projection.aggr_expr_labels.is_empty() {
let value_projection_subtask =
ValueProjectionSubtask::new(plan.upper_ops.projection.value_expr_labels);

Self {
expr_resolver: plan.expr_resolver,
value_projection_subtask: Some(value_projection_subtask),
aggr_projection_subtask: None,
group_aggr_window_subtask: None,
left_collect_subtask,
join,
rng,
}
} else {
assert_eq!(
plan.upper_ops.projection.aggr_expr_labels.len(),
1,
"currently only 1 aggregate in select_list is supported"
);
assert_eq!(
plan.upper_ops.projection.value_expr_labels.len(),
1,
"currently only GROUP BY expression in select_list is supported"
);

let aggr_projection_subtask = AggrProjectionSubtask::new(
plan.upper_ops.projection.value_expr_labels[0],
plan.upper_ops.projection.aggr_expr_labels[0],
);

let op = plan
.upper_ops
.group_aggr_window
.expect("select_list includes aggregate ");
let group_aggr_window_subtask =
GroupAggregateWindowSubtask::new(op.window_param, op.op_param);

Self {
expr_resolver: plan.expr_resolver,
value_projection_subtask: None,
aggr_projection_subtask: Some(aggr_projection_subtask),
group_aggr_window_subtask: Some(group_aggr_window_subtask),
left_collect_subtask,
join,
rng,
}
let group_aggr_window_subtask = plan
.upper_ops
.group_aggr_window
.map(|op| GroupAggregateWindowSubtask::new(op.window_param, op.op_param));

let projection_subtask = ProjectionSubtask::new(plan.upper_ops.projection.expr_labels);

Self {
expr_resolver: plan.expr_resolver,
projection_subtask,
group_aggr_window_subtask,
left_collect_subtask,
join,
rng,
}
}
/// (left collect subtask, Option<(join subtask, right collect subtask)>)
Expand Down Expand Up @@ -248,40 +212,26 @@ impl QuerySubtask {
tuple: Tuple,
) -> Result<(Vec<SqlValues>, WindowInFlowByWindowTask)> {
if let Some(group_aggr_window_subtask) = &self.group_aggr_window_subtask {
let (group_aggr_out_seq, window_in_flow) =
let (aggregated_and_grouping_values_seq, window_in_flow) =
group_aggr_window_subtask.run(&self.expr_resolver, tuple);

let values = self.run_aggr_projection_op(group_aggr_out_seq)?;
let values_seq = aggregated_and_grouping_values_seq
.into_iter()
.map(|aggregated_and_grouping_values| {
self.projection_subtask
.run_with_aggr(aggregated_and_grouping_values)
})
.collect::<Result<Vec<_>>>()?;

Ok((values, window_in_flow))
Ok((values_seq, window_in_flow))
} else {
let values = self.run_projection_op(&tuple)?;
let values = self
.projection_subtask
.run_without_aggr(&self.expr_resolver, &tuple)?;
Ok((vec![values], WindowInFlowByWindowTask::zero()))
}
}

fn run_projection_op(&self, tuple: &Tuple) -> Result<SqlValues> {
self.value_projection_subtask
.as_ref()
.unwrap()
.run(&self.expr_resolver, tuple)
}

fn run_aggr_projection_op(
&self,
group_aggr_out_seq: Vec<GroupAggrOut>,
) -> Result<Vec<SqlValues>> {
group_aggr_out_seq
.into_iter()
.map(|group_agg_out| {
self.aggr_projection_subtask
.as_ref()
.unwrap()
.run(group_agg_out)
})
.collect()
}

/// # Returns
///
/// None when input queue does not exist or is empty or JOIN op does not emit output yet.
Expand Down

This file was deleted.

Expand Up @@ -8,7 +8,7 @@ use crate::pipeline::pump_model::window_parameter::WindowParameter;
use crate::stream_engine::autonomous_executor::performance_metrics::metrics_update_command::metrics_update_by_task_execution::WindowInFlowByWindowTask;
use crate::stream_engine::autonomous_executor::task::tuple::Tuple;
use crate::stream_engine::autonomous_executor::task::window::Window;
use crate::stream_engine::autonomous_executor::task::window::aggregate::{GroupAggrOut, AggrWindow};
use crate::stream_engine::autonomous_executor::task::window::aggregate::{AggregatedAndGroupingValues, AggrWindow};

#[derive(Debug)]
pub(in crate::stream_engine::autonomous_executor) struct GroupAggregateWindowSubtask(
Expand All @@ -28,7 +28,7 @@ impl GroupAggregateWindowSubtask {
&self,
expr_resolver: &ExprResolver,
tuple: Tuple,
) -> (Vec<GroupAggrOut>, WindowInFlowByWindowTask) {
) -> (Vec<AggregatedAndGroupingValues>, WindowInFlowByWindowTask) {
self.0
.lock()
.expect("another thread accessing to window gets poisoned")
Expand Down
@@ -0,0 +1,62 @@
// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.

use crate::error::Result;
use crate::expr_resolver::expr_label::ExprLabel;
use crate::expr_resolver::ExprResolver;
use crate::stream_engine::autonomous_executor::task::tuple::Tuple;
use crate::stream_engine::autonomous_executor::task::window::aggregate::AggregatedAndGroupingValues;

use super::SqlValues;

#[derive(Debug, new)]
pub(in crate::stream_engine::autonomous_executor) struct ProjectionSubtask {
exprs: Vec<ExprLabel>,
}

impl ProjectionSubtask {
/// Projection for SELECT without aggregate.
pub(in crate::stream_engine::autonomous_executor) fn run_without_aggr(
&self,
expr_resolver: &ExprResolver,
tuple: &Tuple,
) -> Result<SqlValues> {
let values = self
.exprs
.iter()
.map(|label| match label {
ExprLabel::Value(group_by_value_label) => {
expr_resolver.eval_value_expr(*group_by_value_label, tuple)
}
ExprLabel::Aggr(_) => unreachable!("aggregate must not be in select_list"),
})
.collect::<Result<Vec<_>>>()?;

Ok(SqlValues::new(values))
}

/// Projection for SELECT with aggregate.
/// select_list must only have GROUP BY elements or aggregate expressions.
/// (Column reference without aggregate is not allowed.)
pub(in crate::stream_engine::autonomous_executor) fn run_with_aggr(
&self,
aggregated_and_grouping_values: AggregatedAndGroupingValues,
) -> Result<SqlValues> {
let values = self
.exprs
.iter()
.map(|label| {
match label {
ExprLabel::Value(group_by_value_label) => {
aggregated_and_grouping_values.get_group_by_value(group_by_value_label)
}
ExprLabel::Aggr(aggr_label) => {
aggregated_and_grouping_values.get_aggregated_value(aggr_label)
}
}
.cloned()
})
.collect::<Result<Vec<_>>>()?;

Ok(SqlValues::new(values))
}
}

This file was deleted.

0 comments on commit 71dddbe

Please sign in to comment.