Correctness tests for parquet predicate pushdown
alamb committed Oct 31, 2022
commit 43af7a9 · 1 parent 8f1ae58
Showing 7 changed files with 506 additions and 130 deletions.
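The heart of the change is a new parquet-test-utils workspace crate that takes over the test-file plumbing previously embedded in the benchmark binary, so the benchmark and the new correctness tests can share one implementation. Judging purely from the call sites in the diff below, the crate's public surface looks roughly like the following sketch. The ParquetScanOptions definition is lifted from the struct this commit deletes from the benchmark; the TestParquetFile fields and method bodies are assumptions (reconstructions appear after the relevant hunks below):

// Inferred public surface of the new parquet-test-utils crate, based solely
// on call sites in this diff; real field names and bodies may differ.
use std::path::PathBuf;
use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::logical_expr::Expr;
use datafusion::physical_plan::ExecutionPlan;

/// Scan options previously defined privately in the benchmark binary.
#[derive(Debug, Clone)]
pub struct ParquetScanOptions {
    pub pushdown_filters: bool,
    pub reorder_filters: bool,
    pub enable_page_index: bool,
}

/// A generated parquet file plus everything needed to scan it.
pub struct TestParquetFile {
    // assumed fields: schema, object store URL, object metadata, ...
}

impl TestParquetFile {
    /// Write `batches` to `path` with optional page/row-group sizing.
    pub fn try_new(
        path: PathBuf,
        batches: impl IntoIterator<Item = RecordBatch>,
        page_size: Option<usize>,
        row_group_size: Option<usize>,
    ) -> Result<Self> {
        todo!("see the reconstruction after the gen_data hunk below")
    }

    /// Build a physical plan that applies `filter` with the given options.
    pub async fn create_scan(
        &self,
        filter: Expr,
        options: ParquetScanOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        todo!("see the reconstruction after the exec_scan hunk below")
    }
}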
15 changes: 14 additions & 1 deletion Cargo.toml
@@ -17,7 +17,20 @@

 [workspace]
 exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
+members = [
+    "datafusion/common",
+    "datafusion/core",
+    "datafusion/expr",
+    "datafusion/jit",
+    "datafusion/optimizer",
+    "datafusion/physical-expr",
+    "datafusion/proto",
+    "datafusion/row",
+    "datafusion/sql",
+    "datafusion-examples",
+    "test-utils",
+    "parquet-test-utils",
+    "benchmarks",
 ]

 [profile.release]
1 change: 1 addition & 0 deletions benchmarks/Cargo.toml
@@ -41,6 +41,7 @@
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 object_store = "0.5.0"
 parquet = "25.0.0"
+parquet-test-utils = { path = "../parquet-test-utils/" }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"
144 changes: 15 additions & 129 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,30 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.

-use arrow::datatypes::SchemaRef;
 use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
-use std::sync::Arc;
 use std::time::Instant;
 use structopt::StructOpt;
 use test_utils::AccessLogGenerator;
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {

     let path = opt.path.join("logs.parquet");

-    let (schema, object_store_url, object_meta) =
-        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;

-    run_benchmarks(
-        &mut ctx,
-        schema,
-        object_store_url,
-        object_meta,
-        opt.iterations,
-        opt.debug,
-    )
-    .await?;
+    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;

     Ok(())
 }

-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-}
-
 async fn run_benchmarks(
     ctx: &mut SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     iterations: usize,
     debug: bool,
 ) -> Result<()> {
@@ -174,9 +140,7 @@ async fn run_benchmarks(
             let start = Instant::now();
             let rows = exec_scan(
                 ctx,
-                schema.clone(),
-                object_store_url.clone(),
-                object_meta.clone(),
+                test_file,
                 filter_expr.clone(),
                 scan_options.clone(),
                 debug,
@@ -197,52 +161,12 @@

 async fn exec_scan(
     ctx: &SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     filter: Expr,
     scan_options: ParquetScanOptions,
     debug: bool,
 ) -> Result<usize> {
-    let ParquetScanOptions {
-        pushdown_filters,
-        reorder_filters,
-        enable_page_index,
-    } = scan_options;
-
-    let mut config_options = ConfigOptions::new();
-    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
-    let scan_config = FileScanConfig {
-        object_store_url,
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        }]],
-        statistics: Default::default(),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
-    };
-
-    let df_schema = schema.clone().to_dfschema()?;
-
-    let physical_filter_expr = create_physical_expr(
-        &filter,
-        &df_schema,
-        schema.as_ref(),
-        &ExecutionProps::default(),
-    )?;
-
-    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
-    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+    let exec = test_file.create_scan(filter, scan_options).await?;

     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
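The forty-odd lines deleted above are exactly what TestParquetFile::create_scan now encapsulates. A hedged reconstruction follows, built from the removed code itself; it assumes the struct stores the schema, ObjectStoreUrl, and ObjectMeta the old benchmark threaded through by hand, and the real crate may differ in detail:

// Sketch of parquet-test-utils internals, reusing the imports this commit
// removes from the benchmark. Fields accessed on `self` are assumptions.
use std::sync::Arc;

use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
    OPT_PARQUET_REORDER_FILTERS,
};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::ExecutionPlan;

impl TestParquetFile {
    /// Build FilterExec(ParquetExec) for `filter`, honoring the scan options.
    /// (async to match the `.await` at the benchmark call site)
    pub async fn create_scan(
        &self,
        filter: Expr,
        scan_options: ParquetScanOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let ParquetScanOptions {
            pushdown_filters,
            reorder_filters,
            enable_page_index,
        } = scan_options;

        // Per-scan parquet options, exactly as the old exec_scan set them
        let mut config_options = ConfigOptions::new();
        config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
        config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
        config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);

        let scan_config = FileScanConfig {
            object_store_url: self.object_store_url.clone(), // assumed field
            file_schema: self.schema.clone(),                // assumed field
            file_groups: vec![vec![PartitionedFile {
                object_meta: self.object_meta.clone(),       // assumed field
                partition_values: vec![],
                range: None,
                extensions: None,
            }]],
            statistics: Default::default(),
            projection: None,
            limit: None,
            table_partition_cols: vec![],
            config_options: config_options.into_shareable(),
        };

        // The filter feeds ParquetExec (for pushdown and pruning) and a
        // FilterExec on top, so results stay correct with pushdown disabled.
        let df_schema = self.schema.clone().to_dfschema()?;
        let physical_filter_expr = create_physical_expr(
            &filter,
            &df_schema,
            self.schema.as_ref(),
            &ExecutionProps::default(),
        )?;
        let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
        Ok(Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?))
    }
}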
@@ -258,53 +182,15 @@ fn gen_data(
     scale_factor: f32,
     page_size: Option<usize>,
     row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();

-    let file = File::create(&path).unwrap();
-
-    let mut props_builder = WriterProperties::builder();
-
-    if let Some(s) = page_size {
-        props_builder = props_builder
-            .set_data_pagesize_limit(s)
-            .set_write_batch_size(s);
-    }
-
-    if let Some(s) = row_group_size {
-        props_builder = props_builder.set_max_row_group_size(s);
-    }
-
-    let schema = generator.schema();
-    let mut writer =
-        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
-    let mut num_rows = 0;
-
     let num_batches = 100_f32 * scale_factor;

-    for batch in generator.take(num_batches as usize) {
-        writer.write(&batch).unwrap();
-        writer.flush()?;
-        num_rows += batch.num_rows();
-    }
-    writer.close().unwrap();
-
-    println!("Generated test dataset with {} rows", num_rows);
-
-    let size = std::fs::metadata(&path)?.len() as usize;
-
-    let canonical_path = path.canonicalize()?;
-
-    let object_store_url =
-        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
-            .object_store();
-
-    let object_meta = ObjectMeta {
-        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
-        last_modified: Default::default(),
-        size,
-    };
-
-    Ok((schema, object_store_url, object_meta))
+    TestParquetFile::try_new(
+        path,
+        generator.take(num_batches as usize),
+        page_size,
+        row_group_size,
+    )
 }
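Likewise, the file-writing half deleted from gen_data presumably became TestParquetFile::try_new. A sketch reconstructed from those removed lines, with the same writer settings; taking the schema from the first batch rather than from the generator is my assumption, as are the struct fields:

// Hedged reconstruction of TestParquetFile::try_new from the deleted
// gen_data body; the real parquet-test-utils code may differ.
use std::fs::File;
use std::path::PathBuf;

use arrow::record_batch::RecordBatch;
use datafusion::common::Result;
use datafusion::datasource::listing::ListingTableUrl;
use object_store::path::Path;
use object_store::ObjectMeta;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

impl TestParquetFile {
    pub fn try_new(
        path: PathBuf,
        batches: impl IntoIterator<Item = RecordBatch>,
        page_size: Option<usize>,
        row_group_size: Option<usize>,
    ) -> Result<Self> {
        let file = File::create(&path)?;

        // Optional page and row-group sizing, exactly as gen_data did
        let mut props_builder = WriterProperties::builder();
        if let Some(s) = page_size {
            props_builder = props_builder
                .set_data_pagesize_limit(s)
                .set_write_batch_size(s);
        }
        if let Some(s) = row_group_size {
            props_builder = props_builder.set_max_row_group_size(s);
        }

        // The old code took the schema from the generator; with a plain
        // batch iterator we peek at the first batch instead (assumption).
        let mut batches = batches.into_iter().peekable();
        let schema = batches.peek().expect("at least one batch").schema();

        let mut writer =
            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))?;
        let mut num_rows = 0;
        for batch in batches {
            writer.write(&batch)?;
            writer.flush()?;
            num_rows += batch.num_rows();
        }
        writer.close()?;
        println!("Generated test dataset with {} rows", num_rows);

        // Record location and size so later scans can find the file
        let size = std::fs::metadata(&path)?.len() as usize;
        let canonical_path = path.canonicalize()?;
        let object_store_url =
            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
                .object_store();
        let object_meta = ObjectMeta {
            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
            last_modified: Default::default(),
            size,
        };

        // assumed fields
        Ok(Self { schema, object_store_url, object_meta })
    }
}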
1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -105,6 +105,7 @@
 csv = "1.1.6"
 ctor = "0.1.22"
 doc-comment = "0.3"
 env_logger = "0.9"
+parquet-test-utils = { path = "../../parquet-test-utils" }
 rstest = "0.15.0"
 test-utils = { path = "../../test-utils" }
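This dev-dependency is what lets datafusion/core host the correctness tests the commit title promises; the test file itself (among the 7 changed files) is not shown in this excerpt. The pattern the shared crate enables is presumably along these lines: run one predicate under every combination of scan options and assert the results agree. A hypothetical sketch, not the commit's actual test code (the column name and filter are made up):

// Hypothetical consistency check using the TestParquetFile /
// ParquetScanOptions API sketched earlier on this page.
use datafusion::common::Result;
use datafusion::logical_expr::lit;
use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionContext};
use parquet_test_utils::{ParquetScanOptions, TestParquetFile};

async fn check_all_option_combinations(test_file: &TestParquetFile) -> Result<()> {
    let ctx = SessionContext::new();
    // Made-up predicate over the generated access-log data
    let filter = col("request_method").not_eq(lit("GET"));

    let mut baseline: Option<usize> = None;
    for pushdown_filters in [false, true] {
        for reorder_filters in [false, true] {
            for enable_page_index in [false, true] {
                let options = ParquetScanOptions {
                    pushdown_filters,
                    reorder_filters,
                    enable_page_index,
                };
                let exec = test_file.create_scan(filter.clone(), options).await?;
                let batches = collect(exec, ctx.task_ctx()).await?;
                let rows: usize = batches.iter().map(|b| b.num_rows()).sum();
                // Every option combination must return the same result size
                match baseline {
                    None => baseline = Some(rows),
                    Some(expected) => assert_eq!(expected, rows),
                }
            }
        }
    }
    Ok(())
}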
