Skip to content

Commit

Permalink
implement aggregation
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <stdrc@outlook.com>
  • Loading branch information
stdrc committed Apr 19, 2023
1 parent 161fc05 commit 9818593
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 26 deletions.
18 changes: 8 additions & 10 deletions src/stream/src/executor/over_window/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::marker::PhantomData;

use futures::StreamExt;
use futures_async_stream::{for_await, try_stream};
use itertools::Itertools;
use risingwave_common::array::column::Column;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{Op, StreamChunk};
Expand Down Expand Up @@ -246,7 +245,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
.input_pk_indices
.iter()
.map(|idx| this.col_mapping.upstream_to_state_table(*idx).unwrap())
.collect_vec(),
.collect::<Vec<_>>(),
),
&vec![OrderType::ascending(); this.input_pk_indices.len()],
)?
Expand All @@ -265,7 +264,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
.val_indices()
.iter()
.map(|idx| this.col_mapping.upstream_to_state_table(*idx).unwrap())
.collect_vec(),
.collect::<Vec<_>>(),
)
.into_owned_row()
.into_inner()
Expand All @@ -279,9 +278,9 @@ impl<S: StateStore> OverWindowExecutor<S> {

// Ignore ready windows (all ready windows were outputted before).
while partition.is_ready() {
partition.states.iter_mut().for_each(|state| {
state.output();
});
for state in &mut partition.states {
state.output()?;
}
}

cache.put(encoded_partition_key.clone(), partition);
Expand Down Expand Up @@ -364,6 +363,8 @@ impl<S: StateStore> OverWindowExecutor<S> {
.states
.iter_mut()
.map(|state| state.output())
.try_collect::<Vec<_>>()?
.into_iter()
.map(|o| (o.return_value, o.evict_hint))
.unzip();

Expand Down Expand Up @@ -402,10 +403,7 @@ impl<S: StateStore> OverWindowExecutor<S> {
}
}

let columns: Vec<Column> = builders
.into_iter()
.map(|b| b.finish().into())
.collect_vec();
let columns: Vec<Column> = builders.into_iter().map(|b| b.finish().into()).collect();
let chunk_size = columns[0].len();
Ok(if chunk_size > 0 {
Some(StreamChunk::new(
Expand Down
71 changes: 63 additions & 8 deletions src/stream/src/executor/over_window/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

use std::collections::BTreeSet;

use futures::FutureExt;
use risingwave_common::array::{DataChunk, Vis};
use risingwave_common::must_match;
use risingwave_common::types::Datum;
use risingwave_expr::function::aggregate::AggCall;
use risingwave_common::types::{DataType, Datum};
use risingwave_expr::function::aggregate::{AggArgs, AggCall};
use risingwave_expr::function::window::{WindowFuncCall, WindowFuncKind};
use risingwave_expr::vector_op::agg::AggStateFactory;
use smallvec::SmallVec;
Expand All @@ -27,15 +29,22 @@ use crate::executor::StreamExecutorResult;

pub(super) struct AggregateState {
factory: AggStateFactory,
arg_data_types: Vec<DataType>,
buffer: WindowBuffer<StateKey, SmallVec<[Datum; 2]>>,
}

impl AggregateState {
pub fn new(call: &WindowFuncCall) -> StreamExecutorResult<Self> {
let agg_kind = must_match!(call.kind, WindowFuncKind::Aggregate(agg_kind) => agg_kind);
let arg_data_types = call.args.arg_types().to_vec();
let agg_call = AggCall {
kind: agg_kind,
args: call.args.clone(),
args: match &call.args {
// convert args to [0] or [0, 1]
AggArgs::None => AggArgs::None,
AggArgs::Unary(data_type, _) => AggArgs::Unary(data_type.to_owned(), 0),
AggArgs::Binary(data_types, _) => AggArgs::Binary(data_types.to_owned(), [0, 1]),
},
return_type: call.return_type.clone(),
column_orders: Vec::new(), // the input is already sorted
// TODO(rc): support filter on window function call
Expand All @@ -45,6 +54,7 @@ impl AggregateState {
};
Ok(Self {
factory: AggStateFactory::new(agg_call)?,
arg_data_types,
buffer: WindowBuffer::new(call.frame.clone()),
})
}
Expand All @@ -63,12 +73,17 @@ impl WindowState for AggregateState {
}
}

fn output(&mut self) -> StateOutput {
debug_assert!(self.curr_window().is_ready);
let aggregator = self.factory.create_agg_state();
let return_value = None; // TODO(): do aggregation
fn output(&mut self) -> StreamExecutorResult<StateOutput> {
assert!(self.buffer.curr_window().is_ready());
let wrapper = BatchAggregatorWrapper {
factory: &self.factory,
arg_data_types: &self.arg_data_types,
};
let return_value = wrapper.aggregate(self.buffer.curr_window_values().map(
|val: &SmallVec<[Option<risingwave_common::types::ScalarImpl>; 2]>| val.as_slice(),
))?;
let removed_keys: BTreeSet<_> = self.buffer.slide().collect();
StateOutput {
Ok(StateOutput {
return_value,
evict_hint: if removed_keys.is_empty() {
StateEvictHint::CannotEvict(
Expand All @@ -81,6 +96,46 @@ impl WindowState for AggregateState {
} else {
StateEvictHint::CanEvict(removed_keys)
},
})
}
}

struct BatchAggregatorWrapper<'a> {
factory: &'a AggStateFactory,
arg_data_types: &'a [DataType],
}

impl BatchAggregatorWrapper<'_> {
fn aggregate<'a>(
&'a self,
values: impl ExactSizeIterator<Item = &'a [Datum]>,
) -> StreamExecutorResult<Datum> {
let n_values = values.len();

let mut args_builders = self
.arg_data_types
.iter()
.map(|data_type| data_type.create_array_builder(n_values))
.collect::<Vec<_>>();
for value in values {
for (builder, datum) in args_builders.iter_mut().zip(value.iter()) {
builder.append_datum(datum);
}
}
let columns = args_builders
.into_iter()
.map(|builder| builder.finish().into())
.collect::<Vec<_>>();
let chunk = DataChunk::new(columns, Vis::Compact(n_values));

let mut aggregator = self.factory.create_agg_state();
aggregator
.update_multi(&chunk, 0, n_values)
.now_or_never()
.expect("we don't support UDAF currently, so the function should return immediately")?;

let mut ret_value_builder = aggregator.return_type().create_array_builder(1);
aggregator.output(&mut ret_value_builder)?;
Ok(ret_value_builder.finish().to_datum())
}
}
2 changes: 1 addition & 1 deletion src/stream/src/executor/over_window/state/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<K: Ord, V> WindowBuffer<K, V> {

/// Iterate over the values in the current window.
/// Panics if the current window is not ready.
pub fn curr_window_values(&self) -> impl Iterator<Item = &V> {
pub fn curr_window_values(&self) -> impl ExactSizeIterator<Item = &V> {
assert!(self.curr_window().is_ready());
self.buffer
.range(LEFT_IDX..=self.right_idx)
Expand Down
7 changes: 4 additions & 3 deletions src/stream/src/executor/over_window/state/lag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use smallvec::SmallVec;

use super::{StateKey, StateOutput, StatePos, WindowState};
use crate::executor::over_window::state::StateEvictHint;
use crate::executor::StreamExecutorResult;

struct BufferEntry(StateKey, Datum);

Expand Down Expand Up @@ -62,9 +63,9 @@ impl WindowState for LagState {
}
}

fn output(&mut self) -> StateOutput {
fn output(&mut self) -> StreamExecutorResult<StateOutput> {
debug_assert!(self.curr_window().is_ready);
if self.curr_idx < self.offset {
Ok(if self.curr_idx < self.offset {
// the ready window doesn't have enough preceding rows, just return NULL
self.curr_idx += 1;
StateOutput {
Expand All @@ -79,6 +80,6 @@ impl WindowState for LagState {
return_value: value,
evict_hint: StateEvictHint::CanEvict(std::iter::once(key).collect()),
}
}
})
}
}
7 changes: 4 additions & 3 deletions src/stream/src/executor/over_window/state/lead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use smallvec::SmallVec;

use super::{StateKey, StateOutput, StatePos, WindowState};
use crate::executor::over_window::state::StateEvictHint;
use crate::executor::StreamExecutorResult;

struct BufferEntry(StateKey, Datum);

Expand Down Expand Up @@ -53,13 +54,13 @@ impl WindowState for LeadState {
}
}

fn output(&mut self) -> StateOutput {
fn output(&mut self) -> StreamExecutorResult<StateOutput> {
debug_assert!(self.curr_window().is_ready);
let lead_value = self.buffer[self.offset].1.clone();
let BufferEntry(key, _) = self.buffer.pop_front().unwrap();
StateOutput {
Ok(StateOutput {
return_value: lead_value,
evict_hint: StateEvictHint::CanEvict(std::iter::once(key).collect()),
}
})
}
}
2 changes: 1 addition & 1 deletion src/stream/src/executor/over_window/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub(super) trait WindowState {
fn curr_window(&self) -> StatePos<'_>;

/// Return the output for the current ready window frame and push the window forward.
fn output(&mut self) -> StateOutput;
fn output(&mut self) -> StreamExecutorResult<StateOutput>;

// fn curr_output(&self) -> Datum {
// todo!()
Expand Down

0 comments on commit 9818593

Please sign in to comment.