Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Builtin Window Function Implementation #4441

Merged
merged 1 commit into from Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 6 additions & 18 deletions datafusion/physical-expr/src/window/aggregate.rs
Expand Up @@ -87,15 +87,9 @@ impl WindowExpr for AggregateWindowExpr {
let partition_columns = self.partition_columns(batch)?;
let partition_points =
self.evaluate_partition_points(batch.num_rows(), &partition_columns)?;
let values = self.evaluate_args(batch)?;

let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
let columns = self.sort_columns(batch)?;
let order_columns: Vec<&ArrayRef> = columns.iter().map(|s| &s.values).collect();
// Sort values, this will make the same partitions consecutive. Also, within the partition
// range, values will be sorted.
let order_bys = &order_columns[self.partition_by.len()..];
let (_, order_bys) = self.get_values_orderbys(batch)?;
let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() {
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
Expand All @@ -107,14 +101,8 @@ impl WindowExpr for AggregateWindowExpr {
for partition_range in &partition_points {
let mut accumulator = self.aggregate.create_accumulator()?;
let length = partition_range.end - partition_range.start;
let slice_order_bys = order_bys
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let value_slice = values
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let (values, order_bys) =
self.get_values_orderbys(&batch.slice(partition_range.start, length))?;

let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
let mut last_range: (usize, usize) = (0, 0);
Expand All @@ -123,7 +111,7 @@ impl WindowExpr for AggregateWindowExpr {
// First, cur_range is calculated, then it is compared with last_range.
for i in 0..length {
let cur_range = window_frame_ctx.calculate_range(
&slice_order_bys,
&order_bys,
&sort_options,
length,
i,
Expand All @@ -135,7 +123,7 @@ impl WindowExpr for AggregateWindowExpr {
// Accumulate any new rows that have entered the window:
let update_bound = cur_range.1 - last_range.1;
if update_bound > 0 {
let update: Vec<ArrayRef> = value_slice
let update: Vec<ArrayRef> = values
.iter()
.map(|v| v.slice(last_range.1, update_bound))
.collect();
Expand All @@ -144,7 +132,7 @@ impl WindowExpr for AggregateWindowExpr {
// Remove rows that have now left the window:
let retract_bound = cur_range.0 - last_range.0;
if retract_bound > 0 {
let retract: Vec<ArrayRef> = value_slice
let retract: Vec<ArrayRef> = values
.iter()
.map(|v| v.slice(last_range.0, retract_bound))
.collect();
Expand Down
27 changes: 10 additions & 17 deletions datafusion/physical-expr/src/window/built_in.rs
Expand Up @@ -21,7 +21,6 @@ use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
use arrow::array::Array;
use arrow::compute::{concat, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
Expand Down Expand Up @@ -85,7 +84,7 @@ impl WindowExpr for BuiltInWindowExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let evaluator = self.expr.create_evaluator(batch)?;
let evaluator = self.expr.create_evaluator()?;
let num_rows = batch.num_rows();
let partition_columns = self.partition_columns(batch)?;
let partition_points =
Expand All @@ -94,12 +93,7 @@ impl WindowExpr for BuiltInWindowExpr {
let results = if evaluator.uses_window_frame() {
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
let columns = self.sort_columns(batch)?;
let order_columns: Vec<&ArrayRef> =
columns.iter().map(|s| &s.values).collect();
// Sort values, this will make the same partitions consecutive. Also, within the partition
// range, values will be sorted.
let order_bys = &order_columns[self.partition_by.len()..];
let (_, order_bys) = self.get_values_orderbys(batch)?;
let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() {
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
Expand All @@ -110,24 +104,22 @@ impl WindowExpr for BuiltInWindowExpr {
let mut row_wise_results = vec![];
for partition_range in &partition_points {
let length = partition_range.end - partition_range.start;
let slice_order_bys = order_bys
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let (values, order_bys) = self
.get_values_orderbys(&batch.slice(partition_range.start, length))?;
let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
// We iterate on each row to calculate window frame range and and window function result
for idx in 0..length {
let range = window_frame_ctx.calculate_range(
&slice_order_bys,
&order_bys,
&sort_options,
num_rows,
idx,
)?;
let range = Range {
start: partition_range.start + range.0,
end: partition_range.start + range.1,
start: range.0,
end: range.1,
};
let value = evaluator.evaluate_inside_range(range)?;
let value = evaluator.evaluate_inside_range(&values, range)?;
row_wise_results.push(value.to_array());
}
}
Expand All @@ -138,7 +130,8 @@ impl WindowExpr for BuiltInWindowExpr {
self.evaluate_partition_points(num_rows, &columns)?;
evaluator.evaluate_with_rank(partition_points, sort_partition_points)?
} else {
evaluator.evaluate(partition_points)?
let (values, _) = self.get_values_orderbys(batch)?;
evaluator.evaluate(&values, partition_points)?
};
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat(&results).map_err(DataFusionError::ArrowError)
Expand Down
Expand Up @@ -17,6 +17,7 @@

use super::partition_evaluator::PartitionEvaluator;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
Expand Down Expand Up @@ -45,9 +46,16 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
"BuiltInWindowFunctionExpr: default name"
}

/// Evaluate window function arguments against the batch and return
/// an array ref. Typically, the resulting vector is a single element vector.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect()
}

/// Create built-in window evaluator with a batch
fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>>;
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
}
36 changes: 7 additions & 29 deletions datafusion/physical-expr/src/window/cume_dist.rs
Expand Up @@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::iter;
Expand Down Expand Up @@ -62,10 +61,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
&self.name
}

fn create_evaluator(
&self,
_batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumeDistEvaluator {}))
}
}
Expand All @@ -77,12 +73,6 @@ impl PartitionEvaluator for CumeDistEvaluator {
true
}

fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
unreachable!(
"cume_dist evaluation must be called with evaluate_partition_with_rank"
)
}

fn evaluate_partition_with_rank(
&self,
partition: Range<usize>,
Expand All @@ -108,22 +98,16 @@ impl PartitionEvaluator for CumeDistEvaluator {
#[cfg(test)]
mod tests {
use super::*;
use arrow::{array::*, datatypes::*};
use datafusion_common::cast::as_float64_array;

fn test_i32_result(
expr: &CumeDist,
data: Vec<i32>,
partition: Range<usize>,
ranks: Vec<Range<usize>>,
expected: Vec<f64>,
) -> Result<()> {
let arr: ArrayRef = Arc::new(Int32Array::from(data));
let values = vec![arr];
let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr
.create_evaluator(&batch)?
.create_evaluator()?
.evaluate_with_rank(vec![partition], ranks)?;
assert_eq!(1, result.len());
let result = as_float64_array(&result[0])?;
Expand All @@ -137,25 +121,19 @@ mod tests {
let r = cume_dist("arr".into());

let expected = vec![0.0; 0];
test_i32_result(&r, vec![], 0..0, vec![], expected)?;
test_i32_result(&r, 0..0, vec![], expected)?;

let expected = vec![1.0; 1];
test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?;
test_i32_result(&r, 0..1, vec![0..1], expected)?;

let expected = vec![1.0; 2];
test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?;
test_i32_result(&r, 0..2, vec![0..2], expected)?;

let expected = vec![0.5, 0.5, 1.0, 1.0];
test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4], expected)?;
test_i32_result(&r, 0..4, vec![0..2, 2..4], expected)?;

let expected = vec![0.25, 0.5, 0.75, 1.0];
test_i32_result(
&r,
vec![1, 2, 4, 5],
0..4,
vec![0..1, 1..2, 2..3, 3..4],
expected,
)?;
test_i32_result(&r, 0..4, vec![0..1, 1..2, 2..3, 3..4], expected)?;

Ok(())
}
Expand Down
26 changes: 10 additions & 16 deletions datafusion/physical-expr/src/window/lead_lag.rs
Expand Up @@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
Expand Down Expand Up @@ -95,27 +94,16 @@ impl BuiltInWindowFunctionExpr for WindowShift {
&self.name
}

fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
let values = self
.expressions()
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(WindowShiftEvaluator {
shift_offset: self.shift_offset,
values,
default_value: self.default_value.clone(),
}))
}
}

pub(crate) struct WindowShiftEvaluator {
shift_offset: i64,
values: Vec<ArrayRef>,
default_value: Option<ScalarValue>,
}

Expand Down Expand Up @@ -169,8 +157,13 @@ fn shift_with_default_value(
}

impl PartitionEvaluator for WindowShiftEvaluator {
fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
let value = &self.values[0];
fn evaluate_partition(
&self,
values: &[ArrayRef],
partition: Range<usize>,
) -> Result<ArrayRef> {
// LEAD, LAG window functions take single column, values will have size 1
let value = &values[0];
let value = value.slice(partition.start, partition.end - partition.start);
shift_with_default_value(&value, self.shift_offset, &self.default_value)
}
Expand All @@ -190,7 +183,8 @@ mod tests {
let values = vec![arr];
let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?;
let values = expr.evaluate_args(&batch)?;
let result = expr.create_evaluator()?.evaluate(&values, vec![0..8])?;
assert_eq!(1, result.len());
let result = as_int32_array(&result[0])?;
assert_eq!(expected, *result);
Expand Down
36 changes: 12 additions & 24 deletions datafusion/physical-expr/src/window/nth_value.rs
Expand Up @@ -23,7 +23,6 @@ use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
Expand Down Expand Up @@ -116,40 +115,28 @@ impl BuiltInWindowFunctionExpr for NthValue {
&self.name
}

fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
let values = self
.expressions()
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(NthValueEvaluator {
kind: self.kind,
values,
}))
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(NthValueEvaluator { kind: self.kind }))
}
}

/// Value evaluator for nth_value functions
pub(crate) struct NthValueEvaluator {
kind: NthValueKind,
values: Vec<ArrayRef>,
}

impl PartitionEvaluator for NthValueEvaluator {
fn uses_window_frame(&self) -> bool {
true
}

fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
unreachable!("first, last, and nth_value evaluation must be called with evaluate_partition_with_rank")
}

fn evaluate_inside_range(&self, range: Range<usize>) -> Result<ScalarValue> {
let arr = &self.values[0];
fn evaluate_inside_range(
&self,
values: &[ArrayRef],
range: Range<usize>,
) -> Result<ScalarValue> {
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take single column, values will have size 1
let arr = &values[0];
let n_range = range.end - range.start;
match self.kind {
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
Expand Down Expand Up @@ -188,10 +175,11 @@ mod tests {
end: i + 1,
})
}
let evaluator = expr.create_evaluator(&batch)?;
let evaluator = expr.create_evaluator()?;
let values = expr.evaluate_args(&batch)?;
let result = ranges
.into_iter()
.map(|range| evaluator.evaluate_inside_range(range))
.map(|range| evaluator.evaluate_inside_range(&values, range))
.into_iter()
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
Expand Down