Skip to content

Commit

Permalink
Merge pull request #8452 from RinChanNOWWW/improve_sort
Browse files: browse the repository at this point in the history.
feat(query): improve sort.
  • Loading branch information
mergify[bot] committed Nov 4, 2022
2 parents 05e7db2 + bbbf55e commit 93bb4ba
Show file tree
Hide file tree
Showing 15 changed files with 617 additions and 70 deletions.
7 changes: 5 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ simd = ["arrow/simd"]
# Workspace dependencies

# Crates.io dependencies
arrow = { package = "arrow2", version = "0.14.0", default-features = false, features = [
arrow = { package = "arrow2", git = "https://github.com/RinChanNOWWW/arrow2", rev = "8bd6417", default-features = false, features = [
"io_parquet",
"io_parquet_compression",
] }
Expand Down
3 changes: 1 addition & 2 deletions src/query/datablocks/src/kernels/data_block_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl DataBlock {
.map(|f| {
let left = lhs.try_column_by_name(f.name())?;
let right = rhs.try_column_by_name(f.name())?;
Self::take_columns_by_slices_limit(
Self::take_column_by_slices_limit(
f.data_type(),
&[left.clone(), right.clone()],
&slices,
Expand Down Expand Up @@ -210,7 +210,6 @@ impl DataBlock {
fn compare_variant(left: &dyn Array, right: &dyn Array) -> ArrowResult<DynComparator> {
let left = VariantColumn::from_arrow_array(left);
let right = VariantColumn::from_arrow_array(right);

Ok(Box::new(move |i, j| {
left.get_data(i).cmp(right.get_data(j))
}))
Expand Down
24 changes: 23 additions & 1 deletion src/query/datablocks/src/kernels/data_block_take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,29 @@ impl DataBlock {
Ok(DataBlock::create(schema.clone(), result_columns))
}

pub fn take_columns_by_slices_limit(
/// Builds a new `DataBlock` by taking one slice out of every column of `raw`.
///
/// For each field in `raw`'s schema, the matching column is looked up by name
/// and handed to `take_column_by_slices_limit` as the only input column,
/// together with the single merge slice `(0, slice.0, slice.1)` and `limit`.
/// The resulting columns are assembled into a block that reuses `raw`'s schema.
///
/// NOTE(review): `slice` is presumably `(start, length)` within `raw`, and the
/// leading `0` the index of the sole input column — confirm against the
/// `MergeSlice` definition.
///
/// # Errors
///
/// Returns the first error produced by the column lookup or by
/// `take_column_by_slices_limit`.
pub fn block_take_by_slices_limit(
    raw: &DataBlock,
    slice: (usize, usize),
    limit: Option<usize>,
) -> Result<DataBlock> {
    let (first, second) = slice;
    let schema = raw.schema();

    let mut taken = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        let source = raw.try_column_by_name(field.name())?.clone();
        let column = Self::take_column_by_slices_limit(
            field.data_type(),
            &[source],
            &[(0, first, second)],
            limit,
        )?;
        taken.push(column);
    }

    Ok(DataBlock::create(schema.clone(), taken))
}

pub fn take_column_by_slices_limit(
data_type: &DataTypeImpl,
columns: &[ColumnRef],
slices: &[MergeSlice],
Expand Down
43 changes: 28 additions & 15 deletions src/query/pipeline/core/src/pipeline_display.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,34 @@ impl<'a> Display for PipelineIndentDisplayWrapper<'a> {
..
} => {
let prev_name = Self::pipe_name(&pipes[pipes.len() - index - 2]);
let post_name = Self::pipe_name(&pipes[pipes.len() - index]);

write!(
f,
"Merge ({} × {} {}) to ({} × {})",
prev_name,
inputs_port.len(),
if inputs_port.len() == 1 {
"processor"
} else {
"processors"
},
post_name,
outputs_port.len(),
)?;
if index > 0 {
let post_name = Self::pipe_name(&pipes[pipes.len() - index]);
write!(
f,
"Merge ({} × {} {}) to ({} × {})",
prev_name,
inputs_port.len(),
if inputs_port.len() == 1 {
"processor"
} else {
"processors"
},
post_name,
outputs_port.len(),
)?;
} else {
write!(
f,
"Merge ({} × {} {})",
prev_name,
inputs_port.len(),
if inputs_port.len() == 1 {
"processor"
} else {
"processors"
},
)?;
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/pipeline/transforms/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ doctest = false
test = false

[dependencies]
common-arrow = { path = "../../../common/arrow" }
common-datablocks = { path = "../../datablocks" }
common-datavalues = { path = "../../datavalues" }
common-exception = { path = "../../../common/exception" }
common-pipeline-core = { path = "../core" }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ pub mod transform;
pub mod transform_block_compact;
pub mod transform_compact;
pub mod transform_limit;
pub mod transform_multi_sort_merge;
pub mod transform_sort_merge;
pub mod transform_sort_partial;

pub use transform::*;
pub use transform_block_compact::*;
pub use transform_compact::*;
pub use transform_limit::*;
pub use transform_multi_sort_merge::*;
pub use transform_sort_merge::*;
pub use transform_sort_partial::*;

1 comment on commit 93bb4ba

@vercel
Copy link

@vercel vercel bot commented on 93bb4ba Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.vercel.app
databend.rs
databend-databend.vercel.app

Please sign in to comment.