Skip to content

Commit

Permalink
Correctness integration test for parquet filter pushdown (#3976)
Browse files Browse the repository at this point in the history
* parquet filter pushdown correctness tests

* Do not run tests on windows

* Drop shared file after tests are over

* Rework to be single threaded
  • Loading branch information
alamb committed Nov 4, 2022
1 parent 60f3ef6 commit 695cedc
Show file tree
Hide file tree
Showing 5 changed files with 494 additions and 12 deletions.
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Expand Up @@ -106,6 +106,7 @@ csv = "1.1.6"
ctor = "0.1.22"
doc-comment = "0.3"
env_logger = "0.9"
parquet-test-utils = { path = "../../parquet-test-utils" }
rstest = "0.15.0"
test-utils = { path = "../../test-utils" }

Expand Down
12 changes: 2 additions & 10 deletions datafusion/core/src/physical_plan/file_format/parquet.rs
Expand Up @@ -911,7 +911,6 @@ mod tests {
use crate::datasource::listing::{FileRange, PartitionedFile};
use crate::datasource::object_store::ObjectStoreUrl;
use crate::execution::options::CsvReadOptions;
use crate::physical_plan::metrics::MetricValue;
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use crate::{
Expand Down Expand Up @@ -1742,15 +1741,8 @@ mod tests {
///
/// Panics if no such metric.
fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize {
let sum = metrics.sum(|m| match m.value() {
MetricValue::Count { name, .. } if name == metric_name => true,
MetricValue::Time { name, .. } if name == metric_name => true,
_ => false,
});

match sum {
Some(MetricValue::Count { count, .. }) => count.value(),
Some(MetricValue::Time { time, .. }) => time.value(),
match metrics.sum_by_name(metric_name) {
Some(v) => v.as_usize(),
_ => {
panic!(
"Expected metric not found. Looking for '{}' in\n\n{:#?}",
Expand Down
17 changes: 17 additions & 0 deletions datafusion/core/src/physical_plan/metrics/mod.rs
Expand Up @@ -241,6 +241,23 @@ impl MetricsSet {
Some(accum)
}

/// Aggregates, via [`Self::sum`], every metric in this set whose name
/// equals `metric_name`, and returns whatever `sum` produces for them.
///
/// Only the named variants (`Count`, `Time` and `Gauge`) are compared
/// against `metric_name`; every other variant is never selected.
pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
    // The match deliberately lists every variant (no `_` arm) so that
    // adding a new `MetricValue` variant forces a decision here.
    self.sum(|metric| match metric.value() {
        MetricValue::Count { name, .. }
        | MetricValue::Time { name, .. }
        | MetricValue::Gauge { name, .. } => name == metric_name,
        MetricValue::OutputRows(_)
        | MetricValue::ElapsedCompute(_)
        | MetricValue::SpillCount(_)
        | MetricValue::SpilledBytes(_)
        | MetricValue::CurrentMemoryUsage(_)
        | MetricValue::StartTimestamp(_)
        | MetricValue::EndTimestamp(_) => false,
    })
}

/// Returns a new derived `MetricsSet` where all metrics
/// that had the same name and partition=`Some(..)` have been
/// aggregated together. The resulting `MetricsSet` has all
Expand Down

0 comments on commit 695cedc

Please sign in to comment.