Skip to content

Commit

Permalink
Refactor Builtin Window Function Implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo committed Nov 30, 2022
1 parent ad3df7d commit b419f56
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 164 deletions.
24 changes: 6 additions & 18 deletions datafusion/physical-expr/src/window/aggregate.rs
Expand Up @@ -87,15 +87,9 @@ impl WindowExpr for AggregateWindowExpr {
let partition_columns = self.partition_columns(batch)?;
let partition_points =
self.evaluate_partition_points(batch.num_rows(), &partition_columns)?;
let values = self.evaluate_args(batch)?;

let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
let columns = self.sort_columns(batch)?;
let order_columns: Vec<&ArrayRef> = columns.iter().map(|s| &s.values).collect();
// Sort values, this will make the same partitions consecutive. Also, within the partition
// range, values will be sorted.
let order_bys = &order_columns[self.partition_by.len()..];
let (_, order_bys) = self.get_values_orderbys(batch)?;
let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() {
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
Expand All @@ -107,14 +101,8 @@ impl WindowExpr for AggregateWindowExpr {
for partition_range in &partition_points {
let mut accumulator = self.aggregate.create_accumulator()?;
let length = partition_range.end - partition_range.start;
let slice_order_bys = order_bys
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let value_slice = values
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let (values, order_bys) =
self.get_values_orderbys(&batch.slice(partition_range.start, length))?;

let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
let mut last_range: (usize, usize) = (0, 0);
Expand All @@ -123,7 +111,7 @@ impl WindowExpr for AggregateWindowExpr {
// First, cur_range is calculated, then it is compared with last_range.
for i in 0..length {
let cur_range = window_frame_ctx.calculate_range(
&slice_order_bys,
&order_bys,
&sort_options,
length,
i,
Expand All @@ -135,7 +123,7 @@ impl WindowExpr for AggregateWindowExpr {
// Accumulate any new rows that have entered the window:
let update_bound = cur_range.1 - last_range.1;
if update_bound > 0 {
let update: Vec<ArrayRef> = value_slice
let update: Vec<ArrayRef> = values
.iter()
.map(|v| v.slice(last_range.1, update_bound))
.collect();
Expand All @@ -144,7 +132,7 @@ impl WindowExpr for AggregateWindowExpr {
// Remove rows that have now left the window:
let retract_bound = cur_range.0 - last_range.0;
if retract_bound > 0 {
let retract: Vec<ArrayRef> = value_slice
let retract: Vec<ArrayRef> = values
.iter()
.map(|v| v.slice(last_range.0, retract_bound))
.collect();
Expand Down
27 changes: 10 additions & 17 deletions datafusion/physical-expr/src/window/built_in.rs
Expand Up @@ -21,7 +21,6 @@ use super::window_frame_state::WindowFrameContext;
use super::BuiltInWindowFunctionExpr;
use super::WindowExpr;
use crate::{expressions::PhysicalSortExpr, PhysicalExpr};
use arrow::array::Array;
use arrow::compute::{concat, SortOptions};
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
Expand Down Expand Up @@ -85,7 +84,7 @@ impl WindowExpr for BuiltInWindowExpr {
}

fn evaluate(&self, batch: &RecordBatch) -> Result<ArrayRef> {
let evaluator = self.expr.create_evaluator(batch)?;
let evaluator = self.expr.create_evaluator()?;
let num_rows = batch.num_rows();
let partition_columns = self.partition_columns(batch)?;
let partition_points =
Expand All @@ -94,12 +93,7 @@ impl WindowExpr for BuiltInWindowExpr {
let results = if evaluator.uses_window_frame() {
let sort_options: Vec<SortOptions> =
self.order_by.iter().map(|o| o.options).collect();
let columns = self.sort_columns(batch)?;
let order_columns: Vec<&ArrayRef> =
columns.iter().map(|s| &s.values).collect();
// Sort values, this will make the same partitions consecutive. Also, within the partition
// range, values will be sorted.
let order_bys = &order_columns[self.partition_by.len()..];
let (_, order_bys) = self.get_values_orderbys(batch)?;
let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() {
// OVER (ORDER BY a) case
// We create an implicit window for ORDER BY.
Expand All @@ -110,24 +104,22 @@ impl WindowExpr for BuiltInWindowExpr {
let mut row_wise_results = vec![];
for partition_range in &partition_points {
let length = partition_range.end - partition_range.start;
let slice_order_bys = order_bys
.iter()
.map(|v| v.slice(partition_range.start, length))
.collect::<Vec<_>>();
let (values, order_bys) = self
.get_values_orderbys(&batch.slice(partition_range.start, length))?;
let mut window_frame_ctx = WindowFrameContext::new(&window_frame);
// We iterate on each row to calculate window frame range and and window function result
for idx in 0..length {
let range = window_frame_ctx.calculate_range(
&slice_order_bys,
&order_bys,
&sort_options,
num_rows,
idx,
)?;
let range = Range {
start: partition_range.start + range.0,
end: partition_range.start + range.1,
start: range.0,
end: range.1,
};
let value = evaluator.evaluate_inside_range(range)?;
let value = evaluator.evaluate_inside_range(&values, range)?;
row_wise_results.push(value.to_array());
}
}
Expand All @@ -138,7 +130,8 @@ impl WindowExpr for BuiltInWindowExpr {
self.evaluate_partition_points(num_rows, &columns)?;
evaluator.evaluate_with_rank(partition_points, sort_partition_points)?
} else {
evaluator.evaluate(partition_points)?
let (values, _) = self.get_values_orderbys(batch)?;
evaluator.evaluate(&values, partition_points)?
};
let results = results.iter().map(|i| i.as_ref()).collect::<Vec<_>>();
concat(&results).map_err(DataFusionError::ArrowError)
Expand Down
Expand Up @@ -17,6 +17,7 @@

use super::partition_evaluator::PartitionEvaluator;
use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
Expand Down Expand Up @@ -45,9 +46,16 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug {
"BuiltInWindowFunctionExpr: default name"
}

/// Evaluate window function arguments against the batch and return
/// an array ref. Typically, the resulting vector is a single element vector.
fn evaluate_args(&self, batch: &RecordBatch) -> Result<Vec<ArrayRef>> {
self.expressions()
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect()
}

/// Create built-in window evaluator with a batch
fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>>;
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
}
36 changes: 7 additions & 29 deletions datafusion/physical-expr/src/window/cume_dist.rs
Expand Up @@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::array::Float64Array;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use std::any::Any;
use std::iter;
Expand Down Expand Up @@ -62,10 +61,7 @@ impl BuiltInWindowFunctionExpr for CumeDist {
&self.name
}

fn create_evaluator(
&self,
_batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CumeDistEvaluator {}))
}
}
Expand All @@ -77,12 +73,6 @@ impl PartitionEvaluator for CumeDistEvaluator {
true
}

fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
unreachable!(
"cume_dist evaluation must be called with evaluate_partition_with_rank"
)
}

fn evaluate_partition_with_rank(
&self,
partition: Range<usize>,
Expand All @@ -108,22 +98,16 @@ impl PartitionEvaluator for CumeDistEvaluator {
#[cfg(test)]
mod tests {
use super::*;
use arrow::{array::*, datatypes::*};
use datafusion_common::cast::as_float64_array;

fn test_i32_result(
expr: &CumeDist,
data: Vec<i32>,
partition: Range<usize>,
ranks: Vec<Range<usize>>,
expected: Vec<f64>,
) -> Result<()> {
let arr: ArrayRef = Arc::new(Int32Array::from(data));
let values = vec![arr];
let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr
.create_evaluator(&batch)?
.create_evaluator()?
.evaluate_with_rank(vec![partition], ranks)?;
assert_eq!(1, result.len());
let result = as_float64_array(&result[0])?;
Expand All @@ -137,25 +121,19 @@ mod tests {
let r = cume_dist("arr".into());

let expected = vec![0.0; 0];
test_i32_result(&r, vec![], 0..0, vec![], expected)?;
test_i32_result(&r, 0..0, vec![], expected)?;

let expected = vec![1.0; 1];
test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?;
test_i32_result(&r, 0..1, vec![0..1], expected)?;

let expected = vec![1.0; 2];
test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?;
test_i32_result(&r, 0..2, vec![0..2], expected)?;

let expected = vec![0.5, 0.5, 1.0, 1.0];
test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4], expected)?;
test_i32_result(&r, 0..4, vec![0..2, 2..4], expected)?;

let expected = vec![0.25, 0.5, 0.75, 1.0];
test_i32_result(
&r,
vec![1, 2, 4, 5],
0..4,
vec![0..1, 1..2, 2..3, 3..4],
expected,
)?;
test_i32_result(&r, 0..4, vec![0..1, 1..2, 2..3, 3..4], expected)?;

Ok(())
}
Expand Down
26 changes: 10 additions & 16 deletions datafusion/physical-expr/src/window/lead_lag.rs
Expand Up @@ -24,7 +24,6 @@ use crate::PhysicalExpr;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
Expand Down Expand Up @@ -95,27 +94,16 @@ impl BuiltInWindowFunctionExpr for WindowShift {
&self.name
}

fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
let values = self
.expressions()
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(WindowShiftEvaluator {
shift_offset: self.shift_offset,
values,
default_value: self.default_value.clone(),
}))
}
}

pub(crate) struct WindowShiftEvaluator {
shift_offset: i64,
values: Vec<ArrayRef>,
default_value: Option<ScalarValue>,
}

Expand Down Expand Up @@ -169,8 +157,13 @@ fn shift_with_default_value(
}

impl PartitionEvaluator for WindowShiftEvaluator {
fn evaluate_partition(&self, partition: Range<usize>) -> Result<ArrayRef> {
let value = &self.values[0];
fn evaluate_partition(
&self,
values: &[ArrayRef],
partition: Range<usize>,
) -> Result<ArrayRef> {
// LEAD, LAG window functions take single column, values will have size 1
let value = &values[0];
let value = value.slice(partition.start, partition.end - partition.start);
shift_with_default_value(&value, self.shift_offset, &self.default_value)
}
Expand All @@ -190,7 +183,8 @@ mod tests {
let values = vec![arr];
let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]);
let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?;
let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?;
let values = expr.evaluate_args(&batch)?;
let result = expr.create_evaluator()?.evaluate(&values, vec![0..8])?;
assert_eq!(1, result.len());
let result = as_int32_array(&result[0])?;
assert_eq!(expected, *result);
Expand Down
36 changes: 12 additions & 24 deletions datafusion/physical-expr/src/window/nth_value.rs
Expand Up @@ -23,7 +23,6 @@ use crate::window::BuiltInWindowFunctionExpr;
use crate::PhysicalExpr;
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow::record_batch::RecordBatch;
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use std::any::Any;
Expand Down Expand Up @@ -116,40 +115,28 @@ impl BuiltInWindowFunctionExpr for NthValue {
&self.name
}

fn create_evaluator(
&self,
batch: &RecordBatch,
) -> Result<Box<dyn PartitionEvaluator>> {
let values = self
.expressions()
.iter()
.map(|e| e.evaluate(batch))
.map(|r| r.map(|v| v.into_array(batch.num_rows())))
.collect::<Result<Vec<_>>>()?;
Ok(Box::new(NthValueEvaluator {
kind: self.kind,
values,
}))
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(NthValueEvaluator { kind: self.kind }))
}
}

/// Value evaluator for nth_value functions
pub(crate) struct NthValueEvaluator {
kind: NthValueKind,
values: Vec<ArrayRef>,
}

impl PartitionEvaluator for NthValueEvaluator {
fn uses_window_frame(&self) -> bool {
true
}

fn evaluate_partition(&self, _partition: Range<usize>) -> Result<ArrayRef> {
unreachable!("first, last, and nth_value evaluation must be called with evaluate_partition_with_rank")
}

fn evaluate_inside_range(&self, range: Range<usize>) -> Result<ScalarValue> {
let arr = &self.values[0];
fn evaluate_inside_range(
&self,
values: &[ArrayRef],
range: Range<usize>,
) -> Result<ScalarValue> {
// FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take single column, values will have size 1
let arr = &values[0];
let n_range = range.end - range.start;
match self.kind {
NthValueKind::First => ScalarValue::try_from_array(arr, range.start),
Expand Down Expand Up @@ -188,10 +175,11 @@ mod tests {
end: i + 1,
})
}
let evaluator = expr.create_evaluator(&batch)?;
let evaluator = expr.create_evaluator()?;
let values = expr.evaluate_args(&batch)?;
let result = ranges
.into_iter()
.map(|range| evaluator.evaluate_inside_range(range))
.map(|range| evaluator.evaluate_inside_range(&values, range))
.into_iter()
.collect::<Result<Vec<ScalarValue>>>()?;
let result = ScalarValue::iter_to_array(result.into_iter())?;
Expand Down

0 comments on commit b419f56

Please sign in to comment.