diff --git a/datafusion/physical-expr/src/window/aggregate.rs b/datafusion/physical-expr/src/window/aggregate.rs index 69b0812c1aa6..17a19b6d72a6 100644 --- a/datafusion/physical-expr/src/window/aggregate.rs +++ b/datafusion/physical-expr/src/window/aggregate.rs @@ -87,15 +87,9 @@ impl WindowExpr for AggregateWindowExpr { let partition_columns = self.partition_columns(batch)?; let partition_points = self.evaluate_partition_points(batch.num_rows(), &partition_columns)?; - let values = self.evaluate_args(batch)?; - let sort_options: Vec = self.order_by.iter().map(|o| o.options).collect(); - let columns = self.sort_columns(batch)?; - let order_columns: Vec<&ArrayRef> = columns.iter().map(|s| &s.values).collect(); - // Sort values, this will make the same partitions consecutive. Also, within the partition - // range, values will be sorted. - let order_bys = &order_columns[self.partition_by.len()..]; + let (_, order_bys) = self.get_values_orderbys(batch)?; let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() { // OVER (ORDER BY a) case // We create an implicit window for ORDER BY. @@ -107,14 +101,8 @@ impl WindowExpr for AggregateWindowExpr { for partition_range in &partition_points { let mut accumulator = self.aggregate.create_accumulator()?; let length = partition_range.end - partition_range.start; - let slice_order_bys = order_bys - .iter() - .map(|v| v.slice(partition_range.start, length)) - .collect::>(); - let value_slice = values - .iter() - .map(|v| v.slice(partition_range.start, length)) - .collect::>(); + let (values, order_bys) = + self.get_values_orderbys(&batch.slice(partition_range.start, length))?; let mut window_frame_ctx = WindowFrameContext::new(&window_frame); let mut last_range: (usize, usize) = (0, 0); @@ -123,7 +111,7 @@ impl WindowExpr for AggregateWindowExpr { // First, cur_range is calculated, then it is compared with last_range. for i in 0..length { let cur_range = window_frame_ctx.calculate_range( - &slice_order_bys, + &order_bys, &sort_options, length, i, @@ -135,7 +123,7 @@ impl WindowExpr for AggregateWindowExpr { // Accumulate any new rows that have entered the window: let update_bound = cur_range.1 - last_range.1; if update_bound > 0 { - let update: Vec = value_slice + let update: Vec = values .iter() .map(|v| v.slice(last_range.1, update_bound)) .collect(); @@ -144,7 +132,7 @@ impl WindowExpr for AggregateWindowExpr { // Remove rows that have now left the window: let retract_bound = cur_range.0 - last_range.0; if retract_bound > 0 { - let retract: Vec = value_slice + let retract: Vec = values .iter() .map(|v| v.slice(last_range.0, retract_bound)) .collect(); diff --git a/datafusion/physical-expr/src/window/built_in.rs b/datafusion/physical-expr/src/window/built_in.rs index da551d5347d4..adf8d5b34677 100644 --- a/datafusion/physical-expr/src/window/built_in.rs +++ b/datafusion/physical-expr/src/window/built_in.rs @@ -21,7 +21,6 @@ use super::window_frame_state::WindowFrameContext; use super::BuiltInWindowFunctionExpr; use super::WindowExpr; use crate::{expressions::PhysicalSortExpr, PhysicalExpr}; -use arrow::array::Array; use arrow::compute::{concat, SortOptions}; use arrow::record_batch::RecordBatch; use arrow::{array::ArrayRef, datatypes::Field}; @@ -85,7 +84,7 @@ impl WindowExpr for BuiltInWindowExpr { } fn evaluate(&self, batch: &RecordBatch) -> Result { - let evaluator = self.expr.create_evaluator(batch)?; + let evaluator = self.expr.create_evaluator()?; let num_rows = batch.num_rows(); let partition_columns = self.partition_columns(batch)?; let partition_points = @@ -94,12 +93,7 @@ impl WindowExpr for BuiltInWindowExpr { let results = if evaluator.uses_window_frame() { let sort_options: Vec = self.order_by.iter().map(|o| o.options).collect(); - let columns = self.sort_columns(batch)?; - let order_columns: Vec<&ArrayRef> = - columns.iter().map(|s| &s.values).collect(); - // Sort values, this will make the same partitions consecutive. Also, within the partition - // range, values will be sorted. - let order_bys = &order_columns[self.partition_by.len()..]; + let (_, order_bys) = self.get_values_orderbys(batch)?; let window_frame = if !order_bys.is_empty() && self.window_frame.is_none() { // OVER (ORDER BY a) case // We create an implicit window for ORDER BY. @@ -110,24 +104,22 @@ impl WindowExpr for BuiltInWindowExpr { let mut row_wise_results = vec![]; for partition_range in &partition_points { let length = partition_range.end - partition_range.start; - let slice_order_bys = order_bys - .iter() - .map(|v| v.slice(partition_range.start, length)) - .collect::>(); + let (values, order_bys) = self + .get_values_orderbys(&batch.slice(partition_range.start, length))?; let mut window_frame_ctx = WindowFrameContext::new(&window_frame); // We iterate on each row to calculate window frame range and and window function result for idx in 0..length { let range = window_frame_ctx.calculate_range( - &slice_order_bys, + &order_bys, &sort_options, num_rows, idx, )?; let range = Range { - start: partition_range.start + range.0, - end: partition_range.start + range.1, + start: range.0, + end: range.1, }; - let value = evaluator.evaluate_inside_range(range)?; + let value = evaluator.evaluate_inside_range(&values, range)?; row_wise_results.push(value.to_array()); } } @@ -138,7 +130,8 @@ impl WindowExpr for BuiltInWindowExpr { self.evaluate_partition_points(num_rows, &columns)?; evaluator.evaluate_with_rank(partition_points, sort_partition_points)? } else { - evaluator.evaluate(partition_points)? + let (values, _) = self.get_values_orderbys(batch)?; + evaluator.evaluate(&values, partition_points)? }; let results = results.iter().map(|i| i.as_ref()).collect::>(); concat(&results).map_err(DataFusionError::ArrowError) diff --git a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs index 43e1272bce18..7f7a27435c39 100644 --- a/datafusion/physical-expr/src/window/built_in_window_function_expr.rs +++ b/datafusion/physical-expr/src/window/built_in_window_function_expr.rs @@ -17,6 +17,7 @@ use super::partition_evaluator::PartitionEvaluator; use crate::PhysicalExpr; +use arrow::array::ArrayRef; use arrow::datatypes::Field; use arrow::record_batch::RecordBatch; use datafusion_common::Result; @@ -45,9 +46,16 @@ pub trait BuiltInWindowFunctionExpr: Send + Sync + std::fmt::Debug { "BuiltInWindowFunctionExpr: default name" } + /// Evaluate window function arguments against the batch and return + /// an array ref. Typically, the resulting vector is a single element vector. + fn evaluate_args(&self, batch: &RecordBatch) -> Result> { + self.expressions() + .iter() + .map(|e| e.evaluate(batch)) + .map(|r| r.map(|v| v.into_array(batch.num_rows()))) + .collect() + } + /// Create built-in window evaluator with a batch - fn create_evaluator( - &self, - batch: &RecordBatch, - ) -> Result>; + fn create_evaluator(&self) -> Result>; } diff --git a/datafusion/physical-expr/src/window/cume_dist.rs b/datafusion/physical-expr/src/window/cume_dist.rs index 92278881696e..4202058a3c5a 100644 --- a/datafusion/physical-expr/src/window/cume_dist.rs +++ b/datafusion/physical-expr/src/window/cume_dist.rs @@ -24,7 +24,6 @@ use crate::PhysicalExpr; use arrow::array::ArrayRef; use arrow::array::Float64Array; use arrow::datatypes::{DataType, Field}; -use arrow::record_batch::RecordBatch; use datafusion_common::Result; use std::any::Any; use std::iter; @@ -62,10 +61,7 @@ impl BuiltInWindowFunctionExpr for CumeDist { &self.name } - fn create_evaluator( - &self, - _batch: &RecordBatch, - ) -> Result> { + fn create_evaluator(&self) -> Result> { Ok(Box::new(CumeDistEvaluator {})) } } @@ -77,12 +73,6 @@ impl PartitionEvaluator for CumeDistEvaluator { true } - fn evaluate_partition(&self, _partition: Range) -> Result { - unreachable!( - "cume_dist evaluation must be called with evaluate_partition_with_rank" - ) - } - fn evaluate_partition_with_rank( &self, partition: Range, @@ -108,22 +98,16 @@ impl PartitionEvaluator for CumeDistEvaluator { #[cfg(test)] mod tests { use super::*; - use arrow::{array::*, datatypes::*}; use datafusion_common::cast::as_float64_array; fn test_i32_result( expr: &CumeDist, - data: Vec, partition: Range, ranks: Vec>, expected: Vec, ) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(data)); - let values = vec![arr]; - let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); - let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; let result = expr - .create_evaluator(&batch)? + .create_evaluator()? .evaluate_with_rank(vec![partition], ranks)?; assert_eq!(1, result.len()); let result = as_float64_array(&result[0])?; @@ -137,25 +121,19 @@ mod tests { let r = cume_dist("arr".into()); let expected = vec![0.0; 0]; - test_i32_result(&r, vec![], 0..0, vec![], expected)?; + test_i32_result(&r, 0..0, vec![], expected)?; let expected = vec![1.0; 1]; - test_i32_result(&r, vec![20; 1], 0..1, vec![0..1], expected)?; + test_i32_result(&r, 0..1, vec![0..1], expected)?; let expected = vec![1.0; 2]; - test_i32_result(&r, vec![20; 2], 0..2, vec![0..2], expected)?; + test_i32_result(&r, 0..2, vec![0..2], expected)?; let expected = vec![0.5, 0.5, 1.0, 1.0]; - test_i32_result(&r, vec![1, 1, 2, 2], 0..4, vec![0..2, 2..4], expected)?; + test_i32_result(&r, 0..4, vec![0..2, 2..4], expected)?; let expected = vec![0.25, 0.5, 0.75, 1.0]; - test_i32_result( - &r, - vec![1, 2, 4, 5], - 0..4, - vec![0..1, 1..2, 2..3, 3..4], - expected, - )?; + test_i32_result(&r, 0..4, vec![0..1, 1..2, 2..3, 3..4], expected)?; Ok(()) } diff --git a/datafusion/physical-expr/src/window/lead_lag.rs b/datafusion/physical-expr/src/window/lead_lag.rs index c50df3c1c9b3..7a5d2be525b7 100644 --- a/datafusion/physical-expr/src/window/lead_lag.rs +++ b/datafusion/physical-expr/src/window/lead_lag.rs @@ -24,7 +24,6 @@ use crate::PhysicalExpr; use arrow::array::ArrayRef; use arrow::compute::cast; use arrow::datatypes::{DataType, Field}; -use arrow::record_batch::RecordBatch; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use std::any::Any; @@ -95,19 +94,9 @@ impl BuiltInWindowFunctionExpr for WindowShift { &self.name } - fn create_evaluator( - &self, - batch: &RecordBatch, - ) -> Result> { - let values = self - .expressions() - .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>()?; + fn create_evaluator(&self) -> Result> { Ok(Box::new(WindowShiftEvaluator { shift_offset: self.shift_offset, - values, default_value: self.default_value.clone(), })) } @@ -115,7 +104,6 @@ impl BuiltInWindowFunctionExpr for WindowShift { pub(crate) struct WindowShiftEvaluator { shift_offset: i64, - values: Vec, default_value: Option, } @@ -169,8 +157,13 @@ fn shift_with_default_value( } impl PartitionEvaluator for WindowShiftEvaluator { - fn evaluate_partition(&self, partition: Range) -> Result { - let value = &self.values[0]; + fn evaluate_partition( + &self, + values: &[ArrayRef], + partition: Range, + ) -> Result { + // LEAD, LAG window functions take single column, values will have size 1 + let value = &values[0]; let value = value.slice(partition.start, partition.end - partition.start); shift_with_default_value(&value, self.shift_offset, &self.default_value) } @@ -190,7 +183,8 @@ mod tests { let values = vec![arr]; let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; - let result = expr.create_evaluator(&batch)?.evaluate(vec![0..8])?; + let values = expr.evaluate_args(&batch)?; + let result = expr.create_evaluator()?.evaluate(&values, vec![0..8])?; assert_eq!(1, result.len()); let result = as_int32_array(&result[0])?; assert_eq!(expected, *result); diff --git a/datafusion/physical-expr/src/window/nth_value.rs b/datafusion/physical-expr/src/window/nth_value.rs index e9988032cfee..e0afb520e427 100644 --- a/datafusion/physical-expr/src/window/nth_value.rs +++ b/datafusion/physical-expr/src/window/nth_value.rs @@ -23,7 +23,6 @@ use crate::window::BuiltInWindowFunctionExpr; use crate::PhysicalExpr; use arrow::array::{Array, ArrayRef}; use arrow::datatypes::{DataType, Field}; -use arrow::record_batch::RecordBatch; use datafusion_common::ScalarValue; use datafusion_common::{DataFusionError, Result}; use std::any::Any; @@ -116,27 +115,14 @@ impl BuiltInWindowFunctionExpr for NthValue { &self.name } - fn create_evaluator( - &self, - batch: &RecordBatch, - ) -> Result> { - let values = self - .expressions() - .iter() - .map(|e| e.evaluate(batch)) - .map(|r| r.map(|v| v.into_array(batch.num_rows()))) - .collect::>>()?; - Ok(Box::new(NthValueEvaluator { - kind: self.kind, - values, - })) + fn create_evaluator(&self) -> Result> { + Ok(Box::new(NthValueEvaluator { kind: self.kind })) } } /// Value evaluator for nth_value functions pub(crate) struct NthValueEvaluator { kind: NthValueKind, - values: Vec, } impl PartitionEvaluator for NthValueEvaluator { @@ -144,12 +130,13 @@ impl PartitionEvaluator for NthValueEvaluator { true } - fn evaluate_partition(&self, _partition: Range) -> Result { - unreachable!("first, last, and nth_value evaluation must be called with evaluate_partition_with_rank") - } - - fn evaluate_inside_range(&self, range: Range) -> Result { - let arr = &self.values[0]; + fn evaluate_inside_range( + &self, + values: &[ArrayRef], + range: Range, + ) -> Result { + // FIRST_VALUE, LAST_VALUE, NTH_VALUE window functions take single column, values will have size 1 + let arr = &values[0]; let n_range = range.end - range.start; match self.kind { NthValueKind::First => ScalarValue::try_from_array(arr, range.start), @@ -188,10 +175,11 @@ mod tests { end: i + 1, }) } - let evaluator = expr.create_evaluator(&batch)?; + let evaluator = expr.create_evaluator()?; + let values = expr.evaluate_args(&batch)?; let result = ranges .into_iter() - .map(|range| evaluator.evaluate_inside_range(range)) + .map(|range| evaluator.evaluate_inside_range(&values, range)) .into_iter() .collect::>>()?; let result = ScalarValue::iter_to_array(result.into_iter())?; diff --git a/datafusion/physical-expr/src/window/partition_evaluator.rs b/datafusion/physical-expr/src/window/partition_evaluator.rs index 4ecfd87a9df0..1608758d61b3 100644 --- a/datafusion/physical-expr/src/window/partition_evaluator.rs +++ b/datafusion/physical-expr/src/window/partition_evaluator.rs @@ -51,10 +51,14 @@ pub trait PartitionEvaluator { } /// evaluate the partition evaluator against the partitions - fn evaluate(&self, partition_points: Vec>) -> Result> { + fn evaluate( + &self, + values: &[ArrayRef], + partition_points: Vec>, + ) -> Result> { partition_points .into_iter() - .map(|partition| self.evaluate_partition(partition)) + .map(|partition| self.evaluate_partition(values, partition)) .collect() } @@ -75,7 +79,15 @@ pub trait PartitionEvaluator { } /// evaluate the partition evaluator against the partition - fn evaluate_partition(&self, _partition: Range) -> Result; + fn evaluate_partition( + &self, + _values: &[ArrayRef], + _partition: Range, + ) -> Result { + Err(DataFusionError::NotImplemented( + "evaluate_partition is not implemented by default".into(), + )) + } /// evaluate the partition evaluator against the partition but with rank fn evaluate_partition_with_rank( @@ -89,7 +101,11 @@ pub trait PartitionEvaluator { } /// evaluate window function result inside given range - fn evaluate_inside_range(&self, _range: Range) -> Result { + fn evaluate_inside_range( + &self, + _values: &[ArrayRef], + _range: Range, + ) -> Result { Err(DataFusionError::NotImplemented( "evaluate_inside_range is not implemented by default".into(), )) diff --git a/datafusion/physical-expr/src/window/rank.rs b/datafusion/physical-expr/src/window/rank.rs index ec9aca532cdd..3447d47b3545 100644 --- a/datafusion/physical-expr/src/window/rank.rs +++ b/datafusion/physical-expr/src/window/rank.rs @@ -24,7 +24,6 @@ use crate::PhysicalExpr; use arrow::array::ArrayRef; use arrow::array::{Float64Array, UInt64Array}; use arrow::datatypes::{DataType, Field}; -use arrow::record_batch::RecordBatch; use datafusion_common::Result; use std::any::Any; use std::iter; @@ -92,10 +91,7 @@ impl BuiltInWindowFunctionExpr for Rank { &self.name } - fn create_evaluator( - &self, - _batch: &RecordBatch, - ) -> Result> { + fn create_evaluator(&self) -> Result> { Ok(Box::new(RankEvaluator { rank_type: self.rank_type, })) @@ -111,10 +107,6 @@ impl PartitionEvaluator for RankEvaluator { true } - fn evaluate_partition(&self, _partition: Range) -> Result { - unreachable!("rank evaluation must be called with evaluate_partition_with_rank") - } - fn evaluate_partition_with_rank( &self, partition: Range, @@ -166,35 +158,24 @@ impl PartitionEvaluator for RankEvaluator { #[cfg(test)] mod tests { use super::*; - use arrow::{array::*, datatypes::*}; use datafusion_common::cast::{as_float64_array, as_uint64_array}; fn test_with_rank(expr: &Rank, expected: Vec) -> Result<()> { - test_i32_result( - expr, - vec![-2, -2, 1, 3, 3, 3, 7, 8], - vec![0..2, 2..3, 3..6, 6..7, 7..8], - expected, - ) + test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected) } fn test_without_rank(expr: &Rank, expected: Vec) -> Result<()> { - test_i32_result(expr, vec![-2, -2, 1, 3, 3, 3, 7, 8], vec![0..8], expected) + test_i32_result(expr, vec![0..8], expected) } fn test_f64_result( expr: &Rank, - data: Vec, range: Range, ranks: Vec>, expected: Vec, ) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(data)); - let values = vec![arr]; - let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); - let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; let result = expr - .create_evaluator(&batch)? + .create_evaluator()? .evaluate_with_rank(vec![range], ranks)?; assert_eq!(1, result.len()); let result = as_float64_array(&result[0])?; @@ -205,16 +186,11 @@ mod tests { fn test_i32_result( expr: &Rank, - data: Vec, ranks: Vec>, expected: Vec, ) -> Result<()> { - let arr: ArrayRef = Arc::new(Int32Array::from(data)); - let values = vec![arr]; - let schema = Schema::new(vec![Field::new("arr", DataType::Int32, false)]); - let batch = RecordBatch::try_new(Arc::new(schema), values.clone())?; let result = expr - .create_evaluator(&batch)? + .create_evaluator()? .evaluate_with_rank(vec![0..8], ranks)?; assert_eq!(1, result.len()); let result = as_uint64_array(&result[0])?; @@ -245,25 +221,19 @@ mod tests { // empty case let expected = vec![0.0; 0]; - test_f64_result(&r, vec![0; 0], 0..0, vec![0..0; 0], expected)?; + test_f64_result(&r, 0..0, vec![0..0; 0], expected)?; // singleton case let expected = vec![0.0]; - test_f64_result(&r, vec![13], 0..1, vec![0..1], expected)?; + test_f64_result(&r, 0..1, vec![0..1], expected)?; // uniform case let expected = vec![0.0; 7]; - test_f64_result(&r, vec![4; 7], 0..7, vec![0..7], expected)?; + test_f64_result(&r, 0..7, vec![0..7], expected)?; // non-trivial case let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5]; - test_f64_result( - &r, - vec![1, 1, 1, 2, 2, 2, 2], - 0..7, - vec![0..3, 3..7], - expected, - )?; + test_f64_result(&r, 0..7, vec![0..3, 3..7], expected)?; Ok(()) } diff --git a/datafusion/physical-expr/src/window/row_number.rs b/datafusion/physical-expr/src/window/row_number.rs index 11f4f620dc9b..f70d9ea379dd 100644 --- a/datafusion/physical-expr/src/window/row_number.rs +++ b/datafusion/physical-expr/src/window/row_number.rs @@ -22,7 +22,6 @@ use crate::window::BuiltInWindowFunctionExpr; use crate::PhysicalExpr; use arrow::array::{ArrayRef, UInt64Array}; use arrow::datatypes::{DataType, Field}; -use arrow::record_batch::RecordBatch; use datafusion_common::Result; use std::any::Any; use std::ops::Range; @@ -61,10 +60,7 @@ impl BuiltInWindowFunctionExpr for RowNumber { &self.name } - fn create_evaluator( - &self, - _batch: &RecordBatch, - ) -> Result> { + fn create_evaluator(&self) -> Result> { Ok(Box::::default()) } } @@ -73,7 +69,11 @@ impl BuiltInWindowFunctionExpr for RowNumber { pub(crate) struct NumRowsEvaluator {} impl PartitionEvaluator for NumRowsEvaluator { - fn evaluate_partition(&self, partition: Range) -> Result { + fn evaluate_partition( + &self, + _values: &[ArrayRef], + partition: Range, + ) -> Result { let num_rows = partition.end - partition.start; Ok(Arc::new(UInt64Array::from_iter_values( 1..(num_rows as u64) + 1, @@ -96,7 +96,10 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, true)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; + let values = row_number.evaluate_args(&batch)?; + let result = row_number + .create_evaluator()? + .evaluate(&values, vec![0..8])?; assert_eq!(1, result.len()); let result = as_uint64_array(&result[0])?; let result = result.values(); @@ -112,7 +115,10 @@ mod tests { let schema = Schema::new(vec![Field::new("arr", DataType::Boolean, false)]); let batch = RecordBatch::try_new(Arc::new(schema), vec![arr])?; let row_number = RowNumber::new("row_number".to_owned()); - let result = row_number.create_evaluator(&batch)?.evaluate(vec![0..8])?; + let values = row_number.evaluate_args(&batch)?; + let result = row_number + .create_evaluator()? + .evaluate(&values, vec![0..8])?; assert_eq!(1, result.len()); let result = as_uint64_array(&result[0])?; let result = result.values(); diff --git a/datafusion/physical-expr/src/window/window_expr.rs b/datafusion/physical-expr/src/window/window_expr.rs index 67caba51dcab..fe381935bb76 100644 --- a/datafusion/physical-expr/src/window/window_expr.rs +++ b/datafusion/physical-expr/src/window/window_expr.rs @@ -99,15 +99,32 @@ pub trait WindowExpr: Send + Sync + Debug { .collect() } + /// get order by columns, empty if absent + fn order_by_columns(&self, batch: &RecordBatch) -> Result> { + self.order_by() + .iter() + .map(|e| e.evaluate_to_sort_column(batch)) + .collect::>>() + } + /// get sort columns that can be used for peer evaluation, empty if absent fn sort_columns(&self, batch: &RecordBatch) -> Result> { let mut sort_columns = self.partition_columns(batch)?; - let order_by_columns = self - .order_by() - .iter() - .map(|e| e.evaluate_to_sort_column(batch)) - .collect::>>()?; + let order_by_columns = self.order_by_columns(batch)?; sort_columns.extend(order_by_columns); Ok(sort_columns) } + + /// Get values columns(argument of Window Function) + /// and order by columns (columns of the ORDER BY expression)used in evaluators + fn get_values_orderbys( + &self, + record_batch: &RecordBatch, + ) -> Result<(Vec, Vec)> { + let values = self.evaluate_args(record_batch)?; + let order_by_columns = self.order_by_columns(record_batch)?; + let order_bys: Vec = + order_by_columns.iter().map(|s| s.values.clone()).collect(); + Ok((values, order_bys)) + } }