diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 7367e9682be7..c0734450874b 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -32,11 +32,14 @@ simd = ["datafusion/simd"]
 snmalloc = ["snmalloc-rs"]
 
 [dependencies]
+arrow = "24.0.0"
 datafusion = { path = "../datafusion/core" }
 env_logger = "0.9"
 futures = "0.3"
 mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
+object_store = "0.5.0"
+parquet = "24.0.0"
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"
diff --git a/benchmarks/README.md b/benchmarks/README.md
index 505469fc5ea7..97a0bd4c6bf3 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -126,3 +126,37 @@ h2o groupby query 1 took 1669 ms
 
 [1]: http://www.tpc.org/tpch/
 [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+## Parquet filter pushdown benchmarks
+
+This is a set of benchmarks for testing and verifying the performance of parquet filter pushdown. The queries are executed on
+a synthetic dataset generated during the benchmark run and designed to simulate web server access logs.
+
+```bash
+cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 1.0
+```
+
+This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `--scale-factor` option
+(the default value of `1.0` generates a ~1GB parquet file).
+
+For each filter we run the query using several different `ParquetScanOptions` settings, as sketched below.
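+
+The three option combinations benchmarked are roughly the following (a sketch based on the `ParquetScanOptions`
+builder calls in `src/bin/parquet_filter_pushdown.rs`; it is illustrative rather than part of the benchmark itself):
+
+```rust
+use datafusion::physical_plan::file_format::ParquetScanOptions;
+
+// Baseline: filter pushdown, predicate reordering and the page index all disabled
+let baseline = ParquetScanOptions::default();
+
+// Filter pushdown with predicate reordering and the page index enabled
+let pushdown_reordered = ParquetScanOptions::default()
+    .with_page_index(true)
+    .with_pushdown_filters(true)
+    .with_reorder_predicates(true);
+
+// Filter pushdown with the page index enabled but predicates kept in their original order
+let pushdown_original_order = ParquetScanOptions::default()
+    .with_page_index(true)
+    .with_pushdown_filters(true)
+    .with_reorder_predicates(false);
+```
+
+The layout of the generated parquet file can also be tuned with the optional `--page-size` and `--row-group-size`
+flags; the values below are illustrative, not recommendations:
+
+```bash
+cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 1.0 --page-size 8192 --row-group-size 65536
+```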
+
+Example run:
+```
+Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
+Generated test dataset with 10699521 rows
+Executing with filter 'request_method = Utf8("GET")'
+Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false }
+Iteration 0 returned 10699521 rows in 1303 ms
+Iteration 1 returned 10699521 rows in 1288 ms
+Iteration 2 returned 10699521 rows in 1266 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: true, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1970 ms
+Iteration 1 returned 1781686 rows in 2002 ms
+Iteration 2 returned 1781686 rows in 1988 ms
+Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: false, enable_page_index: true }
+Iteration 0 returned 1781686 rows in 1940 ms
+Iteration 1 returned 1781686 rows in 1986 ms
+Iteration 2 returned 1781686 rows in 1947 ms
+...
+```
\ No newline at end of file
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
new file mode 100644
index 000000000000..e3b365d4fade
--- /dev/null
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -0,0 +1,487 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{
+    Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder,
+    UInt16Builder,
+};
+use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty;
+use datafusion::common::Result;
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::execution::context::ExecutionProps;
+use datafusion::logical_expr::{lit, or, Expr};
+use datafusion::logical_plan::ToDFSchema;
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_plan::collect;
+use datafusion::physical_plan::file_format::{
+    FileScanConfig, ParquetExec, ParquetScanOptions,
+};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use std::fs::File;
+use std::ops::Range;
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+use structopt::StructOpt;
+
+#[cfg(feature = "snmalloc")]
+#[global_allocator]
+static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
+
+#[derive(Debug, StructOpt)]
+#[structopt(name = "Benchmarks", about = "Apache Arrow Rust Benchmarks.")]
+struct Opt {
+    /// Activate debug mode to see query results
+    #[structopt(short, long)]
+    debug: bool,
+
+    /// Number of iterations of each test run
+    #[structopt(short = "i", long = "iterations", default_value = "3")]
+    iterations: usize,
+
+    /// Number of partitions to process in parallel
+    #[structopt(long = "partitions", default_value = "2")]
+    partitions: usize,
+
+    /// Path to folder where access log file will be generated
+    #[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
+    path: PathBuf,
+
+    /// Data page size of the generated parquet file
+    #[structopt(long = "page-size")]
+    page_size: Option<usize>,
+
+    /// Row group size of the generated parquet file
+    #[structopt(long = "row-group-size")]
+    row_group_size: Option<usize>,
+
+    /// Total size of generated dataset. The default scale factor of 1.0 will generate a roughly 1GB parquet file
+    #[structopt(short = "s", long = "scale-factor", default_value = "1.0")]
+    scale_factor: f32,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let opt: Opt = Opt::from_args();
+    println!("Running benchmarks with the following options: {:?}", opt);
+
+    let config = SessionConfig::new().with_target_partitions(opt.partitions);
+    let mut ctx = SessionContext::with_config(config);
+
+    let path = opt.path.join("logs.parquet");
+
+    let (object_store_url, object_meta) =
+        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+
+    run_benchmarks(
+        &mut ctx,
+        object_store_url.clone(),
+        object_meta.clone(),
+        opt.iterations,
+        opt.debug,
+    )
+    .await?;
+
+    Ok(())
+}
+
+async fn run_benchmarks(
+    ctx: &mut SessionContext,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+    iterations: usize,
+    debug: bool,
+) -> Result<()> {
+    let scan_options_matrix = vec![
+        ParquetScanOptions::default(),
+        ParquetScanOptions::default()
+            .with_page_index(true)
+            .with_pushdown_filters(true)
+            .with_reorder_predicates(true),
+        ParquetScanOptions::default()
+            .with_page_index(true)
+            .with_pushdown_filters(true)
+            .with_reorder_predicates(false),
+    ];
+
+    let filter_matrix = vec![
+        // Selective-ish filter
+        col("request_method").eq(lit("GET")),
+        // Non-selective filter
+        col("request_method").not_eq(lit("GET")),
+        // Basic conjunction
+        col("request_method")
+            .eq(lit("POST"))
+            .and(col("response_status").eq(lit(503_u16))),
+        // Nested filters
+        col("request_method").eq(lit("POST")).and(or(
+            col("response_status").eq(lit(503_u16)),
+            col("response_status").eq(lit(403_u16)),
+        )),
+        // Many filters
+        combine_filters(&[
+            col("request_method").not_eq(lit("GET")),
+            col("response_status").eq(lit(400_u16)),
+            // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
+            // col("service").eq(lit("backend")),
+        ])
+        .unwrap(),
+        // Filter everything
+        col("response_status").eq(lit(429_u16)),
+        // Filter nothing
+        col("response_status").gt(lit(0_u16)),
+    ];
+
+    for filter_expr in &filter_matrix {
+        println!("Executing with filter '{}'", filter_expr);
+        for scan_options in &scan_options_matrix {
+            println!("Using scan options {:?}", scan_options);
+            for i in 0..iterations {
+                let start = Instant::now();
+                let rows = exec_scan(
+                    ctx,
+                    object_store_url.clone(),
+                    object_meta.clone(),
+                    filter_expr.clone(),
+                    scan_options.clone(),
+                    debug,
+                )
+                .await?;
+                println!(
+                    "Iteration {} returned {} rows in {} ms",
+                    i,
+                    rows,
+                    start.elapsed().as_millis()
+                );
+            }
+        }
+        println!("\n");
+    }
+    Ok(())
+}
+
+async fn exec_scan(
+    ctx: &SessionContext,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+    filter: Expr,
+    scan_options: ParquetScanOptions,
+    debug: bool,
+) -> Result<usize> {
+    let schema = BatchBuilder::schema();
+    let scan_config = FileScanConfig {
+        object_store_url,
+        file_schema: schema.clone(),
+        file_groups: vec![vec![PartitionedFile {
+            object_meta,
+            partition_values: vec![],
+            range: None,
+            extensions: None,
+        }]],
+        statistics: Default::default(),
+        projection: None,
+        limit: None,
+        table_partition_cols: vec![],
+    };
+
+    let df_schema = schema.clone().to_dfschema()?;
+
+    let physical_filter_expr = create_physical_expr(
+        &filter,
+        &df_schema,
+        schema.as_ref(),
+        &ExecutionProps::default(),
+    )?;
+
+    let parquet_exec = Arc::new(
+        ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
+    );
+
+    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+
+    let task_ctx = ctx.task_ctx();
+    let result = collect(exec, task_ctx).await?;
+
+    if debug {
+        pretty::print_batches(&result)?;
+    }
+    Ok(result.iter().map(|b| b.num_rows()).sum())
+}
+
+fn gen_data(
+    path: PathBuf,
+    scale_factor: f32,
+    page_size: Option<usize>,
+    row_group_size: Option<usize>,
+) -> Result<(ObjectStoreUrl, ObjectMeta)> {
+    let generator = Generator::new();
+
+    let file = File::create(&path).unwrap();
+
+    let mut props_builder = WriterProperties::builder();
+
+    if let Some(s) = page_size {
+        props_builder = props_builder
+            .set_data_pagesize_limit(s)
+            .set_write_batch_size(s);
+    }
+
+    if let Some(s) = row_group_size {
+        props_builder = props_builder.set_max_row_group_size(s);
+    }
+
+    let mut writer =
+        ArrowWriter::try_new(file, generator.schema.clone(), Some(props_builder.build()))
+            .unwrap();
+
+    let mut num_rows = 0;
+
+    let num_batches = 100_f32 * scale_factor;
+
+    for batch in generator.take(num_batches as usize) {
+        writer.write(&batch).unwrap();
+        writer.flush()?;
+        num_rows += batch.num_rows();
+    }
+    writer.close().unwrap();
+
+    println!("Generated test dataset with {} rows", num_rows);
+
+    let size = std::fs::metadata(&path)?.len() as usize;
+
+    let canonical_path = path.canonicalize()?;
+
+    let object_store_url =
+        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+            .object_store();
+
+    let object_meta = ObjectMeta {
+        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+        last_modified: Default::default(),
+        size,
+    };
+
+    Ok((object_store_url, object_meta))
+}
+
+#[derive(Default)]
+struct BatchBuilder {
+    service: StringDictionaryBuilder<Int32Type>,
+    host: StringDictionaryBuilder<Int32Type>,
+    pod: StringDictionaryBuilder<Int32Type>,
+    container: StringDictionaryBuilder<Int32Type>,
+    image: StringDictionaryBuilder<Int32Type>,
+    time: TimestampNanosecondBuilder,
+    client_addr: StringBuilder,
+    request_duration: Int32Builder,
+    request_user_agent: StringBuilder,
+    request_method: StringBuilder,
+    request_host: StringBuilder,
+    request_bytes: Int32Builder,
+    response_bytes: Int32Builder,
+    response_status: UInt16Builder,
+}
+
+impl BatchBuilder {
+    fn schema() -> SchemaRef {
+        let utf8_dict =
+            || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
+
+        Arc::new(Schema::new(vec![
+            Field::new("service", utf8_dict(), true),
+            Field::new("host", utf8_dict(), false),
+            Field::new("pod", utf8_dict(), false),
+            Field::new("container", utf8_dict(), false),
+            Field::new("image", utf8_dict(), false),
+            Field::new(
+                "time",
+                DataType::Timestamp(TimeUnit::Nanosecond, None),
+                false,
+            ),
+            Field::new("client_addr", DataType::Utf8, true),
+            Field::new("request_duration_ns", DataType::Int32, false),
+            Field::new("request_user_agent", DataType::Utf8, true),
+            Field::new("request_method", DataType::Utf8, true),
+            Field::new("request_host", DataType::Utf8, true),
+            Field::new("request_bytes", DataType::Int32, true),
+            Field::new("response_bytes", DataType::Int32, true),
+            Field::new("response_status", DataType::UInt16, false),
+        ]))
+    }
+
+    fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) {
+        let num_pods = rng.gen_range(1..15);
+        let pods = generate_sorted_strings(rng, num_pods, 30..40);
+        for pod in pods {
+            for container_idx in 0..rng.gen_range(1..3) {
+                let container = format!("{}_container_{}", service, container_idx);
+                let image = format!(
+                    "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9",
+                    container
+                );
+
+                let num_entries = rng.gen_range(1024..8192);
+                for i in 0..num_entries {
+                    let time = i as i64 * 1024;
+                    self.append_row(rng, host, &pod, service, &container, &image, time);
+                }
+            }
+        }
+    }
+
+    #[allow(clippy::too_many_arguments)]
+    fn append_row(
+        &mut self,
+        rng: &mut StdRng,
+        host: &str,
+        pod: &str,
+        service: &str,
+        container: &str,
+        image: &str,
+        time: i64,
+    ) {
+        let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"];
+        let status = &[200, 204, 400, 503, 403];
+
+        self.service.append(service).unwrap();
+        self.host.append(host).unwrap();
+        self.pod.append(pod).unwrap();
+        self.container.append(container).unwrap();
+        self.image.append(image).unwrap();
+        self.time.append_value(time);
+
+        self.client_addr.append_value(format!(
+            "{}.{}.{}.{}",
+            rng.gen::<u8>(),
+            rng.gen::<u8>(),
+            rng.gen::<u8>(),
+            rng.gen::<u8>()
+        ));
+        self.request_duration.append_value(rng.gen());
+        self.request_user_agent
+            .append_value(random_string(rng, 20..100));
+        self.request_method
+            .append_value(methods[rng.gen_range(0..methods.len())]);
+        self.request_host
+            .append_value(format!("https://{}.mydomain.com", service));
+
+        self.request_bytes
+            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
+        self.response_bytes
+            .append_option(rng.gen_bool(0.9).then(|| rng.gen()));
+        self.response_status
+            .append_value(status[rng.gen_range(0..status.len())]);
+    }
+
+    fn finish(mut self, schema: SchemaRef) -> RecordBatch {
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(self.service.finish()),
+                Arc::new(self.host.finish()),
+                Arc::new(self.pod.finish()),
+                Arc::new(self.container.finish()),
+                Arc::new(self.image.finish()),
+                Arc::new(self.time.finish()),
+                Arc::new(self.client_addr.finish()),
+                Arc::new(self.request_duration.finish()),
+                Arc::new(self.request_user_agent.finish()),
+                Arc::new(self.request_method.finish()),
+                Arc::new(self.request_host.finish()),
+                Arc::new(self.request_bytes.finish()),
+                Arc::new(self.response_bytes.finish()),
+                Arc::new(self.response_status.finish()),
+            ],
+        )
+        .unwrap()
+    }
+}
+
+fn random_string(rng: &mut StdRng, len_range: Range<usize>) -> String {
+    let len = rng.gen_range(len_range);
+    (0..len)
+        .map(|_| rng.gen_range(b'a'..=b'z') as char)
+        .collect::<String>()
+}
+
+fn generate_sorted_strings(
+    rng: &mut StdRng,
+    count: usize,
+    str_len: Range<usize>,
+) -> Vec<String> {
+    let mut strings: Vec<_> = (0..count)
+        .map(|_| random_string(rng, str_len.clone()))
+        .collect();
+
+    strings.sort_unstable();
+    strings
+}
+
+/// Generates sorted RecordBatch with an access log style schema for a single host
+#[derive(Debug)]
+struct Generator {
+    schema: SchemaRef,
+    rng: StdRng,
+    host_idx: usize,
+}
+
+impl Generator {
+    fn new() -> Self {
+        let seed = [
+            1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0,
+            0, 0, 0, 5, 0, 0, 0, 0, 0,
+        ];
+
+        Self {
+            schema: BatchBuilder::schema(),
+            host_idx: 0,
+            rng: StdRng::from_seed(seed),
+        }
+    }
+}
+
+impl Iterator for Generator {
+    type Item = RecordBatch;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut builder = BatchBuilder::default();
+
+        let host = format!(
+            "i-{:016x}.ec2.internal",
+            self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928
+        );
+        self.host_idx += 1;
+
+        for service in &["frontend", "backend", "database", "cache"] {
+            if self.rng.gen_bool(0.5) {
+                continue;
+            }
+            builder.append(&mut self.rng, &host, service);
+        }
+        Some(builder.finish(Arc::clone(&self.schema)))
+    }
+}