Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for parquet page level skipping #4105

Merged
merged 4 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
58 changes: 38 additions & 20 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,25 +532,43 @@ pub(crate) mod test_util {

pub async fn store_parquet(
batches: Vec<RecordBatch>,
multi_page: bool,
) -> Result<(Vec<ObjectMeta>, Vec<NamedTempFile>)> {
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");

let props = WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
.expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();

let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
if multi_page {
// All batches write in to one file, each batch must have same schema.
let mut output = NamedTempFile::new().expect("creating temp file");
let mut builder = WriterProperties::builder();
builder = builder.set_data_page_row_count_limit(2);
let proper = builder.build();
let mut writer =
ArrowWriter::try_new(&mut output, batches[0].schema(), Some(proper))
.expect("creating writer");
for b in batches {
writer.write(&b).expect("Writing batch");
}
writer.close().unwrap();
Ok((vec![local_unpartitioned_file(&output)], vec![output]))
} else {
// Each batch writes to their own file
let files: Vec<_> = batches
.into_iter()
.map(|batch| {
let mut output = NamedTempFile::new().expect("creating temp file");

let props = WriterProperties::builder().build();
let mut writer =
ArrowWriter::try_new(&mut output, batch.schema(), Some(props))
.expect("creating writer");

writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
output
})
.collect();

let meta: Vec<_> = files.iter().map(local_unpartitioned_file).collect();
Ok((meta, files))
}
}
}

Expand Down Expand Up @@ -599,7 +617,7 @@ mod tests {
let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();

let store = Arc::new(LocalFileSystem::new()) as _;
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();
Expand Down Expand Up @@ -738,7 +756,7 @@ mod tests {
let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
LocalFileSystem::new(),
)));
let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
let (meta, _files) = store_parquet(vec![batch1, batch2], false).await?;

// Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
// for the remaining metadata
Expand Down
59 changes: 50 additions & 9 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ impl FileOpener for ParquetOpener {
// page index pruning: if all data on individual pages can
// be ruled using page metadata, rows from other columns
// with that range can be skipped as well
if let Some(row_selection) = enable_page_index
if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
Copy link
Member Author

@Ted-Jiang Ted-Jiang Nov 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when all rowGroups are pruned by rg_metadata(min max), we do this skip fast

.then(|| {
page_filter::build_page_filter(
pruning_predicate.as_ref(),
Expand Down Expand Up @@ -919,7 +919,7 @@ mod tests {
datasource::file_format::{parquet::ParquetFormat, FileFormat},
physical_plan::collect,
};
use arrow::array::Float32Array;
use arrow::array::{Float32Array, Int32Array};
use arrow::datatypes::DataType::Decimal128;
use arrow::record_batch::RecordBatch;
use arrow::{
Expand Down Expand Up @@ -960,9 +960,16 @@ mod tests {
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
round_trip(batches, projection, schema, predicate, pushdown_predicate)
.await
.batches
round_trip(
batches,
projection,
schema,
predicate,
pushdown_predicate,
false,
)
.await
.batches
}

/// Writes each RecordBatch as an individual parquet file and then
Expand All @@ -974,6 +981,7 @@ mod tests {
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
page_index_predicate: bool,
) -> RoundTripResult {
let file_schema = match schema {
Some(schema) => schema,
Expand All @@ -983,7 +991,7 @@ mod tests {
),
};

let (meta, _files) = store_parquet(batches).await.unwrap();
let (meta, _files) = store_parquet(batches, page_index_predicate).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();

// prepare the scan
Expand All @@ -1008,6 +1016,10 @@ mod tests {
.with_reorder_filters(true);
}

if page_index_predicate {
parquet_exec = parquet_exec.with_enable_page_index(true);
}

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let parquet_exec = Arc::new(parquet_exec);
Expand Down Expand Up @@ -1225,7 +1237,8 @@ mod tests {
let filter = col("c2").eq(lit(2_i64));

// read/write them files:
let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let rt =
round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
Expand Down Expand Up @@ -1374,7 +1387,8 @@ mod tests {
let filter = col("c2").eq(lit(1_i64));

// read/write them files:
let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let rt =
round_trip(vec![batch1, batch2], None, None, Some(filter), true, false).await;

let expected = vec![
"+----+----+",
Expand Down Expand Up @@ -1695,6 +1709,33 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parquet_page_index_exec_metrics() {
let c1: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(2)]));
let c2: ArrayRef = Arc::new(Int32Array::from(vec![Some(3), Some(4), Some(5)]));
let batch1 = create_batch(vec![("int", c1.clone())]);
let batch2 = create_batch(vec![("int", c2.clone())]);

let filter = col("int").eq(lit(4_i32));

let rt =
round_trip(vec![batch1, batch2], None, None, Some(filter), false, true).await;

let metrics = rt.parquet_exec.metrics().unwrap();

// assert the batches and some metrics
let expected = vec![
"+-----+", "| int |", "+-----+", "| 3 |", "| 4 |", "| 5 |", "+-----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
assert_eq!(get_value(&metrics, "page_index_rows_filtered"), 3);
assert!(
get_value(&metrics, "page_index_eval_time") > 0,
"no eval time in metrics: {:#?}",
metrics
);
}

#[tokio::test]
async fn parquet_exec_metrics() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Expand All @@ -1714,7 +1755,7 @@ mod tests {
let filter = col("c1").not_eq(lit("bar"));

// read/write them files:
let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;
let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;

let metrics = rt.parquet_exec.metrics().unwrap();

Expand Down
13 changes: 13 additions & 0 deletions datafusion/core/src/physical_plan/file_format/parquet/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub struct ParquetFileMetrics {
pub pushdown_rows_filtered: Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: Time,
/// Total rows filtered out by parquet page index
pub page_index_rows_filtered: Count,
/// Total time spent evaluating parquet page index filters
pub page_index_eval_time: Time,
}

impl ParquetFileMetrics {
Expand Down Expand Up @@ -63,13 +67,22 @@ impl ParquetFileMetrics {
let pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);
let page_index_rows_filtered = MetricBuilder::new(metrics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

.with_new_label("filename", filename.to_string())
.counter("page_index_rows_filtered", partition);

let page_index_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("page_index_eval_time", partition);

Self {
predicate_evaluation_errors,
row_groups_pruned,
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
page_index_rows_filtered,
page_index_eval_time,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ pub(crate) fn build_page_filter(
file_metadata: &ParquetMetaData,
file_metrics: &ParquetFileMetrics,
) -> Result<Option<RowSelection>> {
// scoped timer updates on drop
let _timer_guard = file_metrics.page_index_eval_time.timer();
let page_index_predicates =
extract_page_index_push_down_predicates(pruning_predicate, schema)?;

Expand Down Expand Up @@ -154,6 +156,18 @@ pub(crate) fn build_page_filter(
row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
}
let final_selection = combine_multi_col_selection(row_selections);
let total_skip =
final_selection.iter().fold(
0,
|acc, x| {
if x.skip {
acc + x.row_count
} else {
acc
}
},
);
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection.into()))
} else {
Ok(None)
Expand Down