From 95afefafd6172c0579daf6b8b660b8b6966e1315 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 8 Oct 2022 07:57:14 -0400 Subject: [PATCH 1/6] Add benchmarks for testing row filtering --- benchmarks/Cargo.toml | 2 + benchmarks/README.md | 34 +++ benchmarks/src/bin/access_log.rs | 356 +++++++++++++++++++++++++++++++ 3 files changed, 392 insertions(+) create mode 100644 benchmarks/src/bin/access_log.rs diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 7367e9682be7..868475b6f295 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -32,11 +32,13 @@ simd = ["datafusion/simd"] snmalloc = ["snmalloc-rs"] [dependencies] +arrow = "24.0.0" datafusion = { path = "../datafusion/core" } env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } num_cpus = "1.13.0" +parquet = "24.0.0" rand = "0.8.4" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.78" diff --git a/benchmarks/README.md b/benchmarks/README.md index 505469fc5ea7..1a08ac358987 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -126,3 +126,37 @@ h2o groupby query 1 took 1669 ms [1]: http://www.tpc.org/tpch/ [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page + +## Access Logs benchmarks + +This is a set of benchmarks for testing and verifying performance of parquet row filter pushdown. The queries are executed on +a synthetic dataset generated during the benchmark execution and designed to simulate web server access logs. + +```base +cargo run --release --bin access_logs --query --path ./data --scale-factor 1.0 +``` + +This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `size_factor` +(with the default value of `1.0` generating a ~1GB parquet file). 
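
To poke at the generated data outside the benchmark harness, the file can be registered and queried directly with DataFusion. The following is a minimal sketch, assuming a Tokio runtime and the crate versions pinned in `benchmarks/Cargo.toml`; note the benchmark binary itself builds and collects a physical plan rather than using the `sql`/`show_limit` convenience API shown here:

```rust
use datafusion::common::Result;
use datafusion::prelude::{ParquetReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Register the generated file under the same table name the benchmark uses.
    let ctx = SessionContext::new();
    ctx.register_parquet("logs", "./data/logs.parquet", ParquetReadOptions::default())
        .await?;

    // One of the benchmark's filters: all GET requests.
    let df = ctx
        .sql("SELECT * FROM logs WHERE request_method = 'GET'")
        .await?;
    df.show_limit(10).await?;
    Ok(())
}
```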
+ +Example run: +``` +Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 } +Generated test dataset with 1266707 rows +Executing 'get_requests' +Query 'get_requests' iteration 0 returned 211247 rows in 677 ms +Query 'get_requests' iteration 1 returned 211247 rows in 661 ms +Query 'get_requests' iteration 2 returned 211247 rows in 699 ms +Executing 'get_requests_ignore_body' +Query 'get_requests_ignore_body' iteration 0 returned 211247 rows in 155 ms +Query 'get_requests_ignore_body' iteration 1 returned 211247 rows in 159 ms +Query 'get_requests_ignore_body' iteration 2 returned 211247 rows in 153 ms +Executing 'get_post_503' +Query 'get_post_503' iteration 0 returned 42350 rows in 650 ms +Query 'get_post_503' iteration 1 returned 42350 rows in 659 ms +Query 'get_post_503' iteration 2 returned 42350 rows in 706 ms +Executing 'get_post_503_ignore_body' +Query 'get_post_503_ignore_body' iteration 0 returned 42350 rows in 155 ms +Query 'get_post_503_ignore_body' iteration 1 returned 42350 rows in 151 ms +Query 'get_post_503_ignore_body' iteration 2 returned 42350 rows in 157 ms +``` \ No newline at end of file diff --git a/benchmarks/src/bin/access_log.rs b/benchmarks/src/bin/access_log.rs new file mode 100644 index 000000000000..2867c03658a8 --- /dev/null +++ b/benchmarks/src/bin/access_log.rs @@ -0,0 +1,356 @@ +use arrow::array::{ + Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, + UInt16Builder, +}; +use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit}; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty; +use datafusion::common::Result; +use datafusion::physical_plan::collect; +use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; +use parquet::arrow::ArrowWriter; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::collections::HashMap; +use std::fs::File; +use std::ops::Range; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; +use structopt::StructOpt; + +#[cfg(feature = "snmalloc")] +#[global_allocator] +static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; + +#[derive(Debug, StructOpt)] +#[structopt(name = "Benchmarks", about = "Apache Arrow Rust Benchmarks.")] +struct Opt { + /// Activate debug mode to see query results + #[structopt(short, long)] + debug: bool, + + /// Number of iterations of each test run + #[structopt(short = "i", long = "iterations", default_value = "3")] + iterations: usize, + + /// Number of partitions to process in parallel + #[structopt(long = "partitions", default_value = "2")] + partitions: usize, + + /// Path to folder where access log file will be generated + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + + /// Batch size when reading Parquet files + #[structopt(short = "s", long = "batch-size", default_value = "8192")] + batch_size: usize, + + /// Total size of generated dataset. 
The default scale factor of 1.0 will generate a roughly 1GB parquet file + #[structopt(short = "s", long = "scale-factor", default_value = "1.0")] + scale_factor: f32, +} + +#[tokio::main] +async fn main() -> Result<()> { + let opt: Opt = Opt::from_args(); + println!("Running benchmarks with the following options: {:?}", opt); + + let config = SessionConfig::new() + .with_target_partitions(opt.partitions) + .with_batch_size(opt.batch_size); + let mut ctx = SessionContext::with_config(config); + + let path = opt.path.join("logs.parquet"); + + gen_data(&path, opt.scale_factor); + + ctx.register_parquet( + "logs", + path.to_str().unwrap(), + ParquetReadOptions::default(), + ) + .await?; + + datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await?; + + Ok(()) +} + +async fn datafusion_sql_benchmarks( + ctx: &mut SessionContext, + iterations: usize, + debug: bool, +) -> Result<()> { + let mut queries = HashMap::new(); + queries.insert( + "get_requests", + "SELECT * FROM logs WHERE request_method = 'GET'", + ); + queries.insert( + "get_requests_ignore_body", + "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'GET'" + ); + queries.insert( + "get_post_503", + "SELECT * FROM logs WHERE request_method = 'POST' AND response_status = 503", + ); + queries.insert( + "get_post_503_ignore_body", + "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'POST' AND response_status = 503", + ); + for (name, sql) in &queries { + println!("Executing '{}'", name); + for i in 0..iterations { + let start = Instant::now(); + let rows = execute_sql(ctx, sql, debug).await?; + println!( + "Query '{}' iteration {} returned {} rows in {} ms", + name, + i, + rows, + start.elapsed().as_millis() + ); + } + } + Ok(()) +} + +async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result { + let plan = ctx.create_logical_plan(sql)?; + let plan = ctx.optimize(&plan)?; + if debug { + println!("Optimized logical plan:\n{:?}", plan); + } + let physical_plan = ctx.create_physical_plan(&plan).await?; + let task_ctx = ctx.task_ctx(); + let result = collect(physical_plan, task_ctx).await?; + + if debug { + pretty::print_batches(&result)?; + } + Ok(result.iter().map(|b| b.num_rows()).sum()) +} + +fn gen_data(path: &PathBuf, scale_factor: f32) { + let generator = Generator::new(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, generator.schema.clone(), None).unwrap(); + + let mut num_rows = 0; + + let num_batches = 12_f32 * scale_factor; + + for batch in generator.take(num_batches as usize) { + writer.write(&batch).unwrap(); + num_rows += batch.num_rows(); + } + writer.close().unwrap(); + + println!("Generated test dataset with {} rows", num_rows); +} + +#[derive(Default)] +struct BatchBuilder { + service: StringDictionaryBuilder, + host: StringDictionaryBuilder, + pod: StringDictionaryBuilder, + container: StringDictionaryBuilder, + image: StringDictionaryBuilder, + time: TimestampNanosecondBuilder, + client_addr: StringBuilder, + request_duration: Int32Builder, + request_user_agent: StringBuilder, + request_method: StringBuilder, + request_host: StringBuilder, + request_bytes: Int32Builder, + response_bytes: Int32Builder, + response_status: UInt16Builder, + 
response_body: StringBuilder, +} + +impl BatchBuilder { + fn schema() -> SchemaRef { + let utf8_dict = + || DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)); + + Arc::new(Schema::new(vec![ + Field::new("service", utf8_dict(), true), + Field::new("host", utf8_dict(), false), + Field::new("pod", utf8_dict(), false), + Field::new("container", utf8_dict(), false), + Field::new("image", utf8_dict(), false), + Field::new( + "time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new("client_addr", DataType::Utf8, true), + Field::new("request_duration_ns", DataType::Int32, false), + Field::new("request_user_agent", DataType::Utf8, true), + Field::new("request_method", DataType::Utf8, true), + Field::new("request_host", DataType::Utf8, true), + Field::new("request_bytes", DataType::Int32, true), + Field::new("response_bytes", DataType::Int32, true), + Field::new("response_status", DataType::UInt16, false), + // This column will contain large values relative to the others + Field::new("response_body", DataType::Utf8, false), + ])) + } + + fn append(&mut self, rng: &mut StdRng, host: &str, service: &str) { + let num_pods = rng.gen_range(1..15); + let pods = generate_sorted_strings(rng, num_pods, 30..40); + for pod in pods { + for container_idx in 0..rng.gen_range(1..3) { + let container = format!("{}_container_{}", service, container_idx); + let image = format!( + "{}@sha256:30375999bf03beec2187843017b10c9e88d8b1a91615df4eb6350fb39472edd9", + container + ); + + let num_entries = rng.gen_range(1024..8192); + for i in 0..num_entries { + let time = i as i64 * 1024; + self.append_row(rng, host, &pod, service, &container, &image, time); + } + } + } + } + + #[allow(clippy::too_many_arguments)] + fn append_row( + &mut self, + rng: &mut StdRng, + host: &str, + pod: &str, + service: &str, + container: &str, + image: &str, + time: i64, + ) { + let methods = &["GET", "PUT", "POST", "HEAD", "PATCH", "DELETE"]; + let status = &[200, 204, 400, 503, 403]; + + self.service.append(service).unwrap(); + self.host.append(host).unwrap(); + self.pod.append(pod).unwrap(); + self.container.append(container).unwrap(); + self.image.append(image).unwrap(); + self.time.append_value(time); + + self.client_addr.append_value(format!( + "{}.{}.{}.{}", + rng.gen::(), + rng.gen::(), + rng.gen::(), + rng.gen::() + )); + self.request_duration.append_value(rng.gen()); + self.request_user_agent + .append_value(random_string(rng, 20..100)); + self.request_method + .append_value(methods[rng.gen_range(0..methods.len())]); + self.request_host + .append_value(format!("https://{}.mydomain.com", service)); + + self.request_bytes + .append_option(rng.gen_bool(0.9).then(|| rng.gen())); + self.response_bytes + .append_option(rng.gen_bool(0.9).then(|| rng.gen())); + self.response_status + .append_value(status[rng.gen_range(0..status.len())]); + self.response_body + .append_value(random_string(rng, 200..2000)) + } + + fn finish(mut self, schema: SchemaRef) -> RecordBatch { + RecordBatch::try_new( + schema, + vec![ + Arc::new(self.service.finish()), + Arc::new(self.host.finish()), + Arc::new(self.pod.finish()), + Arc::new(self.container.finish()), + Arc::new(self.image.finish()), + Arc::new(self.time.finish()), + Arc::new(self.client_addr.finish()), + Arc::new(self.request_duration.finish()), + Arc::new(self.request_user_agent.finish()), + Arc::new(self.request_method.finish()), + Arc::new(self.request_host.finish()), + Arc::new(self.request_bytes.finish()), + Arc::new(self.response_bytes.finish()), 
+ Arc::new(self.response_status.finish()), + Arc::new(self.response_body.finish()), + ], + ) + .unwrap() + } +} + +fn random_string(rng: &mut StdRng, len_range: Range) -> String { + let len = rng.gen_range(len_range); + (0..len) + .map(|_| rng.gen_range(b'a'..=b'z') as char) + .collect::() +} + +fn generate_sorted_strings( + rng: &mut StdRng, + count: usize, + str_len: Range, +) -> Vec { + let mut strings: Vec<_> = (0..count) + .map(|_| random_string(rng, str_len.clone())) + .collect(); + + strings.sort_unstable(); + strings +} + +/// Generates sorted RecordBatch with an access log style schema for a single host +#[derive(Debug)] +struct Generator { + schema: SchemaRef, + rng: StdRng, + host_idx: usize, +} + +impl Generator { + fn new() -> Self { + let seed = [ + 1, 0, 0, 0, 23, 0, 3, 0, 200, 1, 0, 0, 210, 30, 8, 0, 1, 0, 21, 0, 6, 0, 0, + 0, 0, 0, 5, 0, 0, 0, 0, 0, + ]; + + Self { + schema: BatchBuilder::schema(), + host_idx: 0, + rng: StdRng::from_seed(seed), + } + } +} + +impl Iterator for Generator { + type Item = RecordBatch; + + fn next(&mut self) -> Option { + let mut builder = BatchBuilder::default(); + + let host = format!( + "i-{:016x}.ec2.internal", + self.host_idx * 0x7d87f8ed5c5 + 0x1ec3ca3151468928 + ); + self.host_idx += 1; + + for service in &["frontend", "backend", "database", "cache"] { + if self.rng.gen_bool(0.5) { + continue; + } + builder.append(&mut self.rng, &host, service); + } + Some(builder.finish(Arc::clone(&self.schema))) + } +} From 66c10763e64798afbc28250bd9359a41abf8b2f2 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 8 Oct 2022 09:38:53 -0400 Subject: [PATCH 2/6] Rework to test everything at once --- benchmarks/Cargo.toml | 1 + benchmarks/README.md | 40 ++-- ...cess_log.rs => parquet_filter_pushdown.rs} | 183 ++++++++++++------ 3 files changed, 148 insertions(+), 76 deletions(-) rename benchmarks/src/bin/{access_log.rs => parquet_filter_pushdown.rs} (68%) diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 868475b6f295..c0734450874b 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -38,6 +38,7 @@ env_logger = "0.9" futures = "0.3" mimalloc = { version = "0.1", optional = true, default-features = false } num_cpus = "1.13.0" +object_store = "0.5.0" parquet = "24.0.0" rand = "0.8.4" serde = { version = "1.0.136", features = ["derive"] } diff --git a/benchmarks/README.md b/benchmarks/README.md index 1a08ac358987..7cb8fedafc32 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -127,36 +127,36 @@ h2o groupby query 1 took 1669 ms [1]: http://www.tpc.org/tpch/ [2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page -## Access Logs benchmarks +## Parquet filter pushdown benchmarks -This is a set of benchmarks for testing and verifying performance of parquet row filter pushdown. The queries are executed on +This is a set of benchmarks for testing and verifying performance of parquet filter pushdown. The queries are executed on a synthetic dataset generated during the benchmark execution and designed to simulate web server access logs. ```base -cargo run --release --bin access_logs --query --path ./data --scale-factor 1.0 +cargo run --release --bin parquet_filter_pushdown --query --path ./data --scale-factor 1.0 ``` This will generate the synthetic dataset at `./data/logs.parquet`. The size of the dataset can be controlled through the `size_factor` (with the default value of `1.0` generating a ~1GB parquet file). +For each filter we will run the query using different `ParquetScanOption` settings. 
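
Concretely, the settings swept for every filter mirror the `scan_options_matrix` added to `parquet_filter_pushdown.rs` later in this patch; a rough sketch of that matrix (the `with_*` builder methods are the ones assumed to exist on `ParquetScanOptions` in the DataFusion version used here) is:

```rust
use datafusion::physical_plan::file_format::ParquetScanOptions;

/// The three scan configurations the benchmark runs for each filter,
/// mirroring `scan_options_matrix` in parquet_filter_pushdown.rs.
fn scan_options_matrix() -> Vec<ParquetScanOptions> {
    vec![
        // Baseline: no row filter pushdown, no page index.
        ParquetScanOptions::default(),
        // Pushdown with predicate reordering and the page index enabled.
        ParquetScanOptions::default()
            .with_page_index(true)
            .with_pushdown_filters(true)
            .with_reorder_predicates(true),
        // Pushdown without reordering, to isolate the cost of reordering.
        ParquetScanOptions::default()
            .with_page_index(true)
            .with_pushdown_filters(true)
            .with_reorder_predicates(false),
    ]
}
```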
+ Example run: ``` Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 } -Generated test dataset with 1266707 rows -Executing 'get_requests' -Query 'get_requests' iteration 0 returned 211247 rows in 677 ms -Query 'get_requests' iteration 1 returned 211247 rows in 661 ms -Query 'get_requests' iteration 2 returned 211247 rows in 699 ms -Executing 'get_requests_ignore_body' -Query 'get_requests_ignore_body' iteration 0 returned 211247 rows in 155 ms -Query 'get_requests_ignore_body' iteration 1 returned 211247 rows in 159 ms -Query 'get_requests_ignore_body' iteration 2 returned 211247 rows in 153 ms -Executing 'get_post_503' -Query 'get_post_503' iteration 0 returned 42350 rows in 650 ms -Query 'get_post_503' iteration 1 returned 42350 rows in 659 ms -Query 'get_post_503' iteration 2 returned 42350 rows in 706 ms -Executing 'get_post_503_ignore_body' -Query 'get_post_503_ignore_body' iteration 0 returned 42350 rows in 155 ms -Query 'get_post_503_ignore_body' iteration 1 returned 42350 rows in 151 ms -Query 'get_post_503_ignore_body' iteration 2 returned 42350 rows in 157 ms +Generated test dataset with 10699521 rows +Executing with filter 'request_method = Utf8("GET")' +Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false } +Iteration 0 returned 10699521 rows in 1303 ms +Iteration 1 returned 10699521 rows in 1288 ms +Iteration 2 returned 10699521 rows in 1266 ms +Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: true, enable_page_index: true } +Iteration 0 returned 1781686 rows in 1970 ms +Iteration 1 returned 1781686 rows in 2002 ms +Iteration 2 returned 1781686 rows in 1988 ms +Using scan options ParquetScanOptions { pushdown_filters: true, reorder_predicates: false, enable_page_index: true } +Iteration 0 returned 1781686 rows in 1940 ms +Iteration 1 returned 1781686 rows in 1986 ms +Iteration 2 returned 1781686 rows in 1947 ms +... 
``` \ No newline at end of file diff --git a/benchmarks/src/bin/access_log.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs similarity index 68% rename from benchmarks/src/bin/access_log.rs rename to benchmarks/src/bin/parquet_filter_pushdown.rs index 2867c03658a8..5bf7c11ef70c 100644 --- a/benchmarks/src/bin/access_log.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -6,12 +6,19 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit}; use arrow::record_batch::RecordBatch; use arrow::util::pretty; use datafusion::common::Result; +use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::logical_expr::{lit, or, Expr}; use datafusion::physical_plan::collect; -use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext}; +use datafusion::physical_plan::file_format::{ + FileScanConfig, ParquetExec, ParquetScanOptions, +}; +use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext}; +use object_store::path::Path; +use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; -use std::collections::HashMap; use std::fs::File; use std::ops::Range; use std::path::PathBuf; @@ -63,68 +70,122 @@ async fn main() -> Result<()> { let path = opt.path.join("logs.parquet"); - gen_data(&path, opt.scale_factor); + let (object_store_url, object_meta) = gen_data(path, opt.scale_factor)?; - ctx.register_parquet( - "logs", - path.to_str().unwrap(), - ParquetReadOptions::default(), + run_benchmarks( + &mut ctx, + object_store_url.clone(), + object_meta.clone(), + opt.iterations, + opt.debug, ) .await?; - datafusion_sql_benchmarks(&mut ctx, opt.iterations, opt.debug).await?; - Ok(()) } -async fn datafusion_sql_benchmarks( +async fn run_benchmarks( ctx: &mut SessionContext, + object_store_url: ObjectStoreUrl, + object_meta: ObjectMeta, iterations: usize, debug: bool, ) -> Result<()> { - let mut queries = HashMap::new(); - queries.insert( - "get_requests", - "SELECT * FROM logs WHERE request_method = 'GET'", - ); - queries.insert( - "get_requests_ignore_body", - "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'GET'" - ); - queries.insert( - "get_post_503", - "SELECT * FROM logs WHERE request_method = 'POST' AND response_status = 503", - ); - queries.insert( - "get_post_503_ignore_body", - "SELECT service,host,pod,container,image,time,client_addr,request_duration_ns,request_user_agent,request_method,request_host,request_bytes,response_status,response_bytes FROM logs WHERE request_method = 'POST' AND response_status = 503", - ); - for (name, sql) in &queries { - println!("Executing '{}'", name); - for i in 0..iterations { - let start = Instant::now(); - let rows = execute_sql(ctx, sql, debug).await?; - println!( - "Query '{}' iteration {} returned {} rows in {} ms", - name, - i, - rows, - start.elapsed().as_millis() - ); + let scan_options_matrix = vec![ + ParquetScanOptions::default(), + ParquetScanOptions::default() + .with_page_index(true) + .with_pushdown_filters(true) + .with_reorder_predicates(true), + ParquetScanOptions::default() + .with_page_index(true) + .with_pushdown_filters(true) + .with_reorder_predicates(false), + ]; + + let filter_matrix = vec![ + // Selective-ish filter + col("request_method").eq(lit("GET")), + // Non-selective 
filter + col("request_method").not_eq(lit("GET")), + // Basic conjunction + col("request_method") + .eq(lit("POST")) + .and(col("response_status").eq(lit(503_u16))), + // Nested filters + col("request_method").eq(lit("POST")).and(or( + col("response_status").eq(lit(503_u16)), + col("response_status").eq(lit(403_u16)), + )), + // Many filters + combine_filters(&[ + col("request_method").not_eq(lit("GET")), + col("response_status").eq(lit(400_u16)), + col("service").eq(lit("backend")), + ]) + .unwrap(), + // Filter everything + col("response_status").eq(lit(429_u16)), + // Filter nothing + col("response_status").gt(lit(0_u16)), + ]; + + for filter_expr in &filter_matrix { + println!("Executing with filter '{}'", filter_expr); + for scan_options in &scan_options_matrix { + println!("Using scan options {:?}", scan_options); + for i in 0..iterations { + let start = Instant::now(); + let rows = exec_scan( + ctx, + object_store_url.clone(), + object_meta.clone(), + filter_expr.clone(), + scan_options.clone(), + debug, + ) + .await?; + println!( + "Iteration {} returned {} rows in {} ms", + i, + rows, + start.elapsed().as_millis() + ); + } } + println!("\n"); } Ok(()) } -async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result { - let plan = ctx.create_logical_plan(sql)?; - let plan = ctx.optimize(&plan)?; - if debug { - println!("Optimized logical plan:\n{:?}", plan); - } - let physical_plan = ctx.create_physical_plan(&plan).await?; +async fn exec_scan( + ctx: &SessionContext, + object_store_url: ObjectStoreUrl, + object_meta: ObjectMeta, + filter: Expr, + scan_options: ParquetScanOptions, + debug: bool, +) -> Result { + let scan_config = FileScanConfig { + object_store_url, + file_schema: BatchBuilder::schema(), + file_groups: vec![vec![PartitionedFile { + object_meta, + partition_values: vec![], + range: None, + extensions: None, + }]], + statistics: Default::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + }; + let exec = Arc::new( + ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options), + ); + let task_ctx = ctx.task_ctx(); - let result = collect(physical_plan, task_ctx).await?; + let result = collect(exec, task_ctx).await?; if debug { pretty::print_batches(&result)?; @@ -132,15 +193,15 @@ async fn execute_sql(ctx: &SessionContext, sql: &str, debug: bool) -> Result Result<(ObjectStoreUrl, ObjectMeta)> { let generator = Generator::new(); - let file = File::create(path).unwrap(); + let file = File::create(&path).unwrap(); let mut writer = ArrowWriter::try_new(file, generator.schema.clone(), None).unwrap(); let mut num_rows = 0; - let num_batches = 12_f32 * scale_factor; + let num_batches = 100_f32 * scale_factor; for batch in generator.take(num_batches as usize) { writer.write(&batch).unwrap(); @@ -149,6 +210,22 @@ fn gen_data(path: &PathBuf, scale_factor: f32) { writer.close().unwrap(); println!("Generated test dataset with {} rows", num_rows); + + let size = std::fs::metadata(&path)?.len() as usize; + + let canonical_path = path.canonicalize()?; + + let object_store_url = + ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())? 
+ .object_store(); + + let object_meta = ObjectMeta { + location: Path::parse(canonical_path.to_str().unwrap_or_default())?, + last_modified: Default::default(), + size, + }; + + Ok((object_store_url, object_meta)) } #[derive(Default)] @@ -167,7 +244,6 @@ struct BatchBuilder { request_bytes: Int32Builder, response_bytes: Int32Builder, response_status: UInt16Builder, - response_body: StringBuilder, } impl BatchBuilder { @@ -194,8 +270,6 @@ impl BatchBuilder { Field::new("request_bytes", DataType::Int32, true), Field::new("response_bytes", DataType::Int32, true), Field::new("response_status", DataType::UInt16, false), - // This column will contain large values relative to the others - Field::new("response_body", DataType::Utf8, false), ])) } @@ -261,8 +335,6 @@ impl BatchBuilder { .append_option(rng.gen_bool(0.9).then(|| rng.gen())); self.response_status .append_value(status[rng.gen_range(0..status.len())]); - self.response_body - .append_value(random_string(rng, 200..2000)) } fn finish(mut self, schema: SchemaRef) -> RecordBatch { @@ -283,7 +355,6 @@ impl BatchBuilder { Arc::new(self.request_bytes.finish()), Arc::new(self.response_bytes.finish()), Arc::new(self.response_status.finish()), - Arc::new(self.response_body.finish()), ], ) .unwrap() From 3ca869885a57e46ba0ef272381b6b9eb0ac7eaa4 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 8 Oct 2022 10:16:15 -0400 Subject: [PATCH 3/6] Make sure we get the same results with pushdown disabled --- benchmarks/src/bin/parquet_filter_pushdown.rs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index 5bf7c11ef70c..5fe3f75fe43a 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -8,11 +8,15 @@ use arrow::util::pretty; use datafusion::common::Result; use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile}; use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::execution::context::ExecutionProps; use datafusion::logical_expr::{lit, or, Expr}; +use datafusion::logical_plan::ToDFSchema; +use datafusion::physical_expr::create_physical_expr; use datafusion::physical_plan::collect; use datafusion::physical_plan::file_format::{ FileScanConfig, ParquetExec, ParquetScanOptions, }; +use datafusion::physical_plan::filter::FilterExec; use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext}; use object_store::path::Path; use object_store::ObjectMeta; @@ -121,7 +125,8 @@ async fn run_benchmarks( combine_filters(&[ col("request_method").not_eq(lit("GET")), col("response_status").eq(lit(400_u16)), - col("service").eq(lit("backend")), + // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same") + // col("service").eq(lit("backend")), ]) .unwrap(), // Filter everything @@ -166,9 +171,10 @@ async fn exec_scan( scan_options: ParquetScanOptions, debug: bool, ) -> Result { + let schema = BatchBuilder::schema(); let scan_config = FileScanConfig { object_store_url, - file_schema: BatchBuilder::schema(), + file_schema: schema.clone(), file_groups: vec![vec![PartitionedFile { object_meta, partition_values: vec![], @@ -180,10 +186,22 @@ async fn exec_scan( limit: None, table_partition_cols: vec![], }; - let exec = Arc::new( + + let df_schema = schema.clone().to_dfschema()?; + + let physical_filter_expr = create_physical_expr( + &filter, + 
&df_schema, + schema.as_ref(), + &ExecutionProps::default(), + )?; + + let parquet_exec = Arc::new( ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options), ); + let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?); + let task_ctx = ctx.task_ctx(); let result = collect(exec, task_ctx).await?; From 8efb010902a0403c23a5a0d7f56a27f66a6a7a9a Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Sat, 8 Oct 2022 10:17:01 -0400 Subject: [PATCH 4/6] Add license header --- benchmarks/src/bin/parquet_filter_pushdown.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index 5fe3f75fe43a..aedec9c97e8f 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + use arrow::array::{ Int32Builder, StringBuilder, StringDictionaryBuilder, TimestampNanosecondBuilder, UInt16Builder, From 3c843cf1df5e83e9633e9295c6e9b65a2f47e12c Mon Sep 17 00:00:00 2001 From: Dan Harris <1327726+thinkharderdev@users.noreply.github.com> Date: Tue, 11 Oct 2022 15:44:29 -0400 Subject: [PATCH 5/6] Update benchmarks/README.md Co-authored-by: Andrew Lamb --- benchmarks/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benchmarks/README.md b/benchmarks/README.md index 7cb8fedafc32..97a0bd4c6bf3 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -133,7 +133,7 @@ This is a set of benchmarks for testing and verifying performance of parquet fil a synthetic dataset generated during the benchmark execution and designed to simulate web server access logs. ```base -cargo run --release --bin parquet_filter_pushdown --query --path ./data --scale-factor 1.0 +cargo run --release --bin parquet_filter_pushdown -- --path ./data --scale-factor 1.0 ``` This will generate the synthetic dataset at `./data/logs.parquet`. 
The size of the dataset can be controlled through the `size_factor` From bb2aeac7d0c67fa3ae604968e9ec6d9b9e436f36 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Tue, 11 Oct 2022 15:47:02 -0400 Subject: [PATCH 6/6] PR comments --- benchmarks/src/bin/parquet_filter_pushdown.rs | 43 +++++++++++++++---- 1 file changed, 34 insertions(+), 9 deletions(-) diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs index aedec9c97e8f..e3b365d4fade 100644 --- a/benchmarks/src/bin/parquet_filter_pushdown.rs +++ b/benchmarks/src/bin/parquet_filter_pushdown.rs @@ -38,6 +38,7 @@ use datafusion::prelude::{col, combine_filters, SessionConfig, SessionContext}; use object_store::path::Path; use object_store::ObjectMeta; use parquet::arrow::ArrowWriter; +use parquet::file::properties::WriterProperties; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use std::fs::File; @@ -70,9 +71,13 @@ struct Opt { #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] path: PathBuf, - /// Batch size when reading Parquet files - #[structopt(short = "s", long = "batch-size", default_value = "8192")] - batch_size: usize, + /// Data page size of the generated parquet file + #[structopt(long = "page-size")] + page_size: Option, + + /// Data page size of the generated parquet file + #[structopt(long = "row-group-size")] + row_group_size: Option, /// Total size of generated dataset. The default scale factor of 1.0 will generate a roughly 1GB parquet file #[structopt(short = "s", long = "scale-factor", default_value = "1.0")] @@ -84,14 +89,13 @@ async fn main() -> Result<()> { let opt: Opt = Opt::from_args(); println!("Running benchmarks with the following options: {:?}", opt); - let config = SessionConfig::new() - .with_target_partitions(opt.partitions) - .with_batch_size(opt.batch_size); + let config = SessionConfig::new().with_target_partitions(opt.partitions); let mut ctx = SessionContext::with_config(config); let path = opt.path.join("logs.parquet"); - let (object_store_url, object_meta) = gen_data(path, opt.scale_factor)?; + let (object_store_url, object_meta) = + gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?; run_benchmarks( &mut ctx, @@ -228,11 +232,31 @@ async fn exec_scan( Ok(result.iter().map(|b| b.num_rows()).sum()) } -fn gen_data(path: PathBuf, scale_factor: f32) -> Result<(ObjectStoreUrl, ObjectMeta)> { +fn gen_data( + path: PathBuf, + scale_factor: f32, + page_size: Option, + row_group_size: Option, +) -> Result<(ObjectStoreUrl, ObjectMeta)> { let generator = Generator::new(); let file = File::create(&path).unwrap(); - let mut writer = ArrowWriter::try_new(file, generator.schema.clone(), None).unwrap(); + + let mut props_builder = WriterProperties::builder(); + + if let Some(s) = page_size { + props_builder = props_builder + .set_data_pagesize_limit(s) + .set_write_batch_size(s); + } + + if let Some(s) = row_group_size { + props_builder = props_builder.set_max_row_group_size(s); + } + + let mut writer = + ArrowWriter::try_new(file, generator.schema.clone(), Some(props_builder.build())) + .unwrap(); let mut num_rows = 0; @@ -240,6 +264,7 @@ fn gen_data(path: PathBuf, scale_factor: f32) -> Result<(ObjectStoreUrl, ObjectM for batch in generator.take(num_batches as usize) { writer.write(&batch).unwrap(); + writer.flush()?; num_rows += batch.num_rows(); } writer.close().unwrap();
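
Since this last commit routes `--page-size` and `--row-group-size` into `WriterProperties` and flushes each generated batch, one way to confirm the knobs took effect is to read the row-group metadata back with the same `parquet` crate already in `benchmarks/Cargo.toml`. This is a rough sketch, not part of the patch; the path assumes the README's default output location:

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file produced by the benchmark's gen_data step.
    let file = File::open("./data/logs.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let metadata = reader.metadata();

    // Each generated RecordBatch is flushed separately, so row-group sizes
    // reflect --row-group-size and the batch boundaries.
    println!("row groups: {}", metadata.num_row_groups());
    for (i, rg) in metadata.row_groups().iter().enumerate() {
        println!(
            "  group {:>3}: {:>9} rows, {:>12} bytes",
            i,
            rg.num_rows(),
            rg.total_byte_size()
        );
    }
    Ok(())
}
```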