Add parquet predicate pushdown metrics #3989

Merged: 4 commits on Oct 30, 2022
Changes from 2 commits
186 changes: 155 additions & 31 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
pub row_groups_pruned: metrics::Count,
/// Total number of bytes scanned
pub bytes_scanned: metrics::Count,
/// Total rows filtered out by predicates pushed into parquet scan
pub pushdown_rows_filtered: metrics::Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: metrics::Time,
}

impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("bytes_scanned", partition);

let pushdown_rows_filtered = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_filtered", partition);

let pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);

Self {
predicate_evaluation_errors,
row_groups_pruned,
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
}
}
}
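
The two new metrics follow the pattern already used for `bytes_scanned`: a `Count` that accumulates rows removed by pushed-down predicates and a subset `Time` that accumulates evaluation time. Below is a minimal, self-contained sketch of how code holding these handles would record into them and how the aggregated set can be inspected — the file name, partition index, and row count are placeholders, not code from this PR.

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn main() {
    let metrics = ExecutionPlanMetricsSet::new();

    // Mirrors ParquetFileMetrics::new above ("file.parquet" / partition 0 are placeholders)
    let rows_filtered = MetricBuilder::new(&metrics)
        .with_new_label("filename", "file.parquet".to_string())
        .counter("pushdown_rows_filtered", 0);
    let eval_time = MetricBuilder::new(&metrics)
        .with_new_label("filename", "file.parquet".to_string())
        .subset_time("pushdown_eval_time", 0);

    // A row filter would record roughly like this while evaluating a batch:
    let timer = eval_time.timer();
    // ... evaluate the pushed-down predicate against a RecordBatch ...
    rows_filtered.add(5); // e.g. the predicate removed 5 rows
    drop(timer); // dropping the guard adds the elapsed time to pushdown_eval_time

    // Aggregated view, as later exposed through ParquetExec::metrics()
    println!("{:#?}", metrics.clone_inner());
}
```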
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();

let metrics = ParquetFileMetrics::new(
let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
@@ -450,21 +464,38 @@ impl FileOpener for ParquetOpener {
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
if let Ok(Some(filter)) = build_row_filter(
let row_filter = build_row_filter(
predicate.clone(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
reorder_predicates,
) {
builder = builder.with_row_filter(filter);
}
&file_metrics.pushdown_rows_filtered,
&file_metrics.pushdown_eval_time,
);

match row_filter {
Ok(Some(filter)) => {
builder = builder.with_row_filter(filter);
}
Ok(None) => {}
Err(e) => {
debug!(
"Ignoring error building row filter for '{:?}': {}",
predicate, e
);
}
};
};
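
The call above threads the two new metric handles into `build_row_filter`, and errors from building the filter are now logged at debug level and ignored rather than failing the scan. For orientation, a hedged sketch of the signature this call site implies is shown below; the parameter names and the stub body are assumptions, since the real definition lives in row_filter.rs and is not part of this diff.

```rust
use arrow::datatypes::Schema;
use datafusion::error::Result;
use datafusion::physical_plan::metrics::{Count, Time};
use datafusion::prelude::Expr;
use parquet::arrow::arrow_reader::RowFilter;
use parquet::file::metadata::ParquetMetaData;

// Assumed shape only, inferred from the call site above.
pub fn build_row_filter(
    _expr: Expr,
    _file_schema: &Schema,
    _table_schema: &Schema,
    _metadata: &ParquetMetaData,
    _reorder_predicates: bool,
    _rows_filtered: &Count, // incremented with the rows each predicate removes
    _eval_time: &Time,      // accumulates time spent evaluating the predicates
) -> Result<Option<RowFilter>> {
    // ... build ArrowPredicate implementations that record into the two metrics ...
    Ok(None)
}
```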

let file_metadata = builder.metadata();
let groups = file_metadata.row_groups();
let row_groups =
prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
let row_groups = prune_row_groups(
groups,
file_range,
pruning_predicate.clone(),
&file_metrics,
);

if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
let file_offset_indexes = file_metadata.offset_indexes();
@@ -480,7 +511,7 @@ impl FileOpener for ParquetOpener {
pruning_predicate.clone(),
file_offset_indexes.get(*r),
file_page_indexes.get(*r),
&metrics,
&file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
@@ -564,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
struct ParquetFileReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
metrics: ParquetFileMetrics,
file_metrics: ParquetFileMetrics,
metadata_size_hint: Option<usize>,
}

@@ -573,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.metrics.bytes_scanned.add(range.end - range.start);
self.file_metrics.bytes_scanned.add(range.end - range.start);

self.store
.get_range(&self.meta.location, range)
@@ -591,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
Self: Send,
{
let total = ranges.iter().map(|r| r.end - r.start).sum();
self.metrics.bytes_scanned.add(total);
self.file_metrics.bytes_scanned.add(total);

async move {
self.store
@@ -636,7 +667,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
let parquet_file_metrics = ParquetFileMetrics::new(
let file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
metrics,
@@ -646,7 +677,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
meta: file_meta.object_meta,
store: Arc::clone(&self.store),
metadata_size_hint,
metrics: parquet_file_metrics,
file_metrics,
}))
}
}
@@ -1167,6 +1198,7 @@ mod tests {
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
@@ -1199,23 +1231,46 @@ mod tests {
use std::io::Write;
use tempfile::TempDir;

/// writes each RecordBatch as an individual parquet file and then
/// reads it back into the named location.
struct RoundTripResult {
/// Data that was read back from ParquetFiles
batches: Result<Vec<RecordBatch>>,
/// The physical plan that was created (that has statistics, etc)
parquet_exec: Arc<ParquetExec>,
}

/// writes each RecordBatch as an individual parquet file and re-reads
/// the data back. Returns the data as [RecordBatch]es
async fn round_trip_to_parquet(
batches: Vec<RecordBatch>,
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
round_trip(batches, projection, schema, predicate, pushdown_predicate)
.await
.batches
}

/// Writes each RecordBatch as an individual parquet file and then
/// reads them back. Returns the parquet exec as well as the data
/// as [RecordBatch]es
async fn round_trip(
batches: Vec<RecordBatch>,
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> RoundTripResult {
let file_schema = match schema {
Some(schema) => schema,
None => Arc::new(Schema::try_merge(
batches.iter().map(|b| b.schema().as_ref().clone()),
)?),
None => Arc::new(
Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
.unwrap(),
),
};

let (meta, _files) = store_parquet(batches).await?;
let (meta, _files) = store_parquet(batches).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();

// prepare the scan
@@ -1242,7 +1297,11 @@ mod tests {

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
collect(Arc::new(parquet_exec), task_ctx).await
let parquet_exec = Arc::new(parquet_exec);
RoundTripResult {
batches: collect(parquet_exec.clone(), task_ctx).await,
parquet_exec,
}
}

// Add a new column with the specified field name to the RecordBatch
@@ -1453,18 +1512,18 @@ mod tests {
let filter = col("c2").eq(lit(2_i64));

// read/write the files:
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
.await
.unwrap();
let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
"+----+----+----+",
"| | 20 | 2 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();
// Note there were 6 rows in total (across two batches)
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
@@ -1587,7 +1646,7 @@ mod tests {
}

#[tokio::test]
async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

@@ -1602,10 +1661,7 @@ mod tests {
let filter = col("c2").eq(lit(1_i64));

// read/write the files:
let read =
round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
.await
.unwrap();
let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;

let expected = vec![
"+----+----+",
@@ -1614,7 +1670,10 @@ mod tests {
"| | 1 |",
"+----+----+",
];
assert_batches_sorted_eq!(expected, &read);
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();
// Note there were 6 rows in total (across two batches)
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
@@ -1895,6 +1954,71 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parquet_exec_metrics() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Some("Foo"),
None,
Some("bar"),
Some("bar"),
Some("bar"),
Some("bar"),
Some("zzz"),
]));

// batch1: c1(string)
let batch1 = create_batch(vec![("c1", c1.clone())]);

// filter on c1
let filter = col("c1").not_eq(lit("bar"));

// read/write the files:
let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;

let metrics = rt.parquet_exec.metrics().unwrap();

// assert the batches and some metrics
let expected = vec![
"+-----+", "| c1 |", "+-----+", "| Foo |", "| zzz |", "+-----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());

// pushdown predicates have eliminated all 4 bar rows and the
// null row for 5 rows total
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
assert!(
get_value(&metrics, "pushdown_eval_time") > 0,
"no eval time in metrics: {:#?}",
metrics
);
}
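
Outside of the test helpers, the per-file metrics roll up into the `ParquetExec` metrics set and can also be observed end to end with `EXPLAIN ANALYZE`. A rough sketch follows; the table name and file path are placeholders, the config key used to enable pushdown is an assumption about the DataFusion configuration of this period, and the exact rendering of the metrics line may differ.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Filter pushdown is off by default; without it the new metrics stay at 0.
    // The config key below is an assumption, not taken from this PR.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::with_config(config);

    // "data.parquet" is a placeholder path
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // EXPLAIN ANALYZE prints per-operator metrics; pushdown_rows_filtered and
    // pushdown_eval_time appear alongside the ParquetExec node
    ctx.sql("EXPLAIN ANALYZE SELECT * FROM t WHERE c1 <> 'bar'")
        .await?
        .show()
        .await?;
    Ok(())
}
```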

/// returns the sum of all the metrics with the specified name
/// in the given metric set.
///
/// Count: returns value
/// Time: returns elapsed nanoseconds
///
/// Panics if no such metric.
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
let sum = metrics.sum(|m| match m.value() {
MetricValue::Count { name, .. } if name == metric_name => true,
MetricValue::Time { name, .. } if name == metric_name => true,
_ => false,
});

match sum {
Some(MetricValue::Count { count, .. }) => count.value(),
Some(MetricValue::Time { time, .. }) => time.value(),
_ => {
panic!(
"Expected metric not found. Looking for '{}' in\n\n{:#?}",
metric_name, metrics
);
}
}
}

fn parquet_file_metrics() -> ParquetFileMetrics {
let metrics = Arc::new(ExecutionPlanMetricsSet::new());
ParquetFileMetrics::new(0, "file.parquet", &metrics)