From 6f83571d35d61a761307fc14ed0905041b47c36e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 1 Apr 2022 12:15:41 +0800 Subject: [PATCH 01/15] wip --- .../core/src/physical_plan/sorts/mod.rs | 1 + .../core/src/physical_plan/sorts/sort2.rs | 1077 +++++++++++++++++ 2 files changed, 1078 insertions(+) create mode 100644 datafusion/core/src/physical_plan/sorts/sort2.rs diff --git a/datafusion/core/src/physical_plan/sorts/mod.rs b/datafusion/core/src/physical_plan/sorts/mod.rs index 818546f316fc..bfd6b62ce1fa 100644 --- a/datafusion/core/src/physical_plan/sorts/mod.rs +++ b/datafusion/core/src/physical_plan/sorts/mod.rs @@ -37,6 +37,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; pub mod sort; +pub mod sort2; pub mod sort_preserving_merge; /// A `SortKeyCursor` is created from a `RecordBatch`, and a set of diff --git a/datafusion/core/src/physical_plan/sorts/sort2.rs b/datafusion/core/src/physical_plan/sorts/sort2.rs new file mode 100644 index 000000000000..7872820aad0c --- /dev/null +++ b/datafusion/core/src/physical_plan/sorts/sort2.rs @@ -0,0 +1,1077 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Sort that deals with an arbitrary size of the input. +//! It will do in-memory sorting if it has enough memory budget +//! but spills to disk if needed. + +use crate::error::{DataFusionError, Result}; +use crate::execution::context::TaskContext; +use crate::execution::memory_manager::{ + human_readable_size, ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, +}; +use crate::execution::runtime_env::RuntimeEnv; +use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; +use crate::physical_plan::expressions::PhysicalSortExpr; +use crate::physical_plan::metrics::{ + BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet, +}; +use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; +use crate::physical_plan::sorts::SortedStream; +use crate::physical_plan::stream::RecordBatchReceiverStream; +use crate::physical_plan::{ + common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, + Partitioning, SendableRecordBatchStream, Statistics, +}; +use crate::prelude::SessionConfig; +use arrow::array::{Array, ArrayRef, UInt32Array}; +pub use arrow::compute::SortOptions; +use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; +use arrow::datatypes::SchemaRef; +use arrow::error::Result as ArrowResult; +use arrow::ipc::reader::FileReader; +use arrow::record_batch::RecordBatch; +use async_trait::async_trait; +use futures::lock::Mutex; +use futures::StreamExt; +use log::{debug, error}; +use std::any::Any; +use std::cmp::min; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::File; +use std::io::BufReader; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use tempfile::NamedTempFile; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::task; + +/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). +/// +/// The basic architecture of the algorithm: +/// 1. get a non-empty new batch from input +/// 2. check with the memory manager if we could buffer the batch in memory +/// 2.1 if memory sufficient, then buffer batch in memory, go to 1. +/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. +/// buffer the batch in memory, go to 1. +/// 3. when input is exhausted, merge all in memory batches and spills to get a total order. +struct ExternalSorter2 { + id: MemoryConsumerId, + schema: SchemaRef, + in_mem_batches: Mutex>, + spills: Mutex>, + /// Sort expressions + expr: Vec, + session_config: Arc, + runtime: Arc, + metrics_set: CompositeMetricsSet, + metrics: BaselineMetrics, +} + +impl ExternalSorter2 { + pub fn new( + partition_id: usize, + schema: SchemaRef, + expr: Vec, + metrics_set: CompositeMetricsSet, + session_config: Arc, + runtime: Arc, + ) -> Self { + let metrics = metrics_set.new_intermediate_baseline(partition_id); + Self { + id: MemoryConsumerId::new(partition_id), + schema, + in_mem_batches: Mutex::new(vec![]), + spills: Mutex::new(vec![]), + expr, + session_config, + runtime, + metrics_set, + metrics, + } + } + + async fn insert_batch( + &self, + input: RecordBatch, + tracking_metrics: &MemTrackingMetrics, + ) -> Result<()> { + if input.num_rows() > 0 { + let size = batch_byte_size(&input); + self.try_grow(size).await?; + self.metrics.mem_used().add(size); + let mut in_mem_batches = self.in_mem_batches.lock().await; + // NB timer records time taken on drop, so there are no + // calls to `timer.done()` below. + let _timer = tracking_metrics.elapsed_compute().timer(); + let partial = sort_batch(input, self.schema.clone(), &self.expr)?; + in_mem_batches.push(partial); + } + Ok(()) + } + + async fn spilled_before(&self) -> bool { + let spills = self.spills.lock().await; + !spills.is_empty() + } + + /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. + async fn sort(&self) -> Result { + let partition = self.partition_id(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + + if self.spilled_before().await { + let tracking_metrics = self + .metrics_set + .new_intermediate_tracking(partition, self.runtime.clone()); + let mut streams: Vec = vec![]; + if in_mem_batches.len() > 0 { + let in_mem_stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + tracking_metrics, + )?; + let prev_used = self.metrics.mem_used().set(0); + streams.push(SortedStream::new(in_mem_stream, prev_used)); + } + + let mut spills = self.spills.lock().await; + + for spill in spills.drain(..) { + let stream = read_spill_as_stream(spill, self.schema.clone())?; + streams.push(SortedStream::new(stream, 0)); + } + let tracking_metrics = self + .metrics_set + .new_final_tracking(partition, self.runtime.clone()); + Ok(Box::pin(SortPreservingMergeStream::new_from_streams( + streams, + self.schema.clone(), + &self.expr, + tracking_metrics, + self.session_config.batch_size, + ))) + } else if in_mem_batches.len() > 0 { + let tracking_metrics = self + .metrics_set + .new_final_tracking(partition, self.runtime.clone()); + let result = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &self.expr, + tracking_metrics, + ); + // Report to the memory manager we are no longer using memory + self.metrics.mem_used().set(0); + result + } else { + Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) + } + } + + fn used(&self) -> usize { + self.metrics.mem_used().value() + } + + fn spilled_bytes(&self) -> usize { + self.metrics.spilled_bytes().value() + } + + fn spill_count(&self) -> usize { + self.metrics.spill_count().value() + } +} + +impl Debug for ExternalSorter2 { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + f.debug_struct("ExternalSorter") + .field("id", &self.id()) + .field("memory_used", &self.used()) + .field("spilled_bytes", &self.spilled_bytes()) + .field("spill_count", &self.spill_count()) + .finish() + } +} + +#[derive(Debug, Copy, Clone)] +struct CombinedIndex { + batch_idx: usize, + row_idx: usize, +} + +impl Drop for ExternalSorter2 { + fn drop(&mut self) { + self.runtime.drop_consumer(self.id(), self.used()); + } +} + +#[async_trait] +impl MemoryConsumer for ExternalSorter2 { + fn name(&self) -> String { + "ExternalSorter".to_owned() + } + + fn id(&self) -> &MemoryConsumerId { + &self.id + } + + fn memory_manager(&self) -> Arc { + self.runtime.memory_manager.clone() + } + + fn type_(&self) -> &ConsumerType { + &ConsumerType::Requesting + } + + async fn spill(&self) -> Result { + debug!( + "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", + self.name(), + self.id(), + self.used(), + self.spill_count() + ); + + let partition = self.partition_id(); + let mut in_mem_batches = self.in_mem_batches.lock().await; + // we could always get a chance to free some memory as long as we are holding some + if in_mem_batches.len() == 0 { + return Ok(0); + } + + let tracking_metrics = self + .metrics_set + .new_intermediate_tracking(partition, self.runtime.clone()); + + let spillfile = self.runtime.disk_manager.create_tmp_file()?; + let stream = in_mem_partial_sort( + &mut *in_mem_batches, + self.schema.clone(), + &*self.expr, + tracking_metrics, + ); + + spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone()) + .await?; + let mut spills = self.spills.lock().await; + let used = self.metrics.mem_used().set(0); + self.metrics.record_spill(used); + spills.push(spillfile); + Ok(used) + } + + fn mem_used(&self) -> usize { + self.metrics.mem_used().value() + } +} + +/// consume the non-empty `sorted_bathes` and do in_mem_sort +fn in_mem_partial_sort( + buffered_batches: &mut Vec, + schema: SchemaRef, + expressions: &[PhysicalSortExpr], + tracking_metrics: MemTrackingMetrics, +) -> Result { + assert_ne!(buffered_batches.len(), 0); + if buffered_batches.len() == 1 { + let result = buffered_batches.pop(); + Ok(Box::pin(SizedRecordBatchStream::new( + schema, + vec![Arc::new(result.unwrap())], + tracking_metrics, + ))) + } else { + let batches = buffered_batches.drain(..).collect::>(); + let result = { + // NB timer records time taken on drop, so there are no + // calls to `timer.done()` below. + let _timer = tracking_metrics.elapsed_compute().timer(); + + // combine all record batches into one for each column + let pre_sort = common::combine_batches(&batches, schema.clone())?; + pre_sort + .map(|batch| sort_batch(batch, schema.clone(), expressions)) + .transpose()? + }; + Ok(Box::pin(SizedRecordBatchStream::new( + schema, + vec![Arc::new(result.unwrap())], + tracking_metrics, + ))) + } +} + +fn get_sorted_iter( + batches: &[RecordBatch], + expr: &[PhysicalSortExpr], + batch_size: usize, +) -> Result { + let (batch_grouped, combined): (Vec>, Vec>) = + batches + .iter() + .enumerate() + .map(|(i, batch)| { + let col: Vec = expr + .iter() + .map(|e| Ok(e.evaluate_to_sort_column(batch)?.values)) + .collect::>>()?; + + let combined_index = (0..batch.num_rows()) + .map(|r| CombinedIndex { + batch_idx: i, + row_idx: r, + }) + .collect::>(); + + Ok((col, combined_index)) + }) + .collect::, Vec)>>>()? + .into_iter() + .unzip(); + + let column_grouped = transpose(batch_grouped); + + let sort_columns: Vec = column_grouped + .iter() + .zip(expr.iter()) + .map(|(c, e)| { + Ok(SortColumn { + values: concat( + &*c.iter().map(|i| i.as_ref()).collect::>(), + )?, + options: Some(e.options), + }) + }) + .collect::>>() + .into_iter() + .collect::>>()?; + + let indices = lexsort_to_indices(&sort_columns, None)?; + let combined = combined + .into_iter() + .flatten() + .collect::>(); + Ok(SortedIterator::new(indices, combined, batch_size)) +} + +struct SortedIterator { + pos: usize, + indices: UInt32Array, + combined: Vec, + batch_size: usize, + length: usize, +} + +impl SortedIterator { + fn new( + indices: UInt32Array, + combined: Vec, + batch_size: usize, + ) -> Self { + let length = combined.len(); + Self { + pos: 0, + indices, + combined, + batch_size, + length, + } + } +} + +impl Iterator for SortedIterator { + type Item = Vec; + + fn next(&mut self) -> Option { + if self.pos >= self.length { + return None; + } + + let current_size = min(self.batch_size, self.length - self.pos); + let mut result = Vec::with_capacity(current_size); + for i in 0..current_size { + let p = self.pos + i; + let c_index = self.indices.value(p) as usize; + result.push(self.combined[c_index]) + } + self.pos += current_size; + Some(result) + } +} + +fn transpose(v: Vec>) -> Vec> { + assert!(!v.is_empty()); + let len = v[0].len(); + let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect(); + (0..len) + .map(|_| { + iters + .iter_mut() + .map(|n| n.next().unwrap()) + .collect::>() + }) + .collect() +} + +async fn spill_partial_sorted_stream( + in_mem_stream: &mut SendableRecordBatchStream, + path: &Path, + schema: SchemaRef, +) -> Result<()> { + let (sender, receiver) = tokio::sync::mpsc::channel(2); + let path: PathBuf = path.into(); + let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema)); + while let Some(item) = in_mem_stream.next().await { + sender.send(item).await.ok(); + } + drop(sender); + match handle.await { + Ok(r) => r, + Err(e) => Err(DataFusionError::Execution(format!( + "Error occurred while spilling {}", + e + ))), + } +} + +fn read_spill_as_stream( + path: NamedTempFile, + schema: SchemaRef, +) -> Result { + let (sender, receiver): ( + Sender>, + Receiver>, + ) = tokio::sync::mpsc::channel(2); + let join_handle = task::spawn_blocking(move || { + if let Err(e) = read_spill(sender, path.path()) { + error!("Failure while reading spill file: {:?}. Error: {}", path, e); + } + }); + Ok(RecordBatchReceiverStream::create( + &schema, + receiver, + join_handle, + )) +} + +fn write_sorted( + mut receiver: Receiver>, + path: PathBuf, + schema: SchemaRef, +) -> Result<()> { + let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; + while let Some(batch) = receiver.blocking_recv() { + writer.write(&batch?)?; + } + writer.finish()?; + debug!( + "Spilled {} batches of total {} rows to disk, memory released {}", + writer.num_batches, + writer.num_rows, + human_readable_size(writer.num_bytes as usize), + ); + Ok(()) +} + +fn read_spill(sender: Sender>, path: &Path) -> Result<()> { + let file = BufReader::new(File::open(&path)?); + let reader = FileReader::try_new(file, None)?; + for batch in reader { + sender + .blocking_send(batch) + .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; + } + Ok(()) +} + +/// External Sort execution plan +#[derive(Debug)] +pub struct SortExec2 { + /// Input schema + input: Arc, + /// Sort expressions + expr: Vec, + /// Containing all metrics set created during sort + metrics_set: CompositeMetricsSet, + /// Preserve partitions of input plan + preserve_partitioning: bool, +} + +impl SortExec2 { + /// Create a new sort execution plan + pub fn try_new( + expr: Vec, + input: Arc, + ) -> Result { + Ok(Self::new_with_partitioning(expr, input, false)) + } + + /// Create a new sort execution plan with the option to preserve + /// the partitioning of the input plan + pub fn new_with_partitioning( + expr: Vec, + input: Arc, + preserve_partitioning: bool, + ) -> Self { + Self { + expr, + input, + metrics_set: CompositeMetricsSet::new(), + preserve_partitioning, + } + } + + /// Input schema + pub fn input(&self) -> &Arc { + &self.input + } + + /// Sort expressions + pub fn expr(&self) -> &[PhysicalSortExpr] { + &self.expr + } +} + +#[async_trait] +impl ExecutionPlan for SortExec2 { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.input.schema() + } + + /// Get the output partitioning of this plan + fn output_partitioning(&self) -> Partitioning { + if self.preserve_partitioning { + self.input.output_partitioning() + } else { + Partitioning::UnknownPartitioning(1) + } + } + + fn required_child_distribution(&self) -> Distribution { + if self.preserve_partitioning { + Distribution::UnspecifiedDistribution + } else { + Distribution::SinglePartition + } + } + + fn children(&self) -> Vec> { + vec![self.input.clone()] + } + + fn relies_on_input_order(&self) -> bool { + // this operator resorts everything + false + } + + fn benefits_from_input_partitioning(&self) -> bool { + false + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + Some(&self.expr) + } + + fn with_new_children( + &self, + children: Vec>, + ) -> Result> { + match children.len() { + 1 => Ok(Arc::new(SortExec2::try_new( + self.expr.clone(), + children[0].clone(), + )?)), + _ => Err(DataFusionError::Internal( + "SortExec wrong number of children".to_string(), + )), + } + } + + async fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + if !self.preserve_partitioning { + if 0 != partition { + return Err(DataFusionError::Internal(format!( + "SortExec invalid partition {}", + partition + ))); + } + + // sort needs to operate on a single partition currently + if 1 != self.input.output_partitioning().partition_count() { + return Err(DataFusionError::Internal( + "SortExec requires a single input partition".to_owned(), + )); + } + } + + let input = self.input.execute(partition, context.clone()).await?; + + do_sort( + input, + partition, + self.expr.clone(), + self.metrics_set.clone(), + context, + ) + .await + } + + fn metrics(&self) -> Option { + Some(self.metrics_set.aggregate_all()) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); + write!(f, "SortExec: [{}]", expr.join(",")) + } + } + } + + fn statistics(&self) -> Statistics { + self.input.statistics() + } +} + +fn sort_batch( + batch: RecordBatch, + schema: SchemaRef, + expr: &[PhysicalSortExpr], +) -> ArrowResult { + // TODO: pushup the limit expression to sort + let indices = lexsort_to_indices( + &expr + .iter() + .map(|e| e.evaluate_to_sort_column(&batch)) + .collect::>>()?, + None, + )?; + + // reorder all rows based on sorted indices + RecordBatch::try_new( + schema, + batch + .columns() + .iter() + .map(|column| { + take( + column.as_ref(), + &indices, + // disable bound check overhead since indices are already generated from + // the same record batch + Some(TakeOptions { + check_bounds: false, + }), + ) + }) + .collect::>>()?, + ) +} + +async fn do_sort( + mut input: SendableRecordBatchStream, + partition_id: usize, + expr: Vec, + metrics_set: CompositeMetricsSet, + context: Arc, +) -> Result { + let schema = input.schema(); + let tracking_metrics = metrics_set + .new_intermediate_tracking(partition_id, context.runtime_env().clone()); + let sorter = ExternalSorter2::new( + partition_id, + schema.clone(), + expr, + metrics_set, + Arc::new(context.session_config()), + context.runtime_env(), + ); + context.runtime_env().register_requester(sorter.id()); + while let Some(batch) = input.next().await { + let batch = batch?; + sorter.insert_batch(batch, &tracking_metrics).await?; + } + sorter.sort().await +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::datafusion_data_access::object_store::local::LocalFileSystem; + use crate::execution::context::SessionConfig; + use crate::execution::runtime_env::RuntimeConfig; + use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; + use crate::physical_plan::expressions::col; + use crate::physical_plan::memory::MemoryExec; + use crate::physical_plan::{ + collect, + file_format::{CsvExec, FileScanConfig}, + }; + use crate::prelude::SessionContext; + use crate::test; + use crate::test::assert_is_pending; + use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; + use crate::test_util; + use arrow::array::*; + use arrow::compute::SortOptions; + use arrow::datatypes::*; + use futures::FutureExt; + use std::collections::{BTreeMap, HashMap}; + + #[tokio::test] + async fn test_in_mem_sort() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let schema = test_util::aggr_test_schema(); + let partitions = 4; + let (_, files) = + test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; + + let csv = CsvExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_schema: Arc::clone(&schema), + file_groups: files, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + true, + b',', + ); + + let sort_exec = Arc::new(SortExec2::try_new( + vec![ + // c1 string column + PhysicalSortExpr { + expr: col("c1", &schema)?, + options: SortOptions::default(), + }, + // c2 uin32 column + PhysicalSortExpr { + expr: col("c2", &schema)?, + options: SortOptions::default(), + }, + // c7 uin8 column + PhysicalSortExpr { + expr: col("c7", &schema)?, + options: SortOptions::default(), + }, + ], + Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), + )?); + + let result = collect(sort_exec, task_ctx).await?; + + assert_eq!(result.len(), 1); + + let columns = result[0].columns(); + + let c1 = as_string_array(&columns[0]); + assert_eq!(c1.value(0), "a"); + assert_eq!(c1.value(c1.len() - 1), "e"); + + let c2 = as_primitive_array::(&columns[1]); + assert_eq!(c2.value(0), 1); + assert_eq!(c2.value(c2.len() - 1), 5,); + + let c7 = as_primitive_array::(&columns[6]); + assert_eq!(c7.value(0), 15); + assert_eq!(c7.value(c7.len() - 1), 254,); + + Ok(()) + } + + #[tokio::test] + async fn test_sort_spill() -> Result<()> { + // trigger spill there will be 4 batches with 5.5KB for each + let config = RuntimeConfig::new().with_memory_limit(12288, 1.0); + let runtime = Arc::new(RuntimeEnv::new(config)?); + let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + + let schema = test_util::aggr_test_schema(); + let partitions = 4; + let (_, files) = + test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; + + let csv = CsvExec::new( + FileScanConfig { + object_store: Arc::new(LocalFileSystem {}), + file_schema: Arc::clone(&schema), + file_groups: files, + statistics: Statistics::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }, + true, + b',', + ); + + let sort_exec = Arc::new(SortExec2::try_new( + vec![ + // c1 string column + PhysicalSortExpr { + expr: col("c1", &schema)?, + options: SortOptions::default(), + }, + // c2 uin32 column + PhysicalSortExpr { + expr: col("c2", &schema)?, + options: SortOptions::default(), + }, + // c7 uin8 column + PhysicalSortExpr { + expr: col("c7", &schema)?, + options: SortOptions::default(), + }, + ], + Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), + )?); + + let task_ctx = session_ctx.task_ctx(); + let result = collect(sort_exec.clone(), task_ctx).await?; + + assert_eq!(result.len(), 1); + + // Now, validate metrics + let metrics = sort_exec.metrics().unwrap(); + + assert_eq!(metrics.output_rows().unwrap(), 100); + assert!(metrics.elapsed_compute().unwrap() > 0); + assert!(metrics.spill_count().unwrap() > 0); + assert!(metrics.spilled_bytes().unwrap() > 0); + + let columns = result[0].columns(); + + let c1 = as_string_array(&columns[0]); + assert_eq!(c1.value(0), "a"); + assert_eq!(c1.value(c1.len() - 1), "e"); + + let c2 = as_primitive_array::(&columns[1]); + assert_eq!(c2.value(0), 1); + assert_eq!(c2.value(c2.len() - 1), 5,); + + let c7 = as_primitive_array::(&columns[6]); + assert_eq!(c7.value(0), 15); + assert_eq!(c7.value(c7.len() - 1), 254,); + + Ok(()) + } + + #[tokio::test] + async fn test_sort_metadata() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let field_metadata: BTreeMap = + vec![("foo".to_string(), "bar".to_string())] + .into_iter() + .collect(); + let schema_metadata: HashMap = + vec![("baz".to_string(), "barf".to_string())] + .into_iter() + .collect(); + + let mut field = Field::new("field_name", DataType::UInt64, true); + field.set_metadata(Some(field_metadata.clone())); + let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone()); + let schema = Arc::new(schema); + + let data: ArrayRef = + Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::()); + + let batch = RecordBatch::try_new(schema.clone(), vec![data]).unwrap(); + let input = + Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); + + let sort_exec = Arc::new(SortExec2::try_new( + vec![PhysicalSortExpr { + expr: col("field_name", &schema)?, + options: SortOptions::default(), + }], + input, + )?); + + let result: Vec = collect(sort_exec, task_ctx).await?; + + let expected_data: ArrayRef = + Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::()); + let expected_batch = + RecordBatch::try_new(schema.clone(), vec![expected_data]).unwrap(); + + // Data is correct + assert_eq!(&vec![expected_batch], &result); + + // explicitlty ensure the metadata is present + assert_eq!( + result[0].schema().fields()[0].metadata(), + &Some(field_metadata) + ); + assert_eq!(result[0].schema().metadata(), &schema_metadata); + + Ok(()) + } + + #[tokio::test] + async fn test_lex_sort_by_float() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Float32, true), + Field::new("b", DataType::Float64, true), + ])); + + // define data. + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Float32Array::from(vec![ + Some(f32::NAN), + None, + None, + Some(f32::NAN), + Some(1.0_f32), + Some(1.0_f32), + Some(2.0_f32), + Some(3.0_f32), + ])), + Arc::new(Float64Array::from(vec![ + Some(200.0_f64), + Some(20.0_f64), + Some(10.0_f64), + Some(100.0_f64), + Some(f64::NAN), + None, + None, + Some(f64::NAN), + ])), + ], + )?; + + let sort_exec = Arc::new(SortExec2::try_new( + vec![ + PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions { + descending: true, + nulls_first: true, + }, + }, + PhysicalSortExpr { + expr: col("b", &schema)?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }, + ], + Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?), + )?); + + assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); + assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); + + let result: Vec = collect(sort_exec.clone(), task_ctx).await?; + let metrics = sort_exec.metrics().unwrap(); + assert!(metrics.elapsed_compute().unwrap() > 0); + assert_eq!(metrics.output_rows().unwrap(), 8); + assert_eq!(result.len(), 1); + + let columns = result[0].columns(); + + assert_eq!(DataType::Float32, *columns[0].data_type()); + assert_eq!(DataType::Float64, *columns[1].data_type()); + + let a = as_primitive_array::(&columns[0]); + let b = as_primitive_array::(&columns[1]); + + // convert result to strings to allow comparing to expected result containing NaN + let result: Vec<(Option, Option)> = (0..result[0].num_rows()) + .map(|i| { + let aval = if a.is_valid(i) { + Some(a.value(i).to_string()) + } else { + None + }; + let bval = if b.is_valid(i) { + Some(b.value(i).to_string()) + } else { + None + }; + (aval, bval) + }) + .collect(); + + let expected: Vec<(Option, Option)> = vec![ + (None, Some("10".to_owned())), + (None, Some("20".to_owned())), + (Some("NaN".to_owned()), Some("100".to_owned())), + (Some("NaN".to_owned()), Some("200".to_owned())), + (Some("3".to_owned()), Some("NaN".to_owned())), + (Some("2".to_owned()), None), + (Some("1".to_owned()), Some("NaN".to_owned())), + (Some("1".to_owned()), None), + ]; + + assert_eq!(expected, result); + + Ok(()) + } + + #[tokio::test] + async fn test_drop_cancel() -> Result<()> { + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); + let schema = + Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); + + let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); + let refs = blocking_exec.refs(); + let sort_exec = Arc::new(SortExec2::try_new( + vec![PhysicalSortExpr { + expr: col("a", &schema)?, + options: SortOptions::default(), + }], + blocking_exec, + )?); + + let fut = collect(sort_exec, task_ctx); + let mut fut = fut.boxed(); + + assert_is_pending(&mut fut); + drop(fut); + assert_strong_count_converges_to_zero(refs).await; + + Ok(()) + } +} From 4497d1150eb59de8cb73c416edc2215782515bc8 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 1 Apr 2022 15:42:35 +0800 Subject: [PATCH 02/15] workable version --- .../core/src/physical_plan/sorts/sort2.rs | 96 ++++++++++++++++--- datafusion/core/tests/order_spill_fuzz.rs | 58 ++++++++++- 2 files changed, 138 insertions(+), 16 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort2.rs b/datafusion/core/src/physical_plan/sorts/sort2.rs index 7872820aad0c..a63053aff4ac 100644 --- a/datafusion/core/src/physical_plan/sorts/sort2.rs +++ b/datafusion/core/src/physical_plan/sorts/sort2.rs @@ -34,11 +34,11 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStrea use crate::physical_plan::sorts::SortedStream; use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, - Partitioning, SendableRecordBatchStream, Statistics, + DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::prelude::SessionConfig; -use arrow::array::{Array, ArrayRef, UInt32Array}; +use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array}; pub use arrow::compute::SortOptions; use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; @@ -47,7 +47,7 @@ use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::lock::Mutex; -use futures::StreamExt; +use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt}; use log::{debug, error}; use std::any::Any; use std::cmp::min; @@ -57,6 +57,7 @@ use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::task::{Context, Poll}; use tempfile::NamedTempFile; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task; @@ -133,6 +134,7 @@ impl ExternalSorter2 { /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. async fn sort(&self) -> Result { let partition = self.partition_id(); + let batch_size = self.session_config.batch_size; let mut in_mem_batches = self.in_mem_batches.lock().await; if self.spilled_before().await { @@ -145,6 +147,7 @@ impl ExternalSorter2 { &mut *in_mem_batches, self.schema.clone(), &self.expr, + batch_size, tracking_metrics, )?; let prev_used = self.metrics.mem_used().set(0); @@ -175,6 +178,7 @@ impl ExternalSorter2 { &mut *in_mem_batches, self.schema.clone(), &self.expr, + batch_size, tracking_metrics, ); // Report to the memory manager we are no longer using memory @@ -264,6 +268,7 @@ impl MemoryConsumer for ExternalSorter2 { &mut *in_mem_batches, self.schema.clone(), &*self.expr, + self.session_config.batch_size, tracking_metrics, ); @@ -286,6 +291,7 @@ fn in_mem_partial_sort( buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], + batch_size: usize, tracking_metrics: MemTrackingMetrics, ) -> Result { assert_ne!(buffered_batches.len(), 0); @@ -298,20 +304,16 @@ fn in_mem_partial_sort( ))) } else { let batches = buffered_batches.drain(..).collect::>(); - let result = { + let sorted_iter = { // NB timer records time taken on drop, so there are no // calls to `timer.done()` below. let _timer = tracking_metrics.elapsed_compute().timer(); - - // combine all record batches into one for each column - let pre_sort = common::combine_batches(&batches, schema.clone())?; - pre_sort - .map(|batch| sort_batch(batch, schema.clone(), expressions)) - .transpose()? + get_sorted_iter(&batches, expressions, batch_size)? }; - Ok(Box::pin(SizedRecordBatchStream::new( + Ok(Box::pin(SortedSizedRecordBatchStream::new( schema, - vec![Arc::new(result.unwrap())], + batches, + sorted_iter, tracking_metrics, ))) } @@ -415,6 +417,74 @@ impl Iterator for SortedIterator { } } +/// Stream of sorted record batches +struct SortedSizedRecordBatchStream { + schema: SchemaRef, + batches: Vec, + sorted_iter: SortedIterator, + num_cols: usize, + metrics: MemTrackingMetrics, +} + +impl SortedSizedRecordBatchStream { + /// new + pub fn new( + schema: SchemaRef, + batches: Vec, + sorted_iter: SortedIterator, + metrics: MemTrackingMetrics, + ) -> Self { + let size = batches.iter().map(|b| batch_byte_size(b)).sum::(); + metrics.init_mem_used(size); + let num_cols = batches[0].num_columns(); + SortedSizedRecordBatchStream { + schema, + batches, + sorted_iter, + num_cols, + metrics, + } + } +} + +impl Stream for SortedSizedRecordBatchStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + match self.sorted_iter.next() { + None => Poll::Ready(None), + Some(combined) => { + let mut output = Vec::with_capacity(self.num_cols); + for i in 0..self.num_cols { + let arrays = self + .batches + .iter() + .map(|b| b.column(i).data()) + .collect::>(); + let mut mutable = + MutableArrayData::new(arrays, false, combined.len()); + for x in combined.iter() { + mutable.extend(x.batch_idx, x.row_idx, x.row_idx + 1); + } + output.push(make_array(mutable.freeze())) + } + let batch = RecordBatch::try_new(self.schema.clone(), output); + let poll = Poll::Ready(Some(batch)); + self.metrics.record_poll(poll) + } + } + } +} + +impl RecordBatchStream for SortedSizedRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + fn transpose(v: Vec>) -> Vec> { assert!(!v.is_empty()); let len = v[0].len(); diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs index c052382d5eac..3b066abfafa2 100644 --- a/datafusion/core/tests/order_spill_fuzz.rs +++ b/datafusion/core/tests/order_spill_fuzz.rs @@ -27,6 +27,7 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::{col, PhysicalSortExpr}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::sorts::sort2::SortExec2; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; @@ -36,12 +37,14 @@ use std::sync::Arc; #[tokio::test] async fn test_sort_1k_mem() { - run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await + run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await; + run_sort2(1024, vec![(5, false), (2000, true), (1000000, true)]).await; } #[tokio::test] async fn test_sort_100k_mem() { - run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await + run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await; + run_sort2(102400, vec![(5, false), (2000, false), (1000000, true)]).await; } #[tokio::test] @@ -50,7 +53,12 @@ async fn test_sort_unlimited_mem() { usize::MAX, vec![(5, false), (2000, false), (1000000, false)], ) - .await + .await; + run_sort2( + usize::MAX, + vec![(5, false), (2000, false), (1000000, false)], + ) + .await; } /// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` @@ -97,6 +105,50 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { } } +/// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` +async fn run_sort2(pool_size: usize, size_spill: Vec<(usize, bool)>) { + for (size, spill) in size_spill { + let input = vec![make_staggered_batches(size)]; + let first_batch = input + .iter() + .flat_map(|p| p.iter()) + .next() + .expect("at least one batch"); + let schema = first_batch.schema(); + + let sort = vec![PhysicalSortExpr { + expr: col("x", &schema).unwrap(), + options: SortOptions { + descending: false, + nulls_first: true, + }, + }]; + + let exec = MemoryExec::try_new(&input, schema, None).unwrap(); + let sort = Arc::new(SortExec2::try_new(sort, Arc::new(exec)).unwrap()); + + let runtime_config = RuntimeConfig::new().with_memory_manager( + MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(), + ); + let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); + let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); + + let task_ctx = session_ctx.task_ctx(); + let collected = collect(sort.clone(), task_ctx).await.unwrap(); + + let expected = partitions_to_sorted_vec(&input); + let actual = batches_to_vec(&collected); + + if spill { + assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); + } else { + assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); + } + + assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size); + } +} + /// Return randomly sized record batches in a field named 'x' of type `Int32` /// with randomized i32 content fn make_staggered_batches(len: usize) -> Vec { From 971ded66ae012ab9f07dddaf6c8296d16911a34a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 1 Apr 2022 19:07:16 +0800 Subject: [PATCH 03/15] sort to bench --- benchmarks/queries/q1.sql | 21 ++++++------------- .../core/src/physical_plan/sorts/sort2.rs | 2 +- datafusion/core/tests/order_spill_fuzz.rs | 12 +++++++++++ 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/benchmarks/queries/q1.sql b/benchmarks/queries/q1.sql index a0fcf159e209..9af11d3c99fa 100644 --- a/benchmarks/queries/q1.sql +++ b/benchmarks/queries/q1.sql @@ -1,21 +1,12 @@ select l_returnflag, l_linestatus, - sum(l_quantity) as sum_qty, - sum(l_extendedprice) as sum_base_price, - sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, - sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, - avg(l_quantity) as avg_qty, - avg(l_extendedprice) as avg_price, - avg(l_discount) as avg_disc, - count(*) as count_order + l_quantity, + l_extendedprice, + l_discount, + l_tax from lineitem -where - l_shipdate <= date '1998-09-02' -group by - l_returnflag, - l_linestatus order by - l_returnflag, - l_linestatus; \ No newline at end of file + l_extendedprice, + l_discount; \ No newline at end of file diff --git a/datafusion/core/src/physical_plan/sorts/sort2.rs b/datafusion/core/src/physical_plan/sorts/sort2.rs index a63053aff4ac..0eba958a7fe5 100644 --- a/datafusion/core/src/physical_plan/sorts/sort2.rs +++ b/datafusion/core/src/physical_plan/sorts/sort2.rs @@ -47,7 +47,7 @@ use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::lock::Mutex; -use futures::{Future, SinkExt, Stream, StreamExt, TryStreamExt}; +use futures::{Stream, StreamExt}; use log::{debug, error}; use std::any::Any; use std::cmp::min; diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs index 3b066abfafa2..dd363ed6138c 100644 --- a/datafusion/core/tests/order_spill_fuzz.rs +++ b/datafusion/core/tests/order_spill_fuzz.rs @@ -38,12 +38,20 @@ use std::sync::Arc; #[tokio::test] async fn test_sort_1k_mem() { run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await; +} + +#[tokio::test] +async fn test_sort_1k_mem_2() { run_sort2(1024, vec![(5, false), (2000, true), (1000000, true)]).await; } #[tokio::test] async fn test_sort_100k_mem() { run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await; +} + +#[tokio::test] +async fn test_sort_100k_mem_2() { run_sort2(102400, vec![(5, false), (2000, false), (1000000, true)]).await; } @@ -54,6 +62,10 @@ async fn test_sort_unlimited_mem() { vec![(5, false), (2000, false), (1000000, false)], ) .await; +} + +#[tokio::test] +async fn test_sort_unlimited_mem2() { run_sort2( usize::MAX, vec![(5, false), (2000, false), (1000000, false)], From acff3778cfaed20c3610c4244a63e04218722ff8 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Fri, 1 Apr 2022 19:31:27 +0800 Subject: [PATCH 04/15] use sort2 --- ballista/rust/core/src/serde/physical_plan/mod.rs | 10 +++++----- ballista/rust/core/src/utils.rs | 6 +++--- ballista/rust/scheduler/src/planner.rs | 4 ++-- .../core/src/physical_optimizer/repartition.rs | 14 +++++++------- datafusion/core/src/physical_plan/planner.rs | 8 ++++---- datafusion/core/src/physical_plan/sorts/sort2.rs | 12 ++++++------ .../physical_plan/sorts/sort_preserving_merge.rs | 6 +++--- datafusion/core/tests/sql/explain_analyze.rs | 6 +++--- 8 files changed, 33 insertions(+), 33 deletions(-) diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index 0361df677607..ba2fd3cefa31 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -53,7 +53,7 @@ use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::sorts::sort2::SortExec2; use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec}; use datafusion::physical_plan::{ @@ -522,7 +522,7 @@ impl AsExecutionPlan for PhysicalPlanNode { } }) .collect::, _>>()?; - Ok(Arc::new(SortExec::try_new(exprs, input)?)) + Ok(Arc::new(SortExec2::try_new(exprs, input)?)) } PhysicalPlanType::Unresolved(unresolved_shuffle) => { let schema = Arc::new(convert_required!(unresolved_shuffle.schema)?); @@ -849,7 +849,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }, ))), }) - } else if let Some(exec) = plan.downcast_ref::() { + } else if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1032,7 +1032,7 @@ mod roundtrip_tests { hash_aggregate::{AggregateMode, HashAggregateExec}, hash_join::{HashJoinExec, PartitionMode}, limit::{GlobalLimitExec, LocalLimitExec}, - sorts::sort::SortExec, + sorts::sort2::SortExec2, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics, }, prelude::SessionContext, @@ -1193,7 +1193,7 @@ mod roundtrip_tests { }, }, ]; - roundtrip_test(Arc::new(SortExec::try_new( + roundtrip_test(Arc::new(SortExec2::try_new( sort_exprs, Arc::new(EmptyExec::new(false, schema)), )?)) diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 6670ab5cedd8..1e66728f4b37 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -53,7 +53,7 @@ use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::hash_join::HashJoinExec; use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::sorts::sort::SortExec; +use datafusion::physical_plan::sorts::sort2::SortExec2; use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream}; use futures::{Stream, StreamExt}; @@ -153,8 +153,8 @@ fn build_exec_plan_diagram( ) -> Result { let operator_str = if plan.as_any().downcast_ref::().is_some() { "HashAggregateExec" - } else if plan.as_any().downcast_ref::().is_some() { - "SortExec" + } else if plan.as_any().downcast_ref::().is_some() { + "SortExec2" } else if plan.as_any().downcast_ref::().is_some() { "ProjectionExec" } else if plan.as_any().downcast_ref::().is_some() { diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index d7ee22bbdf35..c3d884b5acc1 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -274,7 +274,7 @@ mod test { use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::HashJoinExec; - use datafusion::physical_plan::sorts::sort::SortExec; + use datafusion::physical_plan::sorts::sort2::SortExec2; use datafusion::physical_plan::{ coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec, }; @@ -361,7 +361,7 @@ mod test { // verify stage 2 let stage2 = stages[2].children()[0].clone(); - let sort = downcast_exec!(stage2, SortExec); + let sort = downcast_exec!(stage2, SortExec2); let coalesce_partitions = sort.children()[0].clone(); let coalesce_partitions = downcast_exec!(coalesce_partitions, CoalescePartitionsExec); diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index d98fa4162fa2..1656a51ec828 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -245,7 +245,7 @@ mod tests { use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; - use crate::physical_plan::sorts::sort::SortExec; + use crate::physical_plan::sorts::sort2::SortExec2; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, Statistics}; @@ -290,7 +290,7 @@ mod tests { expr: col("c1", &schema()).unwrap(), options: SortOptions::default(), }]; - Arc::new(SortExec::try_new(sort_exprs, input).unwrap()) + Arc::new(SortExec2::try_new(sort_exprs, input).unwrap()) } fn projection_exec(input: Arc) -> Arc { @@ -413,7 +413,7 @@ mod tests { "GlobalLimitExec: limit=100", "LocalLimitExec: limit=100", // data is sorted so can't repartition here - "SortExec: [c1@0 ASC]", + "SortExec2: [c1@0 ASC]", "ParquetExec: limit=None, partitions=[x], projection=[c1]", ]; @@ -431,7 +431,7 @@ mod tests { "FilterExec: c1@0", // data is sorted so can't repartition here even though // filter would benefit from parallelism, the answers might be wrong - "SortExec: [c1@0 ASC]", + "SortExec2: [c1@0 ASC]", "ParquetExec: limit=None, partitions=[x], projection=[c1]", ]; @@ -519,7 +519,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec: [c1@0 ASC]", + "SortExec2: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x], projection=[c1]", @@ -536,7 +536,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec: [c1@0 ASC]", + "SortExec2: [c1@0 ASC]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x], projection=[c1]", @@ -555,7 +555,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec: [c1@0 ASC]", + "SortExec2: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "FilterExec: c1@0", // repartition is lowest down diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 7504dd4385e4..1c08b17014f2 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -45,7 +45,7 @@ use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort::SortExec; +use crate::physical_plan::sorts::sort2::SortExec2; use crate::physical_plan::udf; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{join_utils, Partitioning}; @@ -457,9 +457,9 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; Arc::new(if can_repartition { - SortExec::new_with_partitioning(sort_keys, input_exec, true) + SortExec2::new_with_partitioning(sort_keys, input_exec, true) } else { - SortExec::try_new(sort_keys, input_exec)? + SortExec2::try_new(sort_keys, input_exec)? }) }; @@ -704,7 +704,7 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) ) + Ok(Arc::new(SortExec2::try_new(sort_expr, physical_input)?) ) } LogicalPlan::Join(Join { left, diff --git a/datafusion/core/src/physical_plan/sorts/sort2.rs b/datafusion/core/src/physical_plan/sorts/sort2.rs index 0eba958a7fe5..67dca3508b3f 100644 --- a/datafusion/core/src/physical_plan/sorts/sort2.rs +++ b/datafusion/core/src/physical_plan/sorts/sort2.rs @@ -204,7 +204,7 @@ impl ExternalSorter2 { impl Debug for ExternalSorter2 { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter") + f.debug_struct("ExternalSorter2") .field("id", &self.id()) .field("memory_used", &self.used()) .field("spilled_bytes", &self.spilled_bytes()) @@ -228,7 +228,7 @@ impl Drop for ExternalSorter2 { #[async_trait] impl MemoryConsumer for ExternalSorter2 { fn name(&self) -> String { - "ExternalSorter".to_owned() + "ExternalSorter2".to_owned() } fn id(&self) -> &MemoryConsumerId { @@ -672,7 +672,7 @@ impl ExecutionPlan for SortExec2 { children[0].clone(), )?)), _ => Err(DataFusionError::Internal( - "SortExec wrong number of children".to_string(), + "SortExec2 wrong number of children".to_string(), )), } } @@ -685,7 +685,7 @@ impl ExecutionPlan for SortExec2 { if !self.preserve_partitioning { if 0 != partition { return Err(DataFusionError::Internal(format!( - "SortExec invalid partition {}", + "SortExec2 invalid partition {}", partition ))); } @@ -693,7 +693,7 @@ impl ExecutionPlan for SortExec2 { // sort needs to operate on a single partition currently if 1 != self.input.output_partitioning().partition_count() { return Err(DataFusionError::Internal( - "SortExec requires a single input partition".to_owned(), + "SortExec2 requires a single input partition".to_owned(), )); } } @@ -722,7 +722,7 @@ impl ExecutionPlan for SortExec2 { match t { DisplayFormatType::Default => { let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - write!(f, "SortExec: [{}]", expr.join(",")) + write!(f, "SortExec2: [{}]", expr.join(",")) } } } diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index 3fb480de21bd..e5151ffe32de 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -617,7 +617,7 @@ mod tests { use crate::physical_plan::expressions::col; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::sorts::sort::SortExec; + use crate::physical_plan::sorts::sort2::SortExec2; use crate::physical_plan::{collect, common}; use crate::test::{self, assert_is_pending}; use crate::{assert_batches_eq, test_util}; @@ -881,7 +881,7 @@ mod tests { context: Arc, ) -> RecordBatch { let sort_exec = - Arc::new(SortExec::new_with_partitioning(sort.clone(), input, true)); + Arc::new(SortExec2::new_with_partitioning(sort.clone(), input, true)); sorted_merge(sort_exec, sort, context).await } @@ -891,7 +891,7 @@ mod tests { context: Arc, ) -> RecordBatch { let merge = Arc::new(CoalescePartitionsExec::new(src)); - let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap()); + let sort_exec = Arc::new(SortExec2::try_new(sort, merge).unwrap()); let mut result = collect(sort_exec, context).await.unwrap(); assert_eq!(result.len(), 1); result.remove(0) diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index ef6ea52dd774..fe1094542bef 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -60,7 +60,7 @@ async fn explain_analyze_baseline_metrics() { ); assert_metrics!( &formatted, - "SortExec: [c1@0 ASC NULLS LAST]", + "SortExec2: [c1@0 ASC NULLS LAST]", "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( @@ -108,7 +108,7 @@ async fn explain_analyze_baseline_metrics() { use datafusion::physical_plan; use datafusion::physical_plan::sorts; - plan.as_any().downcast_ref::().is_some() + plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() // CoalescePartitionsExec doesn't do any work so is not included || plan.as_any().downcast_ref::().is_some() @@ -648,7 +648,7 @@ async fn test_physical_plan_display_indent() { let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); let expected = vec![ "GlobalLimitExec: limit=10", - " SortExec: [the_min@2 DESC]", + " SortExec2: [the_min@2 DESC]", " CoalescePartitionsExec", " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", " HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", From 1e120dee8c1331dfbb7b2c62e64dad3cbcf3b90e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 2 Apr 2022 12:52:08 +0800 Subject: [PATCH 05/15] Replace --- .../rust/core/src/serde/physical_plan/mod.rs | 10 +- ballista/rust/core/src/utils.rs | 6 +- ballista/rust/scheduler/src/planner.rs | 4 +- .../src/physical_optimizer/repartition.rs | 14 +- datafusion/core/src/physical_plan/planner.rs | 8 +- .../core/src/physical_plan/sorts/mod.rs | 1 - .../core/src/physical_plan/sorts/sort.rs | 261 +++- .../core/src/physical_plan/sorts/sort2.rs | 1147 ----------------- .../sorts/sort_preserving_merge.rs | 6 +- datafusion/core/tests/order_spill_fuzz.rs | 64 - datafusion/core/tests/sql/explain_analyze.rs | 6 +- 11 files changed, 258 insertions(+), 1269 deletions(-) delete mode 100644 datafusion/core/src/physical_plan/sorts/sort2.rs diff --git a/ballista/rust/core/src/serde/physical_plan/mod.rs b/ballista/rust/core/src/serde/physical_plan/mod.rs index ba2fd3cefa31..0361df677607 100644 --- a/ballista/rust/core/src/serde/physical_plan/mod.rs +++ b/ballista/rust/core/src/serde/physical_plan/mod.rs @@ -53,7 +53,7 @@ use datafusion::physical_plan::hash_join::{HashJoinExec, PartitionMode}; use datafusion::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use datafusion::physical_plan::projection::ProjectionExec; use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::sorts::sort2::SortExec2; +use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::union::UnionExec; use datafusion::physical_plan::windows::{create_window_expr, WindowAggExec}; use datafusion::physical_plan::{ @@ -522,7 +522,7 @@ impl AsExecutionPlan for PhysicalPlanNode { } }) .collect::, _>>()?; - Ok(Arc::new(SortExec2::try_new(exprs, input)?)) + Ok(Arc::new(SortExec::try_new(exprs, input)?)) } PhysicalPlanType::Unresolved(unresolved_shuffle) => { let schema = Arc::new(convert_required!(unresolved_shuffle.schema)?); @@ -849,7 +849,7 @@ impl AsExecutionPlan for PhysicalPlanNode { }, ))), }) - } else if let Some(exec) = plan.downcast_ref::() { + } else if let Some(exec) = plan.downcast_ref::() { let input = protobuf::PhysicalPlanNode::try_from_physical_plan( exec.input().to_owned(), extension_codec, @@ -1032,7 +1032,7 @@ mod roundtrip_tests { hash_aggregate::{AggregateMode, HashAggregateExec}, hash_join::{HashJoinExec, PartitionMode}, limit::{GlobalLimitExec, LocalLimitExec}, - sorts::sort2::SortExec2, + sorts::sort::SortExec, AggregateExpr, ExecutionPlan, Partitioning, PhysicalExpr, Statistics, }, prelude::SessionContext, @@ -1193,7 +1193,7 @@ mod roundtrip_tests { }, }, ]; - roundtrip_test(Arc::new(SortExec2::try_new( + roundtrip_test(Arc::new(SortExec::try_new( sort_exprs, Arc::new(EmptyExec::new(false, schema)), )?)) diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index 1e66728f4b37..6670ab5cedd8 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -53,7 +53,7 @@ use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::hash_aggregate::HashAggregateExec; use datafusion::physical_plan::hash_join::HashJoinExec; use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::sorts::sort2::SortExec2; +use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{metrics, ExecutionPlan, RecordBatchStream}; use futures::{Stream, StreamExt}; @@ -153,8 +153,8 @@ fn build_exec_plan_diagram( ) -> Result { let operator_str = if plan.as_any().downcast_ref::().is_some() { "HashAggregateExec" - } else if plan.as_any().downcast_ref::().is_some() { - "SortExec2" + } else if plan.as_any().downcast_ref::().is_some() { + "SortExec" } else if plan.as_any().downcast_ref::().is_some() { "ProjectionExec" } else if plan.as_any().downcast_ref::().is_some() { diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index c3d884b5acc1..d7ee22bbdf35 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -274,7 +274,7 @@ mod test { use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use datafusion::physical_plan::hash_join::HashJoinExec; - use datafusion::physical_plan::sorts::sort2::SortExec2; + use datafusion::physical_plan::sorts::sort::SortExec; use datafusion::physical_plan::{ coalesce_partitions::CoalescePartitionsExec, projection::ProjectionExec, }; @@ -361,7 +361,7 @@ mod test { // verify stage 2 let stage2 = stages[2].children()[0].clone(); - let sort = downcast_exec!(stage2, SortExec2); + let sort = downcast_exec!(stage2, SortExec); let coalesce_partitions = sort.children()[0].clone(); let coalesce_partitions = downcast_exec!(coalesce_partitions, CoalescePartitionsExec); diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs index 1656a51ec828..d98fa4162fa2 100644 --- a/datafusion/core/src/physical_optimizer/repartition.rs +++ b/datafusion/core/src/physical_optimizer/repartition.rs @@ -245,7 +245,7 @@ mod tests { use crate::physical_plan::hash_aggregate::{AggregateMode, HashAggregateExec}; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; - use crate::physical_plan::sorts::sort2::SortExec2; + use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::{displayable, Statistics}; @@ -290,7 +290,7 @@ mod tests { expr: col("c1", &schema()).unwrap(), options: SortOptions::default(), }]; - Arc::new(SortExec2::try_new(sort_exprs, input).unwrap()) + Arc::new(SortExec::try_new(sort_exprs, input).unwrap()) } fn projection_exec(input: Arc) -> Arc { @@ -413,7 +413,7 @@ mod tests { "GlobalLimitExec: limit=100", "LocalLimitExec: limit=100", // data is sorted so can't repartition here - "SortExec2: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "ParquetExec: limit=None, partitions=[x], projection=[c1]", ]; @@ -431,7 +431,7 @@ mod tests { "FilterExec: c1@0", // data is sorted so can't repartition here even though // filter would benefit from parallelism, the answers might be wrong - "SortExec2: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "ParquetExec: limit=None, partitions=[x], projection=[c1]", ]; @@ -519,7 +519,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec2: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x], projection=[c1]", @@ -536,7 +536,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec2: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "FilterExec: c1@0", "RepartitionExec: partitioning=RoundRobinBatch(10)", "ParquetExec: limit=None, partitions=[x], projection=[c1]", @@ -555,7 +555,7 @@ mod tests { let expected = &[ "SortPreservingMergeExec: [c1@0 ASC]", // Expect repartition on the input to the sort (as it can benefit from additional parallelism) - "SortExec2: [c1@0 ASC]", + "SortExec: [c1@0 ASC]", "ProjectionExec: expr=[c1@0 as c1]", "FilterExec: c1@0", // repartition is lowest down diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 1c08b17014f2..7504dd4385e4 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -45,7 +45,7 @@ use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; use crate::physical_plan::repartition::RepartitionExec; -use crate::physical_plan::sorts::sort2::SortExec2; +use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::udf; use crate::physical_plan::windows::WindowAggExec; use crate::physical_plan::{join_utils, Partitioning}; @@ -457,9 +457,9 @@ impl DefaultPhysicalPlanner { }) .collect::>>()?; Arc::new(if can_repartition { - SortExec2::new_with_partitioning(sort_keys, input_exec, true) + SortExec::new_with_partitioning(sort_keys, input_exec, true) } else { - SortExec2::try_new(sort_keys, input_exec)? + SortExec::try_new(sort_keys, input_exec)? }) }; @@ -704,7 +704,7 @@ impl DefaultPhysicalPlanner { )), }) .collect::>>()?; - Ok(Arc::new(SortExec2::try_new(sort_expr, physical_input)?) ) + Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) ) } LogicalPlan::Join(Join { left, diff --git a/datafusion/core/src/physical_plan/sorts/mod.rs b/datafusion/core/src/physical_plan/sorts/mod.rs index bfd6b62ce1fa..818546f316fc 100644 --- a/datafusion/core/src/physical_plan/sorts/mod.rs +++ b/datafusion/core/src/physical_plan/sorts/mod.rs @@ -37,7 +37,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; pub mod sort; -pub mod sort2; pub mod sort_preserving_merge; /// A `SortKeyCursor` is created from a `RecordBatch`, and a set of diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 67a6e5fec244..71ffa21ab56c 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -34,28 +34,30 @@ use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStrea use crate::physical_plan::sorts::SortedStream; use crate::physical_plan::stream::RecordBatchReceiverStream; use crate::physical_plan::{ - common, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, - Partitioning, SendableRecordBatchStream, Statistics, + DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, + RecordBatchStream, SendableRecordBatchStream, Statistics, }; use crate::prelude::SessionConfig; -use arrow::array::ArrayRef; +use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array}; pub use arrow::compute::SortOptions; -use arrow::compute::{lexsort_to_indices, take, SortColumn, TakeOptions}; +use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; use arrow::datatypes::SchemaRef; use arrow::error::Result as ArrowResult; use arrow::ipc::reader::FileReader; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use futures::lock::Mutex; -use futures::StreamExt; +use futures::{Stream, StreamExt}; use log::{debug, error}; use std::any::Any; +use std::cmp::min; use std::fmt; use std::fmt::{Debug, Formatter}; use std::fs::File; use std::io::BufReader; use std::path::{Path, PathBuf}; use std::sync::Arc; +use std::task::{Context, Poll}; use tempfile::NamedTempFile; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::task; @@ -105,13 +107,21 @@ impl ExternalSorter { } } - async fn insert_batch(&self, input: RecordBatch) -> Result<()> { + async fn insert_batch( + &self, + input: RecordBatch, + tracking_metrics: &MemTrackingMetrics, + ) -> Result<()> { if input.num_rows() > 0 { let size = batch_byte_size(&input); self.try_grow(size).await?; self.metrics.mem_used().add(size); let mut in_mem_batches = self.in_mem_batches.lock().await; - in_mem_batches.push(input); + // NB timer records time taken on drop, so there are no + // calls to `timer.done()` below. + let _timer = tracking_metrics.elapsed_compute().timer(); + let partial = sort_batch(input, self.schema.clone(), &self.expr)?; + in_mem_batches.push(partial); } Ok(()) } @@ -124,6 +134,7 @@ impl ExternalSorter { /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. async fn sort(&self) -> Result { let partition = self.partition_id(); + let batch_size = self.session_config.batch_size; let mut in_mem_batches = self.in_mem_batches.lock().await; if self.spilled_before().await { @@ -136,6 +147,7 @@ impl ExternalSorter { &mut *in_mem_batches, self.schema.clone(), &self.expr, + batch_size, tracking_metrics, )?; let prev_used = self.metrics.mem_used().set(0); @@ -166,6 +178,7 @@ impl ExternalSorter { &mut *in_mem_batches, self.schema.clone(), &self.expr, + batch_size, tracking_metrics, ); // Report to the memory manager we are no longer using memory @@ -191,7 +204,7 @@ impl ExternalSorter { impl Debug for ExternalSorter { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter") + f.debug_struct("ExternalSorter2") .field("id", &self.id()) .field("memory_used", &self.used()) .field("spilled_bytes", &self.spilled_bytes()) @@ -200,6 +213,12 @@ impl Debug for ExternalSorter { } } +#[derive(Debug, Copy, Clone)] +struct CombinedIndex { + batch_idx: usize, + row_idx: usize, +} + impl Drop for ExternalSorter { fn drop(&mut self) { self.runtime.drop_consumer(self.id(), self.used()); @@ -209,7 +228,7 @@ impl Drop for ExternalSorter { #[async_trait] impl MemoryConsumer for ExternalSorter { fn name(&self) -> String { - "ExternalSorter".to_owned() + "ExternalSorter2".to_owned() } fn id(&self) -> &MemoryConsumerId { @@ -249,6 +268,7 @@ impl MemoryConsumer for ExternalSorter { &mut *in_mem_batches, self.schema.clone(), &*self.expr, + self.session_config.batch_size, tracking_metrics, ); @@ -271,33 +291,212 @@ fn in_mem_partial_sort( buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], + batch_size: usize, tracking_metrics: MemTrackingMetrics, ) -> Result { assert_ne!(buffered_batches.len(), 0); + if buffered_batches.len() == 1 { + let result = buffered_batches.pop(); + Ok(Box::pin(SizedRecordBatchStream::new( + schema, + vec![Arc::new(result.unwrap())], + tracking_metrics, + ))) + } else { + let batches = buffered_batches.drain(..).collect::>(); + let sorted_iter = { + // NB timer records time taken on drop, so there are no + // calls to `timer.done()` below. + let _timer = tracking_metrics.elapsed_compute().timer(); + get_sorted_iter(&batches, expressions, batch_size)? + }; + Ok(Box::pin(SortedSizedRecordBatchStream::new( + schema, + batches, + sorted_iter, + tracking_metrics, + ))) + } +} - let result = { - // NB timer records time taken on drop, so there are no - // calls to `timer.done()` below. - let _timer = tracking_metrics.elapsed_compute().timer(); +fn get_sorted_iter( + batches: &[RecordBatch], + expr: &[PhysicalSortExpr], + batch_size: usize, +) -> Result { + let (batch_grouped, combined): (Vec>, Vec>) = + batches + .iter() + .enumerate() + .map(|(i, batch)| { + let col: Vec = expr + .iter() + .map(|e| Ok(e.evaluate_to_sort_column(batch)?.values)) + .collect::>>()?; + + let combined_index = (0..batch.num_rows()) + .map(|r| CombinedIndex { + batch_idx: i, + row_idx: r, + }) + .collect::>(); + + Ok((col, combined_index)) + }) + .collect::, Vec)>>>()? + .into_iter() + .unzip(); + + let column_grouped = transpose(batch_grouped); + + let sort_columns: Vec = column_grouped + .iter() + .zip(expr.iter()) + .map(|(c, e)| { + Ok(SortColumn { + values: concat( + &*c.iter().map(|i| i.as_ref()).collect::>(), + )?, + options: Some(e.options), + }) + }) + .collect::>>() + .into_iter() + .collect::>>()?; + + let indices = lexsort_to_indices(&sort_columns, None)?; + let combined = combined + .into_iter() + .flatten() + .collect::>(); + Ok(SortedIterator::new(indices, combined, batch_size)) +} - let pre_sort = if buffered_batches.len() == 1 { - buffered_batches.pop() - } else { - let batches = buffered_batches.drain(..).collect::>(); - // combine all record batches into one for each column - common::combine_batches(&batches, schema.clone())? - }; +struct SortedIterator { + pos: usize, + indices: UInt32Array, + combined: Vec, + batch_size: usize, + length: usize, +} - pre_sort - .map(|batch| sort_batch(batch, schema.clone(), expressions)) - .transpose()? - }; +impl SortedIterator { + fn new( + indices: UInt32Array, + combined: Vec, + batch_size: usize, + ) -> Self { + let length = combined.len(); + Self { + pos: 0, + indices, + combined, + batch_size, + length, + } + } +} - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(result.unwrap())], - tracking_metrics, - ))) +impl Iterator for SortedIterator { + type Item = Vec; + + fn next(&mut self) -> Option { + if self.pos >= self.length { + return None; + } + + let current_size = min(self.batch_size, self.length - self.pos); + let mut result = Vec::with_capacity(current_size); + for i in 0..current_size { + let p = self.pos + i; + let c_index = self.indices.value(p) as usize; + result.push(self.combined[c_index]) + } + self.pos += current_size; + Some(result) + } +} + +/// Stream of sorted record batches +struct SortedSizedRecordBatchStream { + schema: SchemaRef, + batches: Vec, + sorted_iter: SortedIterator, + num_cols: usize, + metrics: MemTrackingMetrics, +} + +impl SortedSizedRecordBatchStream { + /// new + pub fn new( + schema: SchemaRef, + batches: Vec, + sorted_iter: SortedIterator, + metrics: MemTrackingMetrics, + ) -> Self { + let size = batches.iter().map(batch_byte_size).sum::(); + metrics.init_mem_used(size); + let num_cols = batches[0].num_columns(); + SortedSizedRecordBatchStream { + schema, + batches, + sorted_iter, + num_cols, + metrics, + } + } +} + +impl Stream for SortedSizedRecordBatchStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + _: &mut Context<'_>, + ) -> Poll> { + match self.sorted_iter.next() { + None => Poll::Ready(None), + Some(combined) => { + let mut output = Vec::with_capacity(self.num_cols); + for i in 0..self.num_cols { + let arrays = self + .batches + .iter() + .map(|b| b.column(i).data()) + .collect::>(); + let mut mutable = + MutableArrayData::new(arrays, false, combined.len()); + for x in combined.iter() { + mutable.extend(x.batch_idx, x.row_idx, x.row_idx + 1); + } + output.push(make_array(mutable.freeze())) + } + let batch = RecordBatch::try_new(self.schema.clone(), output); + let poll = Poll::Ready(Some(batch)); + self.metrics.record_poll(poll) + } + } + } +} + +impl RecordBatchStream for SortedSizedRecordBatchStream { + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +fn transpose(v: Vec>) -> Vec> { + assert!(!v.is_empty()); + let len = v[0].len(); + let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect(); + (0..len) + .map(|_| { + iters + .iter_mut() + .map(|n| n.next().unwrap()) + .collect::>() + }) + .collect() } async fn spill_partial_sorted_stream( @@ -576,6 +775,8 @@ async fn do_sort( context: Arc, ) -> Result { let schema = input.schema(); + let tracking_metrics = + metrics_set.new_intermediate_tracking(partition_id, context.runtime_env()); let sorter = ExternalSorter::new( partition_id, schema.clone(), @@ -587,7 +788,7 @@ async fn do_sort( context.runtime_env().register_requester(sorter.id()); while let Some(batch) = input.next().await { let batch = batch?; - sorter.insert_batch(batch).await?; + sorter.insert_batch(batch, &tracking_metrics).await?; } sorter.sort().await } diff --git a/datafusion/core/src/physical_plan/sorts/sort2.rs b/datafusion/core/src/physical_plan/sorts/sort2.rs deleted file mode 100644 index 67dca3508b3f..000000000000 --- a/datafusion/core/src/physical_plan/sorts/sort2.rs +++ /dev/null @@ -1,1147 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Sort that deals with an arbitrary size of the input. -//! It will do in-memory sorting if it has enough memory budget -//! but spills to disk if needed. - -use crate::error::{DataFusionError, Result}; -use crate::execution::context::TaskContext; -use crate::execution::memory_manager::{ - human_readable_size, ConsumerType, MemoryConsumer, MemoryConsumerId, MemoryManager, -}; -use crate::execution::runtime_env::RuntimeEnv; -use crate::physical_plan::common::{batch_byte_size, IPCWriter, SizedRecordBatchStream}; -use crate::physical_plan::expressions::PhysicalSortExpr; -use crate::physical_plan::metrics::{ - BaselineMetrics, CompositeMetricsSet, MemTrackingMetrics, MetricsSet, -}; -use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeStream; -use crate::physical_plan::sorts::SortedStream; -use crate::physical_plan::stream::RecordBatchReceiverStream; -use crate::physical_plan::{ - DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, - RecordBatchStream, SendableRecordBatchStream, Statistics, -}; -use crate::prelude::SessionConfig; -use arrow::array::{make_array, Array, ArrayRef, MutableArrayData, UInt32Array}; -pub use arrow::compute::SortOptions; -use arrow::compute::{concat, lexsort_to_indices, take, SortColumn, TakeOptions}; -use arrow::datatypes::SchemaRef; -use arrow::error::Result as ArrowResult; -use arrow::ipc::reader::FileReader; -use arrow::record_batch::RecordBatch; -use async_trait::async_trait; -use futures::lock::Mutex; -use futures::{Stream, StreamExt}; -use log::{debug, error}; -use std::any::Any; -use std::cmp::min; -use std::fmt; -use std::fmt::{Debug, Formatter}; -use std::fs::File; -use std::io::BufReader; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use std::task::{Context, Poll}; -use tempfile::NamedTempFile; -use tokio::sync::mpsc::{Receiver, Sender}; -use tokio::task; - -/// Sort arbitrary size of data to get a total order (may spill several times during sorting based on free memory available). -/// -/// The basic architecture of the algorithm: -/// 1. get a non-empty new batch from input -/// 2. check with the memory manager if we could buffer the batch in memory -/// 2.1 if memory sufficient, then buffer batch in memory, go to 1. -/// 2.2 if the memory threshold is reached, sort all buffered batches and spill to file. -/// buffer the batch in memory, go to 1. -/// 3. when input is exhausted, merge all in memory batches and spills to get a total order. -struct ExternalSorter2 { - id: MemoryConsumerId, - schema: SchemaRef, - in_mem_batches: Mutex>, - spills: Mutex>, - /// Sort expressions - expr: Vec, - session_config: Arc, - runtime: Arc, - metrics_set: CompositeMetricsSet, - metrics: BaselineMetrics, -} - -impl ExternalSorter2 { - pub fn new( - partition_id: usize, - schema: SchemaRef, - expr: Vec, - metrics_set: CompositeMetricsSet, - session_config: Arc, - runtime: Arc, - ) -> Self { - let metrics = metrics_set.new_intermediate_baseline(partition_id); - Self { - id: MemoryConsumerId::new(partition_id), - schema, - in_mem_batches: Mutex::new(vec![]), - spills: Mutex::new(vec![]), - expr, - session_config, - runtime, - metrics_set, - metrics, - } - } - - async fn insert_batch( - &self, - input: RecordBatch, - tracking_metrics: &MemTrackingMetrics, - ) -> Result<()> { - if input.num_rows() > 0 { - let size = batch_byte_size(&input); - self.try_grow(size).await?; - self.metrics.mem_used().add(size); - let mut in_mem_batches = self.in_mem_batches.lock().await; - // NB timer records time taken on drop, so there are no - // calls to `timer.done()` below. - let _timer = tracking_metrics.elapsed_compute().timer(); - let partial = sort_batch(input, self.schema.clone(), &self.expr)?; - in_mem_batches.push(partial); - } - Ok(()) - } - - async fn spilled_before(&self) -> bool { - let spills = self.spills.lock().await; - !spills.is_empty() - } - - /// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`. - async fn sort(&self) -> Result { - let partition = self.partition_id(); - let batch_size = self.session_config.batch_size; - let mut in_mem_batches = self.in_mem_batches.lock().await; - - if self.spilled_before().await { - let tracking_metrics = self - .metrics_set - .new_intermediate_tracking(partition, self.runtime.clone()); - let mut streams: Vec = vec![]; - if in_mem_batches.len() > 0 { - let in_mem_stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - batch_size, - tracking_metrics, - )?; - let prev_used = self.metrics.mem_used().set(0); - streams.push(SortedStream::new(in_mem_stream, prev_used)); - } - - let mut spills = self.spills.lock().await; - - for spill in spills.drain(..) { - let stream = read_spill_as_stream(spill, self.schema.clone())?; - streams.push(SortedStream::new(stream, 0)); - } - let tracking_metrics = self - .metrics_set - .new_final_tracking(partition, self.runtime.clone()); - Ok(Box::pin(SortPreservingMergeStream::new_from_streams( - streams, - self.schema.clone(), - &self.expr, - tracking_metrics, - self.session_config.batch_size, - ))) - } else if in_mem_batches.len() > 0 { - let tracking_metrics = self - .metrics_set - .new_final_tracking(partition, self.runtime.clone()); - let result = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &self.expr, - batch_size, - tracking_metrics, - ); - // Report to the memory manager we are no longer using memory - self.metrics.mem_used().set(0); - result - } else { - Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone()))) - } - } - - fn used(&self) -> usize { - self.metrics.mem_used().value() - } - - fn spilled_bytes(&self) -> usize { - self.metrics.spilled_bytes().value() - } - - fn spill_count(&self) -> usize { - self.metrics.spill_count().value() - } -} - -impl Debug for ExternalSorter2 { - fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter2") - .field("id", &self.id()) - .field("memory_used", &self.used()) - .field("spilled_bytes", &self.spilled_bytes()) - .field("spill_count", &self.spill_count()) - .finish() - } -} - -#[derive(Debug, Copy, Clone)] -struct CombinedIndex { - batch_idx: usize, - row_idx: usize, -} - -impl Drop for ExternalSorter2 { - fn drop(&mut self) { - self.runtime.drop_consumer(self.id(), self.used()); - } -} - -#[async_trait] -impl MemoryConsumer for ExternalSorter2 { - fn name(&self) -> String { - "ExternalSorter2".to_owned() - } - - fn id(&self) -> &MemoryConsumerId { - &self.id - } - - fn memory_manager(&self) -> Arc { - self.runtime.memory_manager.clone() - } - - fn type_(&self) -> &ConsumerType { - &ConsumerType::Requesting - } - - async fn spill(&self) -> Result { - debug!( - "{}[{}] spilling sort data of {} to disk while inserting ({} time(s) so far)", - self.name(), - self.id(), - self.used(), - self.spill_count() - ); - - let partition = self.partition_id(); - let mut in_mem_batches = self.in_mem_batches.lock().await; - // we could always get a chance to free some memory as long as we are holding some - if in_mem_batches.len() == 0 { - return Ok(0); - } - - let tracking_metrics = self - .metrics_set - .new_intermediate_tracking(partition, self.runtime.clone()); - - let spillfile = self.runtime.disk_manager.create_tmp_file()?; - let stream = in_mem_partial_sort( - &mut *in_mem_batches, - self.schema.clone(), - &*self.expr, - self.session_config.batch_size, - tracking_metrics, - ); - - spill_partial_sorted_stream(&mut stream?, spillfile.path(), self.schema.clone()) - .await?; - let mut spills = self.spills.lock().await; - let used = self.metrics.mem_used().set(0); - self.metrics.record_spill(used); - spills.push(spillfile); - Ok(used) - } - - fn mem_used(&self) -> usize { - self.metrics.mem_used().value() - } -} - -/// consume the non-empty `sorted_bathes` and do in_mem_sort -fn in_mem_partial_sort( - buffered_batches: &mut Vec, - schema: SchemaRef, - expressions: &[PhysicalSortExpr], - batch_size: usize, - tracking_metrics: MemTrackingMetrics, -) -> Result { - assert_ne!(buffered_batches.len(), 0); - if buffered_batches.len() == 1 { - let result = buffered_batches.pop(); - Ok(Box::pin(SizedRecordBatchStream::new( - schema, - vec![Arc::new(result.unwrap())], - tracking_metrics, - ))) - } else { - let batches = buffered_batches.drain(..).collect::>(); - let sorted_iter = { - // NB timer records time taken on drop, so there are no - // calls to `timer.done()` below. - let _timer = tracking_metrics.elapsed_compute().timer(); - get_sorted_iter(&batches, expressions, batch_size)? - }; - Ok(Box::pin(SortedSizedRecordBatchStream::new( - schema, - batches, - sorted_iter, - tracking_metrics, - ))) - } -} - -fn get_sorted_iter( - batches: &[RecordBatch], - expr: &[PhysicalSortExpr], - batch_size: usize, -) -> Result { - let (batch_grouped, combined): (Vec>, Vec>) = - batches - .iter() - .enumerate() - .map(|(i, batch)| { - let col: Vec = expr - .iter() - .map(|e| Ok(e.evaluate_to_sort_column(batch)?.values)) - .collect::>>()?; - - let combined_index = (0..batch.num_rows()) - .map(|r| CombinedIndex { - batch_idx: i, - row_idx: r, - }) - .collect::>(); - - Ok((col, combined_index)) - }) - .collect::, Vec)>>>()? - .into_iter() - .unzip(); - - let column_grouped = transpose(batch_grouped); - - let sort_columns: Vec = column_grouped - .iter() - .zip(expr.iter()) - .map(|(c, e)| { - Ok(SortColumn { - values: concat( - &*c.iter().map(|i| i.as_ref()).collect::>(), - )?, - options: Some(e.options), - }) - }) - .collect::>>() - .into_iter() - .collect::>>()?; - - let indices = lexsort_to_indices(&sort_columns, None)?; - let combined = combined - .into_iter() - .flatten() - .collect::>(); - Ok(SortedIterator::new(indices, combined, batch_size)) -} - -struct SortedIterator { - pos: usize, - indices: UInt32Array, - combined: Vec, - batch_size: usize, - length: usize, -} - -impl SortedIterator { - fn new( - indices: UInt32Array, - combined: Vec, - batch_size: usize, - ) -> Self { - let length = combined.len(); - Self { - pos: 0, - indices, - combined, - batch_size, - length, - } - } -} - -impl Iterator for SortedIterator { - type Item = Vec; - - fn next(&mut self) -> Option { - if self.pos >= self.length { - return None; - } - - let current_size = min(self.batch_size, self.length - self.pos); - let mut result = Vec::with_capacity(current_size); - for i in 0..current_size { - let p = self.pos + i; - let c_index = self.indices.value(p) as usize; - result.push(self.combined[c_index]) - } - self.pos += current_size; - Some(result) - } -} - -/// Stream of sorted record batches -struct SortedSizedRecordBatchStream { - schema: SchemaRef, - batches: Vec, - sorted_iter: SortedIterator, - num_cols: usize, - metrics: MemTrackingMetrics, -} - -impl SortedSizedRecordBatchStream { - /// new - pub fn new( - schema: SchemaRef, - batches: Vec, - sorted_iter: SortedIterator, - metrics: MemTrackingMetrics, - ) -> Self { - let size = batches.iter().map(|b| batch_byte_size(b)).sum::(); - metrics.init_mem_used(size); - let num_cols = batches[0].num_columns(); - SortedSizedRecordBatchStream { - schema, - batches, - sorted_iter, - num_cols, - metrics, - } - } -} - -impl Stream for SortedSizedRecordBatchStream { - type Item = ArrowResult; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll> { - match self.sorted_iter.next() { - None => Poll::Ready(None), - Some(combined) => { - let mut output = Vec::with_capacity(self.num_cols); - for i in 0..self.num_cols { - let arrays = self - .batches - .iter() - .map(|b| b.column(i).data()) - .collect::>(); - let mut mutable = - MutableArrayData::new(arrays, false, combined.len()); - for x in combined.iter() { - mutable.extend(x.batch_idx, x.row_idx, x.row_idx + 1); - } - output.push(make_array(mutable.freeze())) - } - let batch = RecordBatch::try_new(self.schema.clone(), output); - let poll = Poll::Ready(Some(batch)); - self.metrics.record_poll(poll) - } - } - } -} - -impl RecordBatchStream for SortedSizedRecordBatchStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - -fn transpose(v: Vec>) -> Vec> { - assert!(!v.is_empty()); - let len = v[0].len(); - let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect(); - (0..len) - .map(|_| { - iters - .iter_mut() - .map(|n| n.next().unwrap()) - .collect::>() - }) - .collect() -} - -async fn spill_partial_sorted_stream( - in_mem_stream: &mut SendableRecordBatchStream, - path: &Path, - schema: SchemaRef, -) -> Result<()> { - let (sender, receiver) = tokio::sync::mpsc::channel(2); - let path: PathBuf = path.into(); - let handle = task::spawn_blocking(move || write_sorted(receiver, path, schema)); - while let Some(item) = in_mem_stream.next().await { - sender.send(item).await.ok(); - } - drop(sender); - match handle.await { - Ok(r) => r, - Err(e) => Err(DataFusionError::Execution(format!( - "Error occurred while spilling {}", - e - ))), - } -} - -fn read_spill_as_stream( - path: NamedTempFile, - schema: SchemaRef, -) -> Result { - let (sender, receiver): ( - Sender>, - Receiver>, - ) = tokio::sync::mpsc::channel(2); - let join_handle = task::spawn_blocking(move || { - if let Err(e) = read_spill(sender, path.path()) { - error!("Failure while reading spill file: {:?}. Error: {}", path, e); - } - }); - Ok(RecordBatchReceiverStream::create( - &schema, - receiver, - join_handle, - )) -} - -fn write_sorted( - mut receiver: Receiver>, - path: PathBuf, - schema: SchemaRef, -) -> Result<()> { - let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?; - while let Some(batch) = receiver.blocking_recv() { - writer.write(&batch?)?; - } - writer.finish()?; - debug!( - "Spilled {} batches of total {} rows to disk, memory released {}", - writer.num_batches, - writer.num_rows, - human_readable_size(writer.num_bytes as usize), - ); - Ok(()) -} - -fn read_spill(sender: Sender>, path: &Path) -> Result<()> { - let file = BufReader::new(File::open(&path)?); - let reader = FileReader::try_new(file, None)?; - for batch in reader { - sender - .blocking_send(batch) - .map_err(|e| DataFusionError::Execution(format!("{}", e)))?; - } - Ok(()) -} - -/// External Sort execution plan -#[derive(Debug)] -pub struct SortExec2 { - /// Input schema - input: Arc, - /// Sort expressions - expr: Vec, - /// Containing all metrics set created during sort - metrics_set: CompositeMetricsSet, - /// Preserve partitions of input plan - preserve_partitioning: bool, -} - -impl SortExec2 { - /// Create a new sort execution plan - pub fn try_new( - expr: Vec, - input: Arc, - ) -> Result { - Ok(Self::new_with_partitioning(expr, input, false)) - } - - /// Create a new sort execution plan with the option to preserve - /// the partitioning of the input plan - pub fn new_with_partitioning( - expr: Vec, - input: Arc, - preserve_partitioning: bool, - ) -> Self { - Self { - expr, - input, - metrics_set: CompositeMetricsSet::new(), - preserve_partitioning, - } - } - - /// Input schema - pub fn input(&self) -> &Arc { - &self.input - } - - /// Sort expressions - pub fn expr(&self) -> &[PhysicalSortExpr] { - &self.expr - } -} - -#[async_trait] -impl ExecutionPlan for SortExec2 { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.input.schema() - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - if self.preserve_partitioning { - self.input.output_partitioning() - } else { - Partitioning::UnknownPartitioning(1) - } - } - - fn required_child_distribution(&self) -> Distribution { - if self.preserve_partitioning { - Distribution::UnspecifiedDistribution - } else { - Distribution::SinglePartition - } - } - - fn children(&self) -> Vec> { - vec![self.input.clone()] - } - - fn relies_on_input_order(&self) -> bool { - // this operator resorts everything - false - } - - fn benefits_from_input_partitioning(&self) -> bool { - false - } - - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - Some(&self.expr) - } - - fn with_new_children( - &self, - children: Vec>, - ) -> Result> { - match children.len() { - 1 => Ok(Arc::new(SortExec2::try_new( - self.expr.clone(), - children[0].clone(), - )?)), - _ => Err(DataFusionError::Internal( - "SortExec2 wrong number of children".to_string(), - )), - } - } - - async fn execute( - &self, - partition: usize, - context: Arc, - ) -> Result { - if !self.preserve_partitioning { - if 0 != partition { - return Err(DataFusionError::Internal(format!( - "SortExec2 invalid partition {}", - partition - ))); - } - - // sort needs to operate on a single partition currently - if 1 != self.input.output_partitioning().partition_count() { - return Err(DataFusionError::Internal( - "SortExec2 requires a single input partition".to_owned(), - )); - } - } - - let input = self.input.execute(partition, context.clone()).await?; - - do_sort( - input, - partition, - self.expr.clone(), - self.metrics_set.clone(), - context, - ) - .await - } - - fn metrics(&self) -> Option { - Some(self.metrics_set.aggregate_all()) - } - - fn fmt_as( - &self, - t: DisplayFormatType, - f: &mut std::fmt::Formatter, - ) -> std::fmt::Result { - match t { - DisplayFormatType::Default => { - let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); - write!(f, "SortExec2: [{}]", expr.join(",")) - } - } - } - - fn statistics(&self) -> Statistics { - self.input.statistics() - } -} - -fn sort_batch( - batch: RecordBatch, - schema: SchemaRef, - expr: &[PhysicalSortExpr], -) -> ArrowResult { - // TODO: pushup the limit expression to sort - let indices = lexsort_to_indices( - &expr - .iter() - .map(|e| e.evaluate_to_sort_column(&batch)) - .collect::>>()?, - None, - )?; - - // reorder all rows based on sorted indices - RecordBatch::try_new( - schema, - batch - .columns() - .iter() - .map(|column| { - take( - column.as_ref(), - &indices, - // disable bound check overhead since indices are already generated from - // the same record batch - Some(TakeOptions { - check_bounds: false, - }), - ) - }) - .collect::>>()?, - ) -} - -async fn do_sort( - mut input: SendableRecordBatchStream, - partition_id: usize, - expr: Vec, - metrics_set: CompositeMetricsSet, - context: Arc, -) -> Result { - let schema = input.schema(); - let tracking_metrics = metrics_set - .new_intermediate_tracking(partition_id, context.runtime_env().clone()); - let sorter = ExternalSorter2::new( - partition_id, - schema.clone(), - expr, - metrics_set, - Arc::new(context.session_config()), - context.runtime_env(), - ); - context.runtime_env().register_requester(sorter.id()); - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch, &tracking_metrics).await?; - } - sorter.sort().await -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::datafusion_data_access::object_store::local::LocalFileSystem; - use crate::execution::context::SessionConfig; - use crate::execution::runtime_env::RuntimeConfig; - use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; - use crate::physical_plan::expressions::col; - use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::{ - collect, - file_format::{CsvExec, FileScanConfig}, - }; - use crate::prelude::SessionContext; - use crate::test; - use crate::test::assert_is_pending; - use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::test_util; - use arrow::array::*; - use arrow::compute::SortOptions; - use arrow::datatypes::*; - use futures::FutureExt; - use std::collections::{BTreeMap, HashMap}; - - #[tokio::test] - async fn test_in_mem_sort() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let schema = test_util::aggr_test_schema(); - let partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: Arc::clone(&schema), - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); - - let sort_exec = Arc::new(SortExec2::try_new( - vec![ - // c1 string column - PhysicalSortExpr { - expr: col("c1", &schema)?, - options: SortOptions::default(), - }, - // c2 uin32 column - PhysicalSortExpr { - expr: col("c2", &schema)?, - options: SortOptions::default(), - }, - // c7 uin8 column - PhysicalSortExpr { - expr: col("c7", &schema)?, - options: SortOptions::default(), - }, - ], - Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), - )?); - - let result = collect(sort_exec, task_ctx).await?; - - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_sort_spill() -> Result<()> { - // trigger spill there will be 4 batches with 5.5KB for each - let config = RuntimeConfig::new().with_memory_limit(12288, 1.0); - let runtime = Arc::new(RuntimeEnv::new(config)?); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); - - let schema = test_util::aggr_test_schema(); - let partitions = 4; - let (_, files) = - test::create_partitioned_csv("aggregate_test_100.csv", partitions)?; - - let csv = CsvExec::new( - FileScanConfig { - object_store: Arc::new(LocalFileSystem {}), - file_schema: Arc::clone(&schema), - file_groups: files, - statistics: Statistics::default(), - projection: None, - limit: None, - table_partition_cols: vec![], - }, - true, - b',', - ); - - let sort_exec = Arc::new(SortExec2::try_new( - vec![ - // c1 string column - PhysicalSortExpr { - expr: col("c1", &schema)?, - options: SortOptions::default(), - }, - // c2 uin32 column - PhysicalSortExpr { - expr: col("c2", &schema)?, - options: SortOptions::default(), - }, - // c7 uin8 column - PhysicalSortExpr { - expr: col("c7", &schema)?, - options: SortOptions::default(), - }, - ], - Arc::new(CoalescePartitionsExec::new(Arc::new(csv))), - )?); - - let task_ctx = session_ctx.task_ctx(); - let result = collect(sort_exec.clone(), task_ctx).await?; - - assert_eq!(result.len(), 1); - - // Now, validate metrics - let metrics = sort_exec.metrics().unwrap(); - - assert_eq!(metrics.output_rows().unwrap(), 100); - assert!(metrics.elapsed_compute().unwrap() > 0); - assert!(metrics.spill_count().unwrap() > 0); - assert!(metrics.spilled_bytes().unwrap() > 0); - - let columns = result[0].columns(); - - let c1 = as_string_array(&columns[0]); - assert_eq!(c1.value(0), "a"); - assert_eq!(c1.value(c1.len() - 1), "e"); - - let c2 = as_primitive_array::(&columns[1]); - assert_eq!(c2.value(0), 1); - assert_eq!(c2.value(c2.len() - 1), 5,); - - let c7 = as_primitive_array::(&columns[6]); - assert_eq!(c7.value(0), 15); - assert_eq!(c7.value(c7.len() - 1), 254,); - - Ok(()) - } - - #[tokio::test] - async fn test_sort_metadata() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let field_metadata: BTreeMap = - vec![("foo".to_string(), "bar".to_string())] - .into_iter() - .collect(); - let schema_metadata: HashMap = - vec![("baz".to_string(), "barf".to_string())] - .into_iter() - .collect(); - - let mut field = Field::new("field_name", DataType::UInt64, true); - field.set_metadata(Some(field_metadata.clone())); - let schema = Schema::new_with_metadata(vec![field], schema_metadata.clone()); - let schema = Arc::new(schema); - - let data: ArrayRef = - Arc::new(vec![3, 2, 1].into_iter().map(Some).collect::()); - - let batch = RecordBatch::try_new(schema.clone(), vec![data]).unwrap(); - let input = - Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None).unwrap()); - - let sort_exec = Arc::new(SortExec2::try_new( - vec![PhysicalSortExpr { - expr: col("field_name", &schema)?, - options: SortOptions::default(), - }], - input, - )?); - - let result: Vec = collect(sort_exec, task_ctx).await?; - - let expected_data: ArrayRef = - Arc::new(vec![1, 2, 3].into_iter().map(Some).collect::()); - let expected_batch = - RecordBatch::try_new(schema.clone(), vec![expected_data]).unwrap(); - - // Data is correct - assert_eq!(&vec![expected_batch], &result); - - // explicitlty ensure the metadata is present - assert_eq!( - result[0].schema().fields()[0].metadata(), - &Some(field_metadata) - ); - assert_eq!(result[0].schema().metadata(), &schema_metadata); - - Ok(()) - } - - #[tokio::test] - async fn test_lex_sort_by_float() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Float32, true), - Field::new("b", DataType::Float64, true), - ])); - - // define data. - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(Float32Array::from(vec![ - Some(f32::NAN), - None, - None, - Some(f32::NAN), - Some(1.0_f32), - Some(1.0_f32), - Some(2.0_f32), - Some(3.0_f32), - ])), - Arc::new(Float64Array::from(vec![ - Some(200.0_f64), - Some(20.0_f64), - Some(10.0_f64), - Some(100.0_f64), - Some(f64::NAN), - None, - None, - Some(f64::NAN), - ])), - ], - )?; - - let sort_exec = Arc::new(SortExec2::try_new( - vec![ - PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions { - descending: true, - nulls_first: true, - }, - }, - PhysicalSortExpr { - expr: col("b", &schema)?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }, - ], - Arc::new(MemoryExec::try_new(&[vec![batch]], schema, None)?), - )?); - - assert_eq!(DataType::Float32, *sort_exec.schema().field(0).data_type()); - assert_eq!(DataType::Float64, *sort_exec.schema().field(1).data_type()); - - let result: Vec = collect(sort_exec.clone(), task_ctx).await?; - let metrics = sort_exec.metrics().unwrap(); - assert!(metrics.elapsed_compute().unwrap() > 0); - assert_eq!(metrics.output_rows().unwrap(), 8); - assert_eq!(result.len(), 1); - - let columns = result[0].columns(); - - assert_eq!(DataType::Float32, *columns[0].data_type()); - assert_eq!(DataType::Float64, *columns[1].data_type()); - - let a = as_primitive_array::(&columns[0]); - let b = as_primitive_array::(&columns[1]); - - // convert result to strings to allow comparing to expected result containing NaN - let result: Vec<(Option, Option)> = (0..result[0].num_rows()) - .map(|i| { - let aval = if a.is_valid(i) { - Some(a.value(i).to_string()) - } else { - None - }; - let bval = if b.is_valid(i) { - Some(b.value(i).to_string()) - } else { - None - }; - (aval, bval) - }) - .collect(); - - let expected: Vec<(Option, Option)> = vec![ - (None, Some("10".to_owned())), - (None, Some("20".to_owned())), - (Some("NaN".to_owned()), Some("100".to_owned())), - (Some("NaN".to_owned()), Some("200".to_owned())), - (Some("3".to_owned()), Some("NaN".to_owned())), - (Some("2".to_owned()), None), - (Some("1".to_owned()), Some("NaN".to_owned())), - (Some("1".to_owned()), None), - ]; - - assert_eq!(expected, result); - - Ok(()) - } - - #[tokio::test] - async fn test_drop_cancel() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); - let schema = - Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)])); - - let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1)); - let refs = blocking_exec.refs(); - let sort_exec = Arc::new(SortExec2::try_new( - vec![PhysicalSortExpr { - expr: col("a", &schema)?, - options: SortOptions::default(), - }], - blocking_exec, - )?); - - let fut = collect(sort_exec, task_ctx); - let mut fut = fut.boxed(); - - assert_is_pending(&mut fut); - drop(fut); - assert_strong_count_converges_to_zero(refs).await; - - Ok(()) - } -} diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs index e5151ffe32de..3fb480de21bd 100644 --- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs @@ -617,7 +617,7 @@ mod tests { use crate::physical_plan::expressions::col; use crate::physical_plan::file_format::{CsvExec, FileScanConfig}; use crate::physical_plan::memory::MemoryExec; - use crate::physical_plan::sorts::sort2::SortExec2; + use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::{collect, common}; use crate::test::{self, assert_is_pending}; use crate::{assert_batches_eq, test_util}; @@ -881,7 +881,7 @@ mod tests { context: Arc, ) -> RecordBatch { let sort_exec = - Arc::new(SortExec2::new_with_partitioning(sort.clone(), input, true)); + Arc::new(SortExec::new_with_partitioning(sort.clone(), input, true)); sorted_merge(sort_exec, sort, context).await } @@ -891,7 +891,7 @@ mod tests { context: Arc, ) -> RecordBatch { let merge = Arc::new(CoalescePartitionsExec::new(src)); - let sort_exec = Arc::new(SortExec2::try_new(sort, merge).unwrap()); + let sort_exec = Arc::new(SortExec::try_new(sort, merge).unwrap()); let mut result = collect(sort_exec, context).await.unwrap(); assert_eq!(result.len(), 1); result.remove(0) diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs index dd363ed6138c..9c10e69d549d 100644 --- a/datafusion/core/tests/order_spill_fuzz.rs +++ b/datafusion/core/tests/order_spill_fuzz.rs @@ -27,7 +27,6 @@ use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use datafusion::physical_plan::expressions::{col, PhysicalSortExpr}; use datafusion::physical_plan::memory::MemoryExec; use datafusion::physical_plan::sorts::sort::SortExec; -use datafusion::physical_plan::sorts::sort2::SortExec2; use datafusion::physical_plan::{collect, ExecutionPlan}; use datafusion::prelude::{SessionConfig, SessionContext}; use fuzz_utils::{add_empty_batches, batches_to_vec, partitions_to_sorted_vec}; @@ -40,21 +39,11 @@ async fn test_sort_1k_mem() { run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await; } -#[tokio::test] -async fn test_sort_1k_mem_2() { - run_sort2(1024, vec![(5, false), (2000, true), (1000000, true)]).await; -} - #[tokio::test] async fn test_sort_100k_mem() { run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await; } -#[tokio::test] -async fn test_sort_100k_mem_2() { - run_sort2(102400, vec![(5, false), (2000, false), (1000000, true)]).await; -} - #[tokio::test] async fn test_sort_unlimited_mem() { run_sort( @@ -64,15 +53,6 @@ async fn test_sort_unlimited_mem() { .await; } -#[tokio::test] -async fn test_sort_unlimited_mem2() { - run_sort2( - usize::MAX, - vec![(5, false), (2000, false), (1000000, false)], - ) - .await; -} - /// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { for (size, spill) in size_spill { @@ -117,50 +97,6 @@ async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) { } } -/// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` -async fn run_sort2(pool_size: usize, size_spill: Vec<(usize, bool)>) { - for (size, spill) in size_spill { - let input = vec![make_staggered_batches(size)]; - let first_batch = input - .iter() - .flat_map(|p| p.iter()) - .next() - .expect("at least one batch"); - let schema = first_batch.schema(); - - let sort = vec![PhysicalSortExpr { - expr: col("x", &schema).unwrap(), - options: SortOptions { - descending: false, - nulls_first: true, - }, - }]; - - let exec = MemoryExec::try_new(&input, schema, None).unwrap(); - let sort = Arc::new(SortExec2::try_new(sort, Arc::new(exec)).unwrap()); - - let runtime_config = RuntimeConfig::new().with_memory_manager( - MemoryManagerConfig::try_new_limit(pool_size, 1.0).unwrap(), - ); - let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap()); - let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), runtime); - - let task_ctx = session_ctx.task_ctx(); - let collected = collect(sort.clone(), task_ctx).await.unwrap(); - - let expected = partitions_to_sorted_vec(&input); - let actual = batches_to_vec(&collected); - - if spill { - assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0); - } else { - assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0); - } - - assert_eq!(expected, actual, "failure in @ pool_size {}", pool_size); - } -} - /// Return randomly sized record batches in a field named 'x' of type `Int32` /// with randomized i32 content fn make_staggered_batches(len: usize) -> Vec { diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index fe1094542bef..ef6ea52dd774 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -60,7 +60,7 @@ async fn explain_analyze_baseline_metrics() { ); assert_metrics!( &formatted, - "SortExec2: [c1@0 ASC NULLS LAST]", + "SortExec: [c1@0 ASC NULLS LAST]", "metrics=[output_rows=5, elapsed_compute=" ); assert_metrics!( @@ -108,7 +108,7 @@ async fn explain_analyze_baseline_metrics() { use datafusion::physical_plan; use datafusion::physical_plan::sorts; - plan.as_any().downcast_ref::().is_some() + plan.as_any().downcast_ref::().is_some() || plan.as_any().downcast_ref::().is_some() // CoalescePartitionsExec doesn't do any work so is not included || plan.as_any().downcast_ref::().is_some() @@ -648,7 +648,7 @@ async fn test_physical_plan_display_indent() { let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); let expected = vec![ "GlobalLimitExec: limit=10", - " SortExec2: [the_min@2 DESC]", + " SortExec: [the_min@2 DESC]", " CoalescePartitionsExec", " ProjectionExec: expr=[c1@0 as c1, MAX(aggregate_test_100.c12)@1 as MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)@2 as the_min]", " HashAggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[MAX(aggregate_test_100.c12), MIN(aggregate_test_100.c12)]", From ad361c18571b93f131341974927e8507bda5761a Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 2 Apr 2022 12:56:26 +0800 Subject: [PATCH 06/15] revert unexpected changes --- benchmarks/queries/q1.sql | 21 +++++++++++++------ .../core/src/physical_plan/sorts/sort.rs | 4 ++-- datafusion/core/tests/order_spill_fuzz.rs | 6 +++--- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/benchmarks/queries/q1.sql b/benchmarks/queries/q1.sql index 9af11d3c99fa..a0fcf159e209 100644 --- a/benchmarks/queries/q1.sql +++ b/benchmarks/queries/q1.sql @@ -1,12 +1,21 @@ select l_returnflag, l_linestatus, - l_quantity, - l_extendedprice, - l_discount, - l_tax + sum(l_quantity) as sum_qty, + sum(l_extendedprice) as sum_base_price, + sum(l_extendedprice * (1 - l_discount)) as sum_disc_price, + sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge, + avg(l_quantity) as avg_qty, + avg(l_extendedprice) as avg_price, + avg(l_discount) as avg_disc, + count(*) as count_order from lineitem +where + l_shipdate <= date '1998-09-02' +group by + l_returnflag, + l_linestatus order by - l_extendedprice, - l_discount; \ No newline at end of file + l_returnflag, + l_linestatus; \ No newline at end of file diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 71ffa21ab56c..6fb2ed4bcc53 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -204,7 +204,7 @@ impl ExternalSorter { impl Debug for ExternalSorter { fn fmt(&self, f: &mut Formatter) -> fmt::Result { - f.debug_struct("ExternalSorter2") + f.debug_struct("ExternalSorter") .field("id", &self.id()) .field("memory_used", &self.used()) .field("spilled_bytes", &self.spilled_bytes()) @@ -228,7 +228,7 @@ impl Drop for ExternalSorter { #[async_trait] impl MemoryConsumer for ExternalSorter { fn name(&self) -> String { - "ExternalSorter2".to_owned() + "ExternalSorter".to_owned() } fn id(&self) -> &MemoryConsumerId { diff --git a/datafusion/core/tests/order_spill_fuzz.rs b/datafusion/core/tests/order_spill_fuzz.rs index 9c10e69d549d..c052382d5eac 100644 --- a/datafusion/core/tests/order_spill_fuzz.rs +++ b/datafusion/core/tests/order_spill_fuzz.rs @@ -36,12 +36,12 @@ use std::sync::Arc; #[tokio::test] async fn test_sort_1k_mem() { - run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await; + run_sort(1024, vec![(5, false), (2000, true), (1000000, true)]).await } #[tokio::test] async fn test_sort_100k_mem() { - run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await; + run_sort(102400, vec![(5, false), (2000, false), (1000000, true)]).await } #[tokio::test] @@ -50,7 +50,7 @@ async fn test_sort_unlimited_mem() { usize::MAX, vec![(5, false), (2000, false), (1000000, false)], ) - .await; + .await } /// Sort the input using SortExec and ensure the results are correct according to `Vec::sort` From 28f5f45a2648d03d7a818500a4feca781f10193e Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 2 Apr 2022 16:01:15 +0800 Subject: [PATCH 07/15] iter rework --- .../core/src/physical_plan/sorts/sort.rs | 127 +++++++++--------- 1 file changed, 61 insertions(+), 66 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 6fb2ed4bcc53..409d885621e7 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -213,12 +213,6 @@ impl Debug for ExternalSorter { } } -#[derive(Debug, Copy, Clone)] -struct CombinedIndex { - batch_idx: usize, - row_idx: usize, -} - impl Drop for ExternalSorter { fn drop(&mut self) { self.runtime.drop_consumer(self.id(), self.used()); @@ -319,63 +313,66 @@ fn in_mem_partial_sort( } } +#[derive(Debug, Copy, Clone)] +struct CompositeIndex { + batch_idx: u32, + row_idx: u32, +} + fn get_sorted_iter( batches: &[RecordBatch], expr: &[PhysicalSortExpr], batch_size: usize, ) -> Result { - let (batch_grouped, combined): (Vec>, Vec>) = - batches - .iter() - .enumerate() - .map(|(i, batch)| { - let col: Vec = expr - .iter() - .map(|e| Ok(e.evaluate_to_sort_column(batch)?.values)) - .collect::>>()?; - - let combined_index = (0..batch.num_rows()) - .map(|r| CombinedIndex { - batch_idx: i, - row_idx: r, - }) - .collect::>(); - - Ok((col, combined_index)) - }) - .collect::, Vec)>>>()? - .into_iter() - .unzip(); + let row_indices = batches + .iter() + .enumerate() + .map(|(i, batch)| { + (0..batch.num_rows()) + .map(|r| CompositeIndex { + // since we original use UInt32Array to index the combined mono batch, + // component record batches won't overflow as well, + // use u32 here for space efficiency. + batch_idx: i as u32, + row_idx: r as u32, + }) + .collect::>() + }) + .flatten() + .collect::>(); - let column_grouped = transpose(batch_grouped); + let sort_columns = batches + .iter() + .map(|batch| { + expr.iter() + .map(|e| Ok(e.evaluate_to_sort_column(batch)?.values)) + .collect::>>() + }) + .collect::>>>()?; - let sort_columns: Vec = column_grouped + let sort_columns = expr .iter() - .zip(expr.iter()) - .map(|(c, e)| { + .enumerate() + .map(|(i, expr)| { + let columns_i = sort_columns + .iter() + .map(|cs| cs[i].as_ref()) + .collect::>(); Ok(SortColumn { - values: concat( - &*c.iter().map(|i| i.as_ref()).collect::>(), - )?, - options: Some(e.options), + values: concat(columns_i.as_slice())?, + options: Some(expr.options), }) }) - .collect::>>() - .into_iter() .collect::>>()?; - let indices = lexsort_to_indices(&sort_columns, None)?; - let combined = combined - .into_iter() - .flatten() - .collect::>(); - Ok(SortedIterator::new(indices, combined, batch_size)) + + Ok(SortedIterator::new(indices, row_indices, batch_size)) } struct SortedIterator { pos: usize, indices: UInt32Array, - combined: Vec, + composite: Vec, batch_size: usize, length: usize, } @@ -383,22 +380,28 @@ struct SortedIterator { impl SortedIterator { fn new( indices: UInt32Array, - combined: Vec, + composite: Vec, batch_size: usize, ) -> Self { - let length = combined.len(); + let length = composite.len(); Self { pos: 0, indices, - combined, + composite, batch_size, length, } } + + fn memory_size(&self) -> usize { + std::mem::size_of_val(self) + + self.indices.get_array_memory_size() + + std::mem::size_of_val(&self.composite[..]) + } } impl Iterator for SortedIterator { - type Item = Vec; + type Item = Vec; fn next(&mut self) -> Option { if self.pos >= self.length { @@ -410,7 +413,7 @@ impl Iterator for SortedIterator { for i in 0..current_size { let p = self.pos + i; let c_index = self.indices.value(p) as usize; - result.push(self.combined[c_index]) + result.push(self.composite[c_index]) } self.pos += current_size; Some(result) @@ -434,7 +437,8 @@ impl SortedSizedRecordBatchStream { sorted_iter: SortedIterator, metrics: MemTrackingMetrics, ) -> Self { - let size = batches.iter().map(batch_byte_size).sum::(); + let size = batches.iter().map(batch_byte_size).sum::() + + sorted_iter.memory_size(); metrics.init_mem_used(size); let num_cols = batches[0].num_columns(); SortedSizedRecordBatchStream { @@ -467,7 +471,12 @@ impl Stream for SortedSizedRecordBatchStream { let mut mutable = MutableArrayData::new(arrays, false, combined.len()); for x in combined.iter() { - mutable.extend(x.batch_idx, x.row_idx, x.row_idx + 1); + // we cannot extend with slice here, since the lexsort is unstable + mutable.extend( + x.batch_idx as usize, + x.row_idx as usize, + (x.row_idx + 1) as usize, + ); } output.push(make_array(mutable.freeze())) } @@ -485,20 +494,6 @@ impl RecordBatchStream for SortedSizedRecordBatchStream { } } -fn transpose(v: Vec>) -> Vec> { - assert!(!v.is_empty()); - let len = v[0].len(); - let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect(); - (0..len) - .map(|_| { - iters - .iter_mut() - .map(|n| n.next().unwrap()) - .collect::>() - }) - .collect() -} - async fn spill_partial_sorted_stream( in_mem_stream: &mut SendableRecordBatchStream, path: &Path, From e56919ba26c0133262c939792e9c411411ec071d Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 2 Apr 2022 16:24:34 +0800 Subject: [PATCH 08/15] minor --- datafusion/core/src/physical_plan/sorts/sort.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 409d885621e7..4b937822b5bc 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -319,6 +319,7 @@ struct CompositeIndex { row_idx: u32, } +/// Get sorted iterator by sort concatenated `SortColumn`s fn get_sorted_iter( batches: &[RecordBatch], expr: &[PhysicalSortExpr], @@ -327,7 +328,7 @@ fn get_sorted_iter( let row_indices = batches .iter() .enumerate() - .map(|(i, batch)| { + .flat_map(|(i, batch)| { (0..batch.num_rows()) .map(|r| CompositeIndex { // since we original use UInt32Array to index the combined mono batch, @@ -338,10 +339,9 @@ fn get_sorted_iter( }) .collect::>() }) - .flatten() .collect::>(); - let sort_columns = batches + let sort_arrays = batches .iter() .map(|batch| { expr.iter() @@ -354,7 +354,7 @@ fn get_sorted_iter( .iter() .enumerate() .map(|(i, expr)| { - let columns_i = sort_columns + let columns_i = sort_arrays .iter() .map(|cs| cs[i].as_ref()) .collect::>(); @@ -403,6 +403,7 @@ impl SortedIterator { impl Iterator for SortedIterator { type Item = Vec; + /// Emit a max of `batch_size` positions each time fn next(&mut self) -> Option { if self.pos >= self.length { return None; From 75c6912b154357618ed77562af80f5e4653155d6 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sat, 2 Apr 2022 19:47:33 +0800 Subject: [PATCH 09/15] prepare --- datafusion/core/Cargo.toml | 2 +- datafusion/core/src/row/mod.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index f17456e3dd88..57f53601d1aa 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -41,7 +41,7 @@ path = "src/lib.rs" # Used to enable the avro format avro = ["avro-rs", "num-traits", "datafusion-common/avro"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions"] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] +default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "row"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] # Used to enable JIT code generation diff --git a/datafusion/core/src/row/mod.rs b/datafusion/core/src/row/mod.rs index 531dbfe3e41e..db10c6e03c53 100644 --- a/datafusion/core/src/row/mod.rs +++ b/datafusion/core/src/row/mod.rs @@ -222,12 +222,7 @@ mod tests { use super::*; use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::FileFormat; - use crate::datasource::object_store::local::{ - local_object_reader, local_object_reader_stream, local_unpartitioned_file, - LocalFileSystem, - }; use crate::error::Result; - use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::file_format::FileScanConfig; use crate::physical_plan::{collect, ExecutionPlan}; use crate::row::reader::read_as_batch; @@ -243,6 +238,10 @@ mod tests { use datafusion_jit::api::Assembler; use rand::Rng; use DataType::*; + use datafusion_data_access::object_store::local::{local_object_reader, local_object_reader_stream}; + use crate::datasource::listing::local_unpartitioned_file; + use crate::prelude::SessionContext; + use datafusion_data_access::object_store::local::LocalFileSystem; fn test_validity(bs: &[bool]) { let n = bs.len(); @@ -556,12 +555,13 @@ mod tests { #[tokio::test] async fn test_with_parquet() -> Result<()> { - let runtime = Arc::new(RuntimeEnv::default()); + let session_ctx = SessionContext::new(); + let task_ctx = session_ctx.task_ctx(); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; let schema = exec.schema().clone(); - let batches = collect(exec, runtime).await?; + let batches = collect(exec, task_ctx).await?; assert_eq!(1, batches.len()); let batch = &batches[0]; From 6bf7b9d0c75b98680045f691ca8371419dff7a6f Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Sun, 3 Apr 2022 12:27:12 +0800 Subject: [PATCH 10/15] avoid evaluating sort columns twice --- .../core/src/physical_plan/sorts/sort.rs | 81 ++++++++++++------- 1 file changed, 53 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 4b937822b5bc..90e2bb63fa4e 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -74,7 +74,7 @@ use tokio::task; struct ExternalSorter { id: MemoryConsumerId, schema: SchemaRef, - in_mem_batches: Mutex>, + in_mem_batches: Mutex>, spills: Mutex>, /// Sort expressions expr: Vec, @@ -282,7 +282,7 @@ impl MemoryConsumer for ExternalSorter { /// consume the non-empty `sorted_bathes` and do in_mem_sort fn in_mem_partial_sort( - buffered_batches: &mut Vec, + buffered_batches: &mut Vec, schema: SchemaRef, expressions: &[PhysicalSortExpr], batch_size: usize, @@ -293,16 +293,28 @@ fn in_mem_partial_sort( let result = buffered_batches.pop(); Ok(Box::pin(SizedRecordBatchStream::new( schema, - vec![Arc::new(result.unwrap())], + vec![Arc::new(result.unwrap().sorted_batch)], tracking_metrics, ))) } else { - let batches = buffered_batches.drain(..).collect::>(); + let (sorted_arrays, batches): (Vec>, Vec) = + buffered_batches + .drain(..) + .into_iter() + .map(|b| { + let BatchWithSortArray { + sort_arrays, + sorted_batch: batch, + } = b; + (sort_arrays, batch) + }) + .unzip(); + let sorted_iter = { // NB timer records time taken on drop, so there are no // calls to `timer.done()` below. let _timer = tracking_metrics.elapsed_compute().timer(); - get_sorted_iter(&batches, expressions, batch_size)? + get_sorted_iter(&sorted_arrays, expressions, batch_size)? }; Ok(Box::pin(SortedSizedRecordBatchStream::new( schema, @@ -321,15 +333,15 @@ struct CompositeIndex { /// Get sorted iterator by sort concatenated `SortColumn`s fn get_sorted_iter( - batches: &[RecordBatch], + sort_arrays: &[Vec], expr: &[PhysicalSortExpr], batch_size: usize, ) -> Result { - let row_indices = batches + let row_indices = sort_arrays .iter() .enumerate() - .flat_map(|(i, batch)| { - (0..batch.num_rows()) + .flat_map(|(i, arrays)| { + (0..arrays[0].len()) .map(|r| CompositeIndex { // since we original use UInt32Array to index the combined mono batch, // component record batches won't overflow as well, @@ -341,15 +353,6 @@ fn get_sorted_iter( }) .collect::>(); - let sort_arrays = batches - .iter() - .map(|batch| { - expr.iter() - .map(|e| Ok(e.evaluate_to_sort_column(batch)?.values)) - .collect::>>() - }) - .collect::>>>()?; - let sort_columns = expr .iter() .enumerate() @@ -728,22 +731,26 @@ impl ExecutionPlan for SortExec { } } +struct BatchWithSortArray { + sort_arrays: Vec, + sorted_batch: RecordBatch, +} + fn sort_batch( batch: RecordBatch, schema: SchemaRef, expr: &[PhysicalSortExpr], -) -> ArrowResult { +) -> ArrowResult { // TODO: pushup the limit expression to sort - let indices = lexsort_to_indices( - &expr - .iter() - .map(|e| e.evaluate_to_sort_column(&batch)) - .collect::>>()?, - None, - )?; + let sort_columns = expr + .iter() + .map(|e| e.evaluate_to_sort_column(&batch)) + .collect::>>()?; + + let indices = lexsort_to_indices(&sort_columns, None)?; // reorder all rows based on sorted indices - RecordBatch::try_new( + let sorted_batch = RecordBatch::try_new( schema, batch .columns() @@ -760,7 +767,25 @@ fn sort_batch( ) }) .collect::>>()?, - ) + )?; + + let sort_arrays = sort_columns + .into_iter() + .map(|sc| { + Ok(take( + sc.values.as_ref(), + &indices, + Some(TakeOptions { + check_bounds: false, + }), + )?) + }) + .collect::>>()?; + + Ok(BatchWithSortArray { + sort_arrays, + sorted_batch, + }) } async fn do_sort( From 73ad8f06524d709a99a537b2e60afc1301f48f85 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 4 Apr 2022 12:31:34 +0800 Subject: [PATCH 11/15] revert unrelated changes --- datafusion/core/Cargo.toml | 2 +- datafusion/core/src/row/mod.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index 57f53601d1aa..f17456e3dd88 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -41,7 +41,7 @@ path = "src/lib.rs" # Used to enable the avro format avro = ["avro-rs", "num-traits", "datafusion-common/avro"] crypto_expressions = ["datafusion-physical-expr/crypto_expressions"] -default = ["crypto_expressions", "regex_expressions", "unicode_expressions", "row"] +default = ["crypto_expressions", "regex_expressions", "unicode_expressions"] # Used for testing ONLY: causes all values to hash to the same value (test for collisions) force_hash_collisions = [] # Used to enable JIT code generation diff --git a/datafusion/core/src/row/mod.rs b/datafusion/core/src/row/mod.rs index db10c6e03c53..531dbfe3e41e 100644 --- a/datafusion/core/src/row/mod.rs +++ b/datafusion/core/src/row/mod.rs @@ -222,7 +222,12 @@ mod tests { use super::*; use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::file_format::FileFormat; + use crate::datasource::object_store::local::{ + local_object_reader, local_object_reader_stream, local_unpartitioned_file, + LocalFileSystem, + }; use crate::error::Result; + use crate::execution::runtime_env::RuntimeEnv; use crate::physical_plan::file_format::FileScanConfig; use crate::physical_plan::{collect, ExecutionPlan}; use crate::row::reader::read_as_batch; @@ -238,10 +243,6 @@ mod tests { use datafusion_jit::api::Assembler; use rand::Rng; use DataType::*; - use datafusion_data_access::object_store::local::{local_object_reader, local_object_reader_stream}; - use crate::datasource::listing::local_unpartitioned_file; - use crate::prelude::SessionContext; - use datafusion_data_access::object_store::local::LocalFileSystem; fn test_validity(bs: &[bool]) { let n = bs.len(); @@ -555,13 +556,12 @@ mod tests { #[tokio::test] async fn test_with_parquet() -> Result<()> { - let session_ctx = SessionContext::new(); - let task_ctx = session_ctx.task_ctx(); + let runtime = Arc::new(RuntimeEnv::default()); let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]); let exec = get_exec("alltypes_plain.parquet", &projection, None).await?; let schema = exec.schema().clone(); - let batches = collect(exec, task_ctx).await?; + let batches = collect(exec, runtime).await?; assert_eq!(1, batches.len()); let batch = &batches[0]; From ad3a4d28722e0abf8389a74a77833414756894b6 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 5 Apr 2022 08:14:35 +0800 Subject: [PATCH 12/15] Update datafusion/core/src/physical_plan/sorts/sort.rs Co-authored-by: Andrew Lamb --- datafusion/core/src/physical_plan/sorts/sort.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 90e2bb63fa4e..04e658a0b8b0 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -373,10 +373,15 @@ fn get_sorted_iter( } struct SortedIterator { + /// Current logical position in the iterator pos: usize, + /// Indexes into the input representing the correctly sorted total output indices: UInt32Array, + /// Map each each logical input index to where it can be found in the sorted input batches composite: Vec, + /// Maximum batch size to produce batch_size: usize, + /// total length of the iterator length: usize, } From e4bee9da3eeda0b728c5a4d766784dc9711931d4 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 5 Apr 2022 10:25:48 +0800 Subject: [PATCH 13/15] resolve comments --- .../core/src/physical_plan/sorts/sort.rs | 85 ++++++++++++++----- 1 file changed, 64 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 04e658a0b8b0..fbe465a667da 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -341,15 +341,13 @@ fn get_sorted_iter( .iter() .enumerate() .flat_map(|(i, arrays)| { - (0..arrays[0].len()) - .map(|r| CompositeIndex { - // since we original use UInt32Array to index the combined mono batch, - // component record batches won't overflow as well, - // use u32 here for space efficiency. - batch_idx: i as u32, - row_idx: r as u32, - }) - .collect::>() + (0..arrays[0].len()).map(move |r| CompositeIndex { + // since we original use UInt32Array to index the combined mono batch, + // component record batches won't overflow as well, + // use u32 here for space efficiency. + batch_idx: i as u32, + row_idx: r as u32, + }) }) .collect::>(); @@ -418,12 +416,14 @@ impl Iterator for SortedIterator { } let current_size = min(self.batch_size, self.length - self.pos); - let mut result = Vec::with_capacity(current_size); - for i in 0..current_size { - let p = self.pos + i; - let c_index = self.indices.value(p) as usize; - result.push(self.composite[c_index]) - } + let result = (0..current_size) + .map(|i| { + let p = self.pos + i; + let c_index = self.indices.value(p) as usize; + self.composite[c_index] + }) + .collect::>(); + self.pos += current_size; Some(result) } @@ -470,6 +470,8 @@ impl Stream for SortedSizedRecordBatchStream { match self.sorted_iter.next() { None => Poll::Ready(None), Some(combined) => { + let num_rows = combined.len(); + let slices = combine_adjacent_indexes(combined); let mut output = Vec::with_capacity(self.num_cols); for i in 0..self.num_cols { let arrays = self @@ -477,14 +479,12 @@ impl Stream for SortedSizedRecordBatchStream { .iter() .map(|b| b.column(i).data()) .collect::>(); - let mut mutable = - MutableArrayData::new(arrays, false, combined.len()); - for x in combined.iter() { - // we cannot extend with slice here, since the lexsort is unstable + let mut mutable = MutableArrayData::new(arrays, false, num_rows); + for x in slices.iter() { mutable.extend( x.batch_idx as usize, - x.row_idx as usize, - (x.row_idx + 1) as usize, + x.start_row_idx as usize, + x.start_row_idx as usize + x.len, ); } output.push(make_array(mutable.freeze())) @@ -497,6 +497,49 @@ impl Stream for SortedSizedRecordBatchStream { } } +struct CompositeSlice { + batch_idx: u32, + start_row_idx: u32, + len: usize, +} + +/// Combine adjacent indexes from the same batch to make a slice, for more efficient `extend` later. +fn combine_adjacent_indexes(combined: Vec) -> Vec { + let mut last_batch_idx = 0; + let mut start_row_idx = 0; + let mut len = 0; + + let mut slices = vec![]; + for ci in combined { + if len == 0 { + last_batch_idx = ci.batch_idx; + start_row_idx = ci.row_idx; + len = 1; + } else if ci.batch_idx == last_batch_idx { + len += 1; + // since we have pre-sort each of the incoming batches, + // so if we witnessed a wrong order of indexes from the same batch, + // it must be of the same key with the row pointed by start_row_index. + start_row_idx = min(start_row_idx, ci.row_idx); + } else { + slices.push(CompositeSlice { + batch_idx: last_batch_idx, + start_row_idx, + len, + }); + last_batch_idx = ci.batch_idx; + start_row_idx = ci.row_idx; + len = 1; + } + } + slices.push(CompositeSlice { + batch_idx: last_batch_idx, + start_row_idx, + len, + }); + slices +} + impl RecordBatchStream for SortedSizedRecordBatchStream { fn schema(&self) -> SchemaRef { self.schema.clone() From 2325d1aac2230182c4cb496327daa76ea0d80088 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 5 Apr 2022 12:01:04 +0800 Subject: [PATCH 14/15] refactor a little bit --- .../core/src/physical_plan/sorts/sort.rs | 127 +++++++++--------- 1 file changed, 61 insertions(+), 66 deletions(-) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index fbe465a667da..2d5b10a6a5ec 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -407,7 +407,7 @@ impl SortedIterator { } impl Iterator for SortedIterator { - type Item = Vec; + type Item = Vec; /// Emit a max of `batch_size` positions each time fn next(&mut self) -> Option { @@ -416,16 +416,48 @@ impl Iterator for SortedIterator { } let current_size = min(self.batch_size, self.length - self.pos); - let result = (0..current_size) - .map(|i| { - let p = self.pos + i; - let c_index = self.indices.value(p) as usize; - self.composite[c_index] - }) - .collect::>(); + + // Combine adjacent indexes from the same batch to make a slice, + // for more efficient `extend` later. + let mut last_batch_idx = 0; + let mut start_row_idx = 0; + let mut len = 0; + + let mut slices = vec![]; + for i in 0..current_size { + let p = self.pos + i; + let c_index = self.indices.value(p) as usize; + let ci = self.composite[c_index]; + + if len == 0 { + last_batch_idx = ci.batch_idx; + start_row_idx = ci.row_idx; + len = 1; + } else if ci.batch_idx == last_batch_idx { + len += 1; + // since we have pre-sort each of the incoming batches, + // so if we witnessed a wrong order of indexes from the same batch, + // it must be of the same key with the row pointed by start_row_index. + start_row_idx = min(start_row_idx, ci.row_idx); + } else { + slices.push(CompositeSlice { + batch_idx: last_batch_idx, + start_row_idx, + len, + }); + last_batch_idx = ci.batch_idx; + start_row_idx = ci.row_idx; + len = 1; + } + } + slices.push(CompositeSlice { + batch_idx: last_batch_idx, + start_row_idx, + len, + }); self.pos += current_size; - Some(result) + Some(slices) } } @@ -469,26 +501,26 @@ impl Stream for SortedSizedRecordBatchStream { ) -> Poll> { match self.sorted_iter.next() { None => Poll::Ready(None), - Some(combined) => { - let num_rows = combined.len(); - let slices = combine_adjacent_indexes(combined); - let mut output = Vec::with_capacity(self.num_cols); - for i in 0..self.num_cols { - let arrays = self - .batches - .iter() - .map(|b| b.column(i).data()) - .collect::>(); - let mut mutable = MutableArrayData::new(arrays, false, num_rows); - for x in slices.iter() { - mutable.extend( - x.batch_idx as usize, - x.start_row_idx as usize, - x.start_row_idx as usize + x.len, - ); - } - output.push(make_array(mutable.freeze())) - } + Some(slices) => { + let num_rows = slices.iter().map(|s| s.len).sum(); + let output = (0..self.num_cols) + .map(|i| { + let arrays = self + .batches + .iter() + .map(|b| b.column(i).data()) + .collect::>(); + let mut mutable = MutableArrayData::new(arrays, false, num_rows); + for x in slices.iter() { + mutable.extend( + x.batch_idx as usize, + x.start_row_idx as usize, + x.start_row_idx as usize + x.len, + ); + } + make_array(mutable.freeze()) + }) + .collect::>(); let batch = RecordBatch::try_new(self.schema.clone(), output); let poll = Poll::Ready(Some(batch)); self.metrics.record_poll(poll) @@ -503,43 +535,6 @@ struct CompositeSlice { len: usize, } -/// Combine adjacent indexes from the same batch to make a slice, for more efficient `extend` later. -fn combine_adjacent_indexes(combined: Vec) -> Vec { - let mut last_batch_idx = 0; - let mut start_row_idx = 0; - let mut len = 0; - - let mut slices = vec![]; - for ci in combined { - if len == 0 { - last_batch_idx = ci.batch_idx; - start_row_idx = ci.row_idx; - len = 1; - } else if ci.batch_idx == last_batch_idx { - len += 1; - // since we have pre-sort each of the incoming batches, - // so if we witnessed a wrong order of indexes from the same batch, - // it must be of the same key with the row pointed by start_row_index. - start_row_idx = min(start_row_idx, ci.row_idx); - } else { - slices.push(CompositeSlice { - batch_idx: last_batch_idx, - start_row_idx, - len, - }); - last_batch_idx = ci.batch_idx; - start_row_idx = ci.row_idx; - len = 1; - } - } - slices.push(CompositeSlice { - batch_idx: last_batch_idx, - start_row_idx, - len, - }); - slices -} - impl RecordBatchStream for SortedSizedRecordBatchStream { fn schema(&self) -> SchemaRef { self.schema.clone() From 93f9d4d1a6cab05882fd508697f19a808f47f6c2 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 5 Apr 2022 21:18:58 +0800 Subject: [PATCH 15/15] assertion --- datafusion/core/src/physical_plan/sorts/sort.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs b/datafusion/core/src/physical_plan/sorts/sort.rs index 2d5b10a6a5ec..3ddb0bf00bc1 100644 --- a/datafusion/core/src/physical_plan/sorts/sort.rs +++ b/datafusion/core/src/physical_plan/sorts/sort.rs @@ -450,6 +450,11 @@ impl Iterator for SortedIterator { len = 1; } } + + assert!( + len > 0, + "There should have at least one record in a sort output slice." + ); slices.push(CompositeSlice { batch_idx: last_batch_idx, start_row_idx,