Commit

Add parquet predicate pushdown metrics (apache#3989)
* Log error building row filters

Inspired by @liukun4515 at https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755

* Add parquet predicate pushdown metrics

* More efficient bit counting
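
The bit-counting change lives in row_filter.rs, which the excerpt below does not include. As a rough sketch of the idea: counting set bits in the predicate mask's packed buffer replaces per-row iteration. The helper name and the `count_set_bits_offset` call are assumptions based on the arrow-rs `Buffer` API of this era, not code taken from this diff:

```rust
use arrow::array::{Array, BooleanArray};

// Sketch: count the rows a predicate mask keeps by counting set bits
// in the underlying packed buffer instead of iterating row by row.
// `count_set_bits_offset` is assumed from arrow-rs of this era; the
// real helper lives in row_filter.rs (not shown in this excerpt).
fn true_count(mask: &BooleanArray) -> usize {
    if mask.null_count() == 0 {
        // Fast path: one pass over the packed bits
        mask.values().count_set_bits_offset(mask.offset(), mask.len())
    } else {
        // A null in the mask drops the row, so it must not count as true
        mask.iter().filter(|v| *v == Some(true)).count()
    }
}
```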
alamb authored and jimexist committed Oct 31, 2022
1 parent 3f1e9ca commit e9f57c5
Showing 3 changed files with 227 additions and 40 deletions.
186 changes: 155 additions & 31 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -237,6 +237,10 @@ pub struct ParquetFileMetrics {
pub row_groups_pruned: metrics::Count,
/// Total number of bytes scanned
pub bytes_scanned: metrics::Count,
/// Total rows filtered out by predicates pushed into parquet scan
pub pushdown_rows_filtered: metrics::Count,
/// Total time spent evaluating pushdown filters
pub pushdown_eval_time: metrics::Time,
}

impl ParquetFileMetrics {
@@ -258,10 +262,20 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.counter("bytes_scanned", partition);

let pushdown_rows_filtered = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_filtered", partition);

let pushdown_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("pushdown_eval_time", partition);

Self {
predicate_evaluation_errors,
row_groups_pruned,
bytes_scanned,
pushdown_rows_filtered,
pushdown_eval_time,
}
}
}
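
The two new metrics are populated from inside the row filter itself (row_filter.rs, one of the three changed files but not shown in this excerpt). A hedged sketch of how that recording plausibly looks, reusing the `true_count` idea sketched above; the struct shape, the `evaluate_predicate` helper, and the field names are illustrative assumptions, not code from this diff:

```rust
use std::sync::Arc;
use arrow::array::BooleanArray;
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use datafusion::physical_plan::{metrics, PhysicalExpr};

// Hypothetical: a pushdown predicate that records into the two metrics
// added above. `rows_filtered` maps to pushdown_rows_filtered and
// `time` to pushdown_eval_time.
struct MetricsArrowPredicate {
    physical_expr: Arc<dyn PhysicalExpr>,
    rows_filtered: metrics::Count,
    time: metrics::Time,
}

impl MetricsArrowPredicate {
    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
        // Time only the predicate evaluation itself, hence "subset_time"
        let mut timer = self.time.timer();
        let mask = evaluate_predicate(&self.physical_expr, &batch)?; // assumed helper
        timer.stop();

        // Every row the mask does not keep counts as filtered out
        self.rows_filtered.add(batch.num_rows() - true_count(&mask));
        Ok(mask)
    }
}
```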
@@ -410,7 +424,7 @@ impl FileOpener for ParquetOpener {
) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();

- let metrics = ParquetFileMetrics::new(
+ let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
&self.metrics,
@@ -450,21 +464,38 @@
.then(|| pruning_predicate.as_ref().map(|p| p.logical_expr()))
.flatten()
{
- if let Ok(Some(filter)) = build_row_filter(
+ let row_filter = build_row_filter(
predicate.clone(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
reorder_predicates,
- ) {
- builder = builder.with_row_filter(filter);
- }
+ &file_metrics.pushdown_rows_filtered,
+ &file_metrics.pushdown_eval_time,
+ );

+ match row_filter {
+ Ok(Some(filter)) => {
+ builder = builder.with_row_filter(filter);
+ }
+ Ok(None) => {}
+ Err(e) => {
+ debug!(
+ "Ignoring error building row filter for '{:?}': {}",
+ predicate, e
+ );
+ }
+ };
};

let file_metadata = builder.metadata();
let groups = file_metadata.row_groups();
- let row_groups =
- prune_row_groups(groups, file_range, pruning_predicate.clone(), &metrics);
+ let row_groups = prune_row_groups(
+ groups,
+ file_range,
+ pruning_predicate.clone(),
+ &file_metrics,
+ );

if enable_page_index && check_page_index_push_down_valid(&pruning_predicate) {
let file_offset_indexes = file_metadata.offset_indexes();
@@ -480,7 +511,7 @@ impl FileOpener for ParquetOpener {
pruning_predicate.clone(),
file_offset_indexes.get(*r),
file_page_indexes.get(*r),
- &metrics,
+ &file_metrics,
)
.map_err(|e| {
ArrowError::ParquetError(format!(
@@ -564,7 +595,7 @@ impl DefaultParquetFileReaderFactory {
struct ParquetFileReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
- metrics: ParquetFileMetrics,
+ file_metrics: ParquetFileMetrics,
metadata_size_hint: Option<usize>,
}

@@ -573,7 +604,7 @@ impl AsyncFileReader for ParquetFileReader {
&mut self,
range: Range<usize>,
) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
- self.metrics.bytes_scanned.add(range.end - range.start);
+ self.file_metrics.bytes_scanned.add(range.end - range.start);

self.store
.get_range(&self.meta.location, range)
@@ -591,7 +622,7 @@ impl AsyncFileReader for ParquetFileReader {
Self: Send,
{
let total = ranges.iter().map(|r| r.end - r.start).sum();
- self.metrics.bytes_scanned.add(total);
+ self.file_metrics.bytes_scanned.add(total);

async move {
self.store
@@ -636,7 +667,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
metadata_size_hint: Option<usize>,
metrics: &ExecutionPlanMetricsSet,
) -> Result<Box<dyn AsyncFileReader + Send>> {
- let parquet_file_metrics = ParquetFileMetrics::new(
+ let file_metrics = ParquetFileMetrics::new(
partition_index,
file_meta.location().as_ref(),
metrics,
Expand All @@ -646,7 +677,7 @@ impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory {
meta: file_meta.object_meta,
store: Arc::clone(&self.store),
metadata_size_hint,
- metrics: parquet_file_metrics,
+ file_metrics,
}))
}
}
@@ -1167,6 +1198,7 @@ mod tests {
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
@@ -1199,23 +1231,46 @@ mod tests {
use std::io::Write;
use tempfile::TempDir;

- /// writes each RecordBatch as an individual parquet file and then
- /// reads it back in to the named location.
struct RoundTripResult {
/// Data that was read back from ParquetFiles
batches: Result<Vec<RecordBatch>>,
/// The physical plan that was created (that has statistics, etc)
parquet_exec: Arc<ParquetExec>,
}

/// writes each RecordBatch as an individual parquet file and re-reads
/// the data back. Returns the data as [RecordBatch]es
async fn round_trip_to_parquet(
batches: Vec<RecordBatch>,
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> Result<Vec<RecordBatch>> {
round_trip(batches, projection, schema, predicate, pushdown_predicate)
.await
.batches
}

/// Writes each RecordBatch as an individual parquet file and then
/// reads them back. Returns the parquet exec as well as the data
/// as [RecordBatch]es
async fn round_trip(
batches: Vec<RecordBatch>,
projection: Option<Vec<usize>>,
schema: Option<SchemaRef>,
predicate: Option<Expr>,
pushdown_predicate: bool,
) -> RoundTripResult {
let file_schema = match schema {
Some(schema) => schema,
- None => Arc::new(Schema::try_merge(
- batches.iter().map(|b| b.schema().as_ref().clone()),
- )?),
+ None => Arc::new(
+ Schema::try_merge(batches.iter().map(|b| b.schema().as_ref().clone()))
+ .unwrap(),
+ ),
};

- let (meta, _files) = store_parquet(batches).await?;
+ let (meta, _files) = store_parquet(batches).await.unwrap();
let file_groups = meta.into_iter().map(Into::into).collect();

// prepare the scan
@@ -1242,7 +1297,11 @@ mod tests {

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
- collect(Arc::new(parquet_exec), task_ctx).await
+ let parquet_exec = Arc::new(parquet_exec);
+ RoundTripResult {
+ batches: collect(parquet_exec.clone(), task_ctx).await,
+ parquet_exec,
+ }
}

// Add a new column with the specified field name to the RecordBatch
@@ -1453,18 +1512,18 @@ mod tests {
let filter = col("c2").eq(lit(2_i64));

// write the batches to parquet files, then read them back:
- let read =
- round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
- .await
- .unwrap();
+ let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;
let expected = vec![
"+----+----+----+",
"| c1 | c3 | c2 |",
"+----+----+----+",
"| | 20 | 2 |",
"+----+----+----+",
];
- assert_batches_sorted_eq!(expected, &read);
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ // Note there were 6 rows in total (across two batches)
+ assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
@@ -1587,7 +1646,7 @@ mod tests {
}

#[tokio::test]
- async fn evolved_schema_disjoint_schema_filter_with_pushdown() {
+ async fn evolved_schema_disjoint_schema_with_filter_pushdown() {
let c1: ArrayRef =
Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));

@@ -1602,10 +1661,7 @@ mod tests {
let filter = col("c2").eq(lit(1_i64));

// write the batches to parquet files, then read them back:
- let read =
- round_trip_to_parquet(vec![batch1, batch2], None, None, Some(filter), true)
- .await
- .unwrap();
+ let rt = round_trip(vec![batch1, batch2], None, None, Some(filter), true).await;

let expected = vec![
"+----+----+",
Expand All @@ -1614,7 +1670,10 @@ mod tests {
"| | 1 |",
"+----+----+",
];
- assert_batches_sorted_eq!(expected, &read);
+ assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
+ let metrics = rt.parquet_exec.metrics().unwrap();
+ // Note there were 6 rows in total (across two batches)
+ assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
}

#[tokio::test]
@@ -1895,6 +1954,71 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parquet_exec_metrics() {
let c1: ArrayRef = Arc::new(StringArray::from(vec![
Some("Foo"),
None,
Some("bar"),
Some("bar"),
Some("bar"),
Some("bar"),
Some("zzz"),
]));

// batch1: c1(string)
let batch1 = create_batch(vec![("c1", c1.clone())]);

// predicate: c1 != "bar" (pushed down into the scan)
let filter = col("c1").not_eq(lit("bar"));

// write the batch to a parquet file, then read it back:
let rt = round_trip(vec![batch1], None, None, Some(filter), true).await;

let metrics = rt.parquet_exec.metrics().unwrap();

// assert the batches and some metrics
let expected = vec![
"+-----+", "| c1 |", "+-----+", "| Foo |", "| zzz |", "+-----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());

// pushdown predicates have eliminated all 4 bar rows and the
// null row for 5 rows total
assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
assert!(
get_value(&metrics, "pushdown_eval_time") > 0,
"no eval time in metrics: {:#?}",
metrics
);
}

/// Returns the sum of all metrics with the specified name
/// in the given metric set.
///
/// Count: returns value
/// Time: returns elapsed nanoseconds
///
/// Panics if no such metric.
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
let sum = metrics.sum(|m| match m.value() {
MetricValue::Count { name, .. } if name == metric_name => true,
MetricValue::Time { name, .. } if name == metric_name => true,
_ => false,
});

match sum {
Some(MetricValue::Count { count, .. }) => count.value(),
Some(MetricValue::Time { time, .. }) => time.value(),
_ => {
panic!(
"Expected metric not found. Looking for '{}' in\n\n{:#?}",
metric_name, metrics
);
}
}
}

fn parquet_file_metrics() -> ParquetFileMetrics {
let metrics = Arc::new(ExecutionPlanMetricsSet::new());
ParquetFileMetrics::new(0, "file.parquet", &metrics)
}
