Add metrics for parquet page level skipping #4105
Changes from 1 commit
@@ -449,7 +449,7 @@ impl FileOpener for ParquetOpener {
             // page index pruning: if all data on individual pages can
             // be ruled out using page metadata, rows from other columns
             // with that range can be skipped as well
-            if let Some(row_selection) = enable_page_index
+            if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
                         pruning_predicate.as_ref(),
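
For readers unfamiliar with the pattern: `bool::then` evaluates its closure only when the receiver is `true`, so the added `!row_groups.is_empty()` guard means the page filter is never even built once row-group pruning has eliminated everything. A minimal standalone sketch of the guard (the `&[usize]` stand-in for row groups and the summing closure are illustrative only, not the real `build_page_filter`):

// `bool::then` runs the closure only when the condition is true and
// wraps the result in `Some`; otherwise it yields `None` without
// evaluating the closure at all.
fn maybe_build_filter(enable_page_index: bool, row_groups: &[usize]) -> Option<usize> {
    (enable_page_index && !row_groups.is_empty()).then(|| {
        // stand-in for the relatively expensive page_filter::build_page_filter(..)
        row_groups.iter().sum()
    })
}

fn main() {
    // all row groups already pruned: the closure is skipped entirely
    assert_eq!(maybe_build_filter(true, &[]), None);
    // page index disabled: also skipped
    assert_eq!(maybe_build_filter(false, &[1, 2]), None);
    // otherwise the filter is built
    assert_eq!(maybe_build_filter(true, &[1, 2]), Some(3));
}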
@@ -919,7 +919,7 @@ mod tests {
         datasource::file_format::{parquet::ParquetFormat, FileFormat},
         physical_plan::collect,
     };
-    use arrow::array::Float32Array;
+    use arrow::array::{Float32Array, Int32Array};
     use arrow::datatypes::DataType::Decimal128;
     use arrow::record_batch::RecordBatch;
     use arrow::{
@@ -960,9 +960,16 @@ mod tests {
         predicate: Option<Expr>,
         pushdown_predicate: bool,
     ) -> Result<Vec<RecordBatch>> {
-        round_trip(batches, projection, schema, predicate, pushdown_predicate)
-            .await
-            .batches
+        round_trip(
+            batches,
+            projection,
+            schema,
+            predicate,
+            pushdown_predicate,
+            false,
+        )
+        .await
+        .batches
     }

     /// Writes each RecordBatch as an individual parquet file and then
@@ -974,6 +981,7 @@ mod tests {
         schema: Option<SchemaRef>,
         predicate: Option<Expr>,
         pushdown_predicate: bool,
+        page_index_predicate: bool,
     ) -> RoundTripResult {
         let file_schema = match schema {
             Some(schema) => schema,
@@ -983,7 +991,7 @@ mod tests {
             ),
         };

-        let (meta, _files) = store_parquet(batches).await.unwrap();
+        let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
         let file_groups = meta.into_iter().map(Into::into).collect();

         // prepare the scan
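
For the page-index test to exercise anything, each file has to contain more than one data page. A hypothetical sketch of how `store_parquet` might react to the new flag; the real helper lives elsewhere in this test module, and the specific use of `set_data_page_row_count_limit` (the writer limit that the `// todo` in the test below, via arrow-rs#2941, appears to be about) and the limit of 2 are assumptions:

use parquet::file::properties::WriterProperties;

// Hypothetical: force very small data pages when the page index is being
// tested, so each RecordBatch spans several pages and the page index has
// boundaries to prune against.
fn writer_props(page_index_predicate: bool) -> WriterProperties {
    let mut builder = WriterProperties::builder();
    if page_index_predicate {
        builder = builder.set_data_page_row_count_limit(2);
    }
    builder.build()
}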
@@ -1008,6 +1016,10 @@ mod tests {
                 .with_reorder_filters(true);
         }

+        if page_index_predicate {
+            parquet_exec = parquet_exec.with_enable_page_index(true);
+        }
+
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let parquet_exec = Arc::new(parquet_exec);
@@ -1225,7 +1237,8 @@ mod tests {
         let filter = col("c2").eq(lit(2_i64));

         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
         let expected = vec![
             "+----+----+----+",
             "| c1 | c3 | c2 |",
@@ -1374,7 +1387,8 @@ mod tests {
         let filter = col("c2").eq(lit(1_i64));

         // read/write them files:
-        let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;

         let expected = vec![
             "+----+----+",
@@ -1695,6 +1709,35 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn parquet_page_index_exec_metrics() {
+        let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
+        let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
+        let batch1 = create_batch(vec![("int", c1.clone())]);
+        let batch2 = create_batch(vec![("int", c2.clone())]);
+
+        let filter = col("int").eq(lit(4_i32));
+
+        let rt =
+            round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        // todo fix this https://github.com/apache/arrow-rs/issues/2941 release change to row limit.
+
+        // assert the batches and some metrics
+        let expected = vec![
+            "+-----+", "| int |", "+-----+", "|     |", "| 1   |", "| 2   |", "| 3   |",
+            "| 4   |", "| 5   |", "+-----+",
+        ];
+        assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+        assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 0);
+        assert!(
+            get_value(&metrics, "page_index_eval_time") > 0,
+            "no eval time in metrics: {:#?}",
+            metrics
+        );
+    }

Review thread on the `// todo fix this ...` line:
- https://github.com/apache/arrow-rs/pull/2942/files#r1013838557
- But from a real-world file, this metric works fine 😂
- in #4039
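The assertions above rely on a `get_value` helper that is not shown in this diff. A sketch of what it plausibly looks like, assuming DataFusion's `MetricsSet::sum_by_name` and `MetricValue::as_usize`; the real helper elsewhere in this test module may differ in details:

use datafusion::physical_plan::metrics::MetricsSet;

// Sum all metrics with the given name across the plan and return the
// total as a usize, panicking if the metric was never registered.
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
    match metrics.sum_by_name(metric_name) {
        Some(v) => v.as_usize(),
        None => panic!("metric '{}' not found in {:#?}", metric_name, metrics),
    }
}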
     #[tokio::test]
     async fn parquet_exec_metrics() {
         let c1: ArrayRef = Arc::new(StringArray::from(vec![

@@ -1714,7 +1757,7 @@ mod tests {
         let filter = col("c1").not_eq(lit("bar"));

         // read/write them files:
-        let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
+        let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;

         let metrics = rt.parquet_exec.metrics().unwrap();
@@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
     pub pushdown_rows_filtered: Count,
     /// Total time spent evaluating pushdown filters
     pub pushdown_eval_time: Time,
+    /// Total rows filtered out by parquet page index
+    pub page_index_rows_filtered: Count,
+    /// Total time spent evaluating parquet page index filters
+    pub page_index_eval_time: Time,
 }

 impl ParquetFileMetrics {
@@ -63,13 +67,22 @@ impl ParquetFileMetrics {
         let pushdown_eval_time = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .subset_time("pushdown_eval_time", partition);
+        let page_index_rows_filtered = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("page_index_rows_filtered", partition);
+
+        let page_index_eval_time = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .subset_time("page_index_eval_time", partition);
+
         Self {
             predicate_evaluation_errors,
             row_groups_pruned,
             bytes_scanned,
             pushdown_rows_filtered,
             pushdown_eval_time,
+            page_index_rows_filtered,
+            page_index_eval_time,
         }
     }
 }

Review comment on the new `page_index_rows_filtered` counter: 👍
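
To show how these two metrics are meant to be driven at runtime, here is a small hedged sketch using DataFusion's `Count` and `Time` metric types. The function name and the `rows_pruned_by_page_index` value are made up for illustration; the real update sites live in the page-filter code, not in `ParquetFileMetrics` itself:

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

// Sketch: time the page-index evaluation and record how many rows it
// ruled out, mirroring how the pushdown metrics above are used.
fn record_page_index(metrics: &ExecutionPlanMetricsSet, partition: usize, filename: &str) {
    let rows_filtered = MetricBuilder::new(metrics)
        .with_new_label("filename", filename.to_string())
        .counter("page_index_rows_filtered", partition);
    let eval_time = MetricBuilder::new(metrics)
        .with_new_label("filename", filename.to_string())
        .subset_time("page_index_eval_time", partition);

    let mut timer = eval_time.timer(); // accumulates elapsed time into eval_time
    let rows_pruned_by_page_index = 128; // stand-in for the real evaluation
    rows_filtered.add(rows_pruned_by_page_index);
    timer.stop();
}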
Review comment on the `!row_groups.is_empty()` guard in ParquetOpener: when all row groups are already pruned via row-group metadata (min/max), this makes the skip fast.