[feat] Support columns_sorted in row_filters with pageIndex #3477

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -79,6 +79,7 @@ object_store = "0.5.0"
 ordered-float = "3.0"
 parking_lot = "0.12"
 parquet = { version = "22.0.0", features = ["arrow", "async"] }
+parquet-format = "4.0.0"
Member Author

@Ted-Jiang Ted-Jiang Sep 14, 2022

Should we move this logic to arrow-rs to avoid importing parquet-format here?

Contributor

I think that makes sense

Contributor

I think parquet 23.0.0 will have properly exposed arrow definitions (added by @tustvold in apache/arrow-rs#2626)

Though now that I look at this, the proposal is probably to move the functions `columns_sorted` and `check_is_ordered` into the parquet crate, which makes sense to me (and will likely get reviewed by others with more parquet knowledge).

@Ted-Jiang can you file a ticket / PR to do so?

Member Author

Sure!

 paste = "^1.0"
 pin-project-lite = "^0.2.7"
 pyo3 = { version = "0.17.1", optional = true }
27 changes: 25 additions & 2 deletions datafusion/core/src/physical_plan/file_format/row_filter.rs
@@ -28,6 +28,8 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
 use parquet::arrow::ProjectionMask;
 use parquet::file::metadata::ParquetMetaData;
+use parquet::file::page_index::index::Index;
+use parquet_format::BoundaryOrder;
 use std::sync::Arc;

 /// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
@@ -228,11 +230,32 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
 /// For a given set of `Column`s required for predicate `Expr` determine whether all
 /// columns are sorted. Sorted columns may be queried more efficiently in the presence of
 /// a PageIndex.
-fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
-    // TODO How do we know this?
+fn columns_sorted(columns: &[usize], metadata: &ParquetMetaData) -> Result<bool> {
+    // For now we only set sorted with single col with pageIndex
+    if columns.len() == 1 && metadata.page_indexes().is_some() {
+        if let Some(indexes) = metadata.page_indexes().unwrap().get(0) {
+            return match indexes.get(columns[0]).unwrap() {
+                Index::NONE
+                | Index::BOOLEAN(_)
+                | Index::BYTE_ARRAY(_)
+                | Index::FIXED_LEN_BYTE_ARRAY(_) => Ok(false),
+                Index::INT32(x) => Ok(check_is_ordered(&x.boundary_order)),
+                Index::INT64(x) => Ok(check_is_ordered(&x.boundary_order)),
+                Index::INT96(x) => Ok(check_is_ordered(&x.boundary_order)),
+                Index::FLOAT(x) => Ok(check_is_ordered(&x.boundary_order)),
+                Index::DOUBLE(x) => Ok(check_is_ordered(&x.boundary_order)),
+            };
+        };
+    }
     Ok(false)
 }

+fn check_is_ordered(b: &BoundaryOrder) -> bool {
+    match b {
+        BoundaryOrder::Unordered => false,
+        BoundaryOrder::Ascending | BoundaryOrder::Descending => true,
+    }
+}
 /// Build a [`RowFilter`] from the given predicate `Expr`
 pub fn build_row_filter(
     expr: Expr,
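For discussion, here is a minimal sketch of the same check written without the `unwrap` calls, so that an out-of-range column index returns `false` instead of panicking. It is not the code from this PR. `BoundaryOrder` and `Index` below are simplified, hypothetical stand-ins for the `parquet_format` and `parquet` types, and the page index is passed in directly as `page_indexes[row_group][column]` rather than read from `ParquetMetaData`. The sketch also requires every row group to report an ordered index, whereas the diff inspects only the first row group via `get(0)`; that stricter rule is an assumption.

```rust
/// Hypothetical stand-in for `parquet_format::BoundaryOrder`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BoundaryOrder {
    Unordered,
    Ascending,
    Descending,
}

/// Simplified, hypothetical stand-in for the page index enum used in the diff.
/// Only the boundary order is kept; the numeric variants (INT32, INT64, INT96,
/// FLOAT, DOUBLE) are collapsed into `Numeric`, and the variants the diff maps
/// to `false` (BOOLEAN, BYTE_ARRAY, FIXED_LEN_BYTE_ARRAY) into `Other`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Index {
    None,
    Numeric(BoundaryOrder),
    Other,
}

/// Same rule as the diff: ascending or descending counts as ordered.
pub fn check_is_ordered(b: BoundaryOrder) -> bool {
    matches!(b, BoundaryOrder::Ascending | BoundaryOrder::Descending)
}

/// Returns true only when exactly one column is requested and every row group
/// reports an ordered numeric page index for it.
/// `page_indexes` is `None` when the file carries no page index.
pub fn columns_sorted(columns: &[usize], page_indexes: Option<&[Vec<Index>]>) -> bool {
    // Same restriction as the diff: only a single predicate column is handled.
    if columns.len() != 1 {
        return false;
    }
    let column = columns[0];
    match page_indexes {
        Some(row_groups) if !row_groups.is_empty() => row_groups.iter().all(|row_group| {
            // `get` returns None for an out-of-range column instead of panicking.
            match row_group.get(column) {
                Some(Index::Numeric(order)) => check_is_ordered(*order),
                _ => false,
            }
        }),
        // No page index, or no row groups: report "not sorted".
        _ => false,
    }
}
```

Returning a plain `bool` keeps the sketch short; the function in the diff returns `Result<bool>` to match the neighbouring helpers in `row_filter.rs`.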