Skip to content

Commit

Permalink
Use a comparable row format.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Nov 1, 2022
1 parent 182f5e9 commit 75359d2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 152 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ simd = ["arrow/simd"]
[dependencies] # In alphabetical order
# Workspace dependencies

# Crates.io dependencies
arrow = { package = "arrow2", version = "0.14.0", default-features = false, features = [
arrow = { package = "arrow2", git = "https://github.com/RinChanNOWWW/arrow2", rev = "78c61d5", default-features = false, features = [
"io_parquet",
"io_parquet_compression",
] }
] } # Crates.io dependencies
arrow-format = { version = "0.7.0", features = ["flight-data", "flight-service", "ipc"] }
futures = "0.3.24"
parquet2 = { version = "0.16.3", default_features = false }
Expand Down
20 changes: 0 additions & 20 deletions src/query/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,28 +205,8 @@ impl DataBlock {
}
}
}

pub fn build_compare(
left: &DataBlock,
right: &DataBlock,
sort_column_descriptions: &[SortColumnDescription],
) -> Result<ColumnsDynComparator> {
let mut res = Vec::with_capacity(sort_column_descriptions.len());
for SortColumnDescription { column_name, .. } in sort_column_descriptions {
let l = left.try_column_by_name(column_name)?;
let l = l.as_arrow_array(l.data_type());

let r = right.try_column_by_name(column_name)?;
let r = r.as_arrow_array(r.data_type());
let cmp = build_compare(&*l, &*r)?;
res.push(cmp);
}
Ok(res)
}
}

pub type ColumnsDynComparator = Vec<DynComparator>;

fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
let left = VariantColumn::from_arrow_array(left);
let right = VariantColumn::from_arrow_array(right);
Expand Down
1 change: 0 additions & 1 deletion src/query/datablocks/src/kernels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ mod data_block_sort;
mod data_block_take;

pub use data_block_group_by_hash::*;
pub use data_block_sort::ColumnsDynComparator;
pub use data_block_sort::SortColumnDescription;
1 change: 1 addition & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ doctest = false
test = false

[dependencies]
common-arrow = { path = "../../../common/arrow" }
common-catalog = { path = "../../catalog" }
common-datablocks = { path = "../../datablocks" }
common-datavalues = { path = "../../datavalues" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@

use std::any::Any;
use std::cmp::Ordering;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::collections::VecDeque;
use std::sync;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::RwLock;

use common_datablocks::ColumnsDynComparator;
use common_arrow::arrow::compute::sort::row::Row;
use common_arrow::arrow::compute::sort::row::RowConverter;
use common_arrow::arrow::compute::sort::row::Rows;
use common_arrow::arrow::compute::sort::row::SortField;
use common_arrow::arrow::compute::sort::SortOptions;
use common_datablocks::DataBlock;
use common_datablocks::SortColumnDescription;
use common_datavalues::ColumnRef;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
Expand Down Expand Up @@ -63,7 +66,7 @@ pub fn try_add_multi_sort_merge(
block_size,
limit,
sort_columns_descriptions,
);
)?;
pipeline.pipes.push(Pipe::ResizePipe {
inputs_port,
outputs_port: vec![output_port],
Expand All @@ -81,34 +84,17 @@ struct Cursor {

num_rows: usize,

sort_columns: Vec<ColumnRef>,
sort_columns_descriptions: Vec<SortColumnDescription>,

compare_map: Arc<RwLock<CompareMap>>,
rows: Rows,
}

impl Cursor {
pub fn try_create(
input_index: usize,
block: &DataBlock,
sort_columns_descriptions: Vec<SortColumnDescription>,
compare_map: Arc<RwLock<CompareMap>>,
) -> Result<Cursor> {
let sort_columns = sort_columns_descriptions
.iter()
.map(|f| {
let c = block.try_column_by_name(&f.column_name)?;
Ok(c.clone())
})
.collect::<Result<Vec<_>>>()?;
Ok(Cursor {
pub fn try_create(input_index: usize, rows: Rows) -> Cursor {
Cursor {
input_index,
row_index: 0,
sort_columns_descriptions,
sort_columns,
num_rows: block.num_rows(),
compare_map,
})
num_rows: rows.num_rows(),
rows,
}
}

#[inline]
Expand All @@ -123,100 +109,48 @@ impl Cursor {
self.num_rows == self.row_index
}

pub fn compare(&self, other: &Cursor) -> Result<Ordering> {
if self.sort_columns.len() != other.sort_columns.len() {
return Err(ErrorCode::LogicalError(format!(
"Sort columns length not match: {} != {}",
self.sort_columns.len(),
other.sort_columns.len()
)));
}
let compare_map = self.compare_map.read().unwrap();
let comparators = &compare_map[self.input_index][other.input_index];

for (i, ((l, r), option)) in self
.sort_columns
.iter()
.zip(other.sort_columns.iter())
.zip(self.sort_columns_descriptions.iter())
.enumerate()
{
match (!l.null_at(self.row_index), !r.null_at(other.row_index)) {
(false, true) if option.nulls_first => return Ok(Ordering::Less),
(false, true) => return Ok(Ordering::Greater),
(true, false) if option.nulls_first => return Ok(Ordering::Greater),
(true, false) => return Ok(Ordering::Less),
(false, false) => {}
(true, true) => match comparators[i](self.row_index, other.row_index) {
Ordering::Equal => {}
o if !option.asc => {
return Ok(o.reverse());
}
o => {
return Ok(o);
}
},
}
}

// If all columns are equal, compare the input index.
Ok(self.input_index.cmp(&other.input_index))
fn current(&self) -> Row<'_> {
self.rows.row(self.row_index)
}
}

impl Ord for Cursor {
fn cmp(&self, other: &Self) -> Ordering {
self.compare(other).unwrap()
self.current()
.cmp(&other.current())
.then_with(|| self.input_index.cmp(&other.input_index))
}
}

impl PartialEq for Cursor {
fn eq(&self, other: &Self) -> bool {
other.compare(self).unwrap() == Ordering::Equal
self.current() == other.current()
}
}

impl Eq for Cursor {}

impl PartialOrd for Cursor {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
other.compare(self).ok()
Some(self.cmp(other))
}
}

type CompareMap = Vec<Vec<ColumnsDynComparator>>;

fn create_compare_map(n: usize) -> CompareMap {
let mut res = Vec::with_capacity(n);
for _ in 0..n {
let mut inner = Vec::with_capacity(n);
for _ in 0..n {
inner.push(Vec::new());
}
res.push(inner);
}
res
}

/// TransformMultiSortMerge is a processor with multiple input ports;
pub struct MultiSortMergeProcessor {
/// Data from inputs (every input is sorted)
inputs: Vec<Arc<InputPort>>,
output: Arc<OutputPort>,
output_schema: DataSchemaRef,
/// Sort fields' indices in `output_schema`
sort_field_indices: Vec<usize>,

// Parameters
block_size: usize,
limit: Option<usize>,
sort_columns_descriptions: Vec<SortColumnDescription>,

/// For each input port, maintain a dequeue of data blocks.
blocks: Vec<VecDeque<DataBlock>>,
/// Compare blocks from different inputs.
///
/// There are only one block in the heap for each input port at the same time.
/// So we can use a 2d vector to store the comparators.
compare_map: Arc<RwLock<CompareMap>>,
/// Maintain a flag for each input denoting if the current cursor has finished
/// and needs to pull data from input.
cursor_finished: Vec<bool>,
Expand All @@ -225,9 +159,11 @@ pub struct MultiSortMergeProcessor {
/// Data format: (input_index, block_index, row_index)
in_progess_rows: Vec<(usize, usize, usize)>,
/// Heap that yields [`Cursor`] in increasing order.
heap: BinaryHeap<Cursor>,
heap: BinaryHeap<Reverse<Cursor>>,
/// If the input port is finished.
input_finished: Vec<bool>,
/// Used to convert columns to rows.
row_converter: RowConverter,

state: ProcessorState,

Expand All @@ -242,24 +178,41 @@ impl MultiSortMergeProcessor {
block_size: usize,
limit: Option<usize>,
sort_columns_descriptions: Vec<SortColumnDescription>,
) -> Self {
) -> Result<Self> {
let input_size = inputs.len();
Self {
let mut sort_field_indices = Vec::with_capacity(sort_columns_descriptions.len());
let sort_fields = sort_columns_descriptions
.iter()
.map(|d| {
let data_type = output_schema
.field_with_name(&d.column_name)?
.to_arrow()
.data_type()
.clone();
sort_field_indices.push(output_schema.index_of(&d.column_name)?);
Ok(SortField::new_with_options(data_type, SortOptions {
descending: !d.asc,
nulls_first: d.nulls_first,
}))
})
.collect::<Result<Vec<_>>>()?;
let row_converter = RowConverter::new(sort_fields);
Ok(Self {
inputs,
output,
output_schema,
sort_field_indices,
block_size,
limit,
sort_columns_descriptions,
blocks: vec![VecDeque::with_capacity(2); input_size],
compare_map: Arc::new(RwLock::new(create_compare_map(input_size))),
heap: BinaryHeap::with_capacity(input_size),
in_progess_rows: vec![],
cursor_finished: vec![true; input_size],
input_finished: vec![false; input_size],
row_converter,
state: ProcessorState::Consume,
aborting: Arc::new(AtomicBool::new(false)),
}
})
}

fn get_data_blocks(&mut self) -> Result<Vec<(usize, DataBlock)>> {
Expand Down Expand Up @@ -292,11 +245,11 @@ impl MultiSortMergeProcessor {
// Use `>=` because some of the input ports may be finished, but the data is still in the heap.
while self.heap.len() >= nums_active_inputs {
match self.heap.pop() {
Some(mut cursor) => {
Some(Reverse(mut cursor)) => {
let input_index = cursor.input_index;
let row_index = cursor.advance();
if !cursor.is_finished() {
self.heap.push(cursor);
self.heap.push(Reverse(cursor));
} else {
// We have read all rows of this block, need to read a new one.
self.cursor_finished[input_index] = true;
Expand Down Expand Up @@ -401,25 +354,6 @@ impl MultiSortMergeProcessor {

Ok(DataBlock::create(self.output_schema.clone(), columns))
}

/// Add comparators for newly come data blocks.
fn build_compare_map(&mut self, blocks: &[(usize, DataBlock)]) -> Result<()> {
for i in 0..self.inputs.len() {
for (j, right) in blocks {
if i != *j && !self.blocks[i].is_empty() {
let left = self.blocks[i].back().unwrap();
let mut cmp = self.compare_map.write().unwrap();
let comparators =
DataBlock::build_compare(left, right, &self.sort_columns_descriptions)?;
cmp[i][*j] = comparators;
let comparators =
DataBlock::build_compare(right, left, &self.sort_columns_descriptions)?;
cmp[*j][i] = comparators;
}
}
}
Ok(())
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -521,20 +455,21 @@ impl Processor for MultiSortMergeProcessor {
fn process(&mut self) -> Result<()> {
match std::mem::replace(&mut self.state, ProcessorState::Consume) {
ProcessorState::Preserve(blocks) => {
for (input_index, block) in blocks.iter() {
self.blocks[*input_index].push_back(block.clone());
}
self.build_compare_map(&blocks)?;
for (input_index, block) in blocks.into_iter() {
let columns = self
.sort_field_indices
.iter()
.map(|i| {
let col = block.column(*i);
col.as_arrow_array(col.data_type())
})
.collect::<Vec<_>>();
let rows = self.row_converter.convert_columns(&columns)?;
if !block.is_empty() {
let cursor = Cursor::try_create(
input_index,
&block,
self.sort_columns_descriptions.clone(),
self.compare_map.clone(),
)?;
self.heap.push(cursor);
let cursor = Cursor::try_create(input_index, rows);
self.heap.push(Reverse(cursor));
self.cursor_finished[input_index] = false;
self.blocks[input_index].push_back(block);
}
}
self.drain_heap();
Expand Down

0 comments on commit 75359d2

Please sign in to comment.