
Add parquet predicate pushdown metrics #3989

Merged

alamb merged 4 commits into apache:master from the alamb/parquet_predicate_pushdown_metrics branch on Oct 30, 2022

Conversation

alamb (Contributor) commented Oct 27, 2022

Which issue does this PR close?

Part of #3463

Rationale for this change

I am trying to verify the correctness and efficiency of parquet predicate pushdown so that we can turn it on in DataFusion by default. To do that, I want metrics that tell me how many rows were pruned and how long the pruning took.

What changes are included in this PR?

  1. Add a metric for how many rows were pruned by predicate pushdown
  2. Add a metric for how long the pruning took (see the sketch after this list)
  3. Add tests
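
For illustration, here is a minimal sketch of how metrics like these can be registered with DataFusion's metrics framework -- the function, partition index, and placeholder count are illustrative, not the exact code in this PR:

use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn record_pushdown_metrics(metrics: &ExecutionPlanMetricsSet, partition: usize) {
    // Counter for rows removed by the pushed-down predicate
    let pushdown_rows_filtered =
        MetricBuilder::new(metrics).counter("pushdown_rows_filtered", partition);
    // Timer accumulating time spent evaluating the pushed-down predicate
    let pushdown_eval_time =
        MetricBuilder::new(metrics).subset_time("pushdown_eval_time", partition);

    // During predicate evaluation:
    let timer = pushdown_eval_time.timer(); // stops when done() is called (or on drop)
    let num_filtered = 0; // placeholder for the number of rows the predicate removed
    pushdown_rows_filtered.add(num_filtered);
    timer.done();
}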

Are there any user-facing changes?

New metrics:

You can see them in explain analyze:

Plan with Metrics:

ProjectionExec: expr=[COUNT(UInt8(1))@0 as COUNT(UInt8(1))], metrics=[output_rows=1, elapsed_compute=291ns, spill_count=0, spilled_bytes=0, mem_used=0]
  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=1, elapsed_compute=7.855µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalescePartitionsExec, metrics=[output_rows=16, elapsed_compute=13.775µs, spill_count=0, spilled_bytes=0, mem_used=0]
      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))], metrics=[output_rows=16, elapsed_compute=38.622µs, spill_count=0, spilled_bytes=0, mem_used=0]
        CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=25550, elapsed_compute=213.035µs, spill_count=0, spilled_bytes=0, mem_used=0]
          FilterExec: container@0 = database_container_1, metrics=[output_rows=25550, elapsed_compute=185.685µs, spill_count=0, spilled_bytes=0, mem_used=0]
            RepartitionExec: partitioning=RoundRobinBatch(16), metrics=[send_time{inputPartition=0}=4.525µs, fetch_time{inputPartition=0}=4.6344ms, repart_time{inputPartition=0}=1ns]
              ParquetExec: limit=None, partitions=[data.parquet], predicate=container_min@0 <= database_container_1 AND database_container_1 <= container_max@1, projection=[container], metrics=[output_rows=25550, elapsed_compute=1ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned{filename=data.parquet}=0, bytes_scanned{filename=data.parquet}=354, num_predicate_creation_errors=0, pushdown_rows_filtered{filename=data.parquet}=225441, predicate_evaluation_errors{filename=data.parquet}=0, time_elapsed_processing=4.43559ms, pushdown_eval_time{filename=data.parquet}=3.602505ms, time_elapsed_scanning=4.247177ms, time_elapsed_opening=285.456µs]

Note in particular the new metrics in the ParquetExec entry:

pushdown_rows_filtered{filename=data.parquet}=225441
pushdown_eval_time{filename=data.parquet}=3.602505ms
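
For completeness, a hedged sketch of reproducing output like the above from Rust, against a recent DataFusion API -- the table name, file path, and query are illustrative, and parquet filter pushdown may need to be enabled in the session configuration, since it was not on by default at the time:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;
    // Prints the annotated plan, including the new pushdown metrics
    ctx.sql("EXPLAIN ANALYZE SELECT COUNT(1) FROM t WHERE container = 'database_container_1'")
        .await?
        .show()
        .await?;
    Ok(())
}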

alamb marked this pull request as draft October 27, 2022 19:35
github-actions bot added the core (Core datafusion crate) label Oct 27, 2022
alamb force-pushed the alamb/parquet_predicate_pushdown_metrics branch from 576d488 to b7171b4 October 27, 2022 20:00
alamb marked this pull request as ready for review October 27, 2022 20:04
match self
    .physical_expr
    .evaluate(&batch)
    .map(|v| v.into_array(batch.num_rows()))
{
    Ok(array) => {
        if let Some(mask) = array.as_any().downcast_ref::<BooleanArray>() {
-           Ok(BooleanArray::from(mask.data().clone()))
+           let bool_arr = BooleanArray::from(mask.data().clone());
+           // TODO is there a more efficient way to count the rows that are filtered?
alamb (Contributor Author) commented:
@tustvold do you have any suggestions on how to count the number of true values in a boolean array faster/better than this?

tustvold (Contributor) replied:

If you don't care about nulls:

bool_arr.values().count_set_bits_offset(bool_arr.offset(), bool_arr.len())

If you do care about nulls it is slightly more complicated; I'll get something into arrow-rs.

tustvold (Contributor) replied:

apache/arrow-rs#2957

You could copy/paste it for now; it isn't hugely complicated.

alamb (Contributor Author) replied:

Sadly, I do care about nulls -- it needs to be non-null and true.
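
(For reference, a straightforward, if not bit-level optimized, way to express "non-null and true" -- a minimal sketch using the arrow iterator API, not the implementation this PR ended up with:)

use arrow::array::BooleanArray;

// BooleanArray::iter() yields Option<bool>, so nulls surface as None
// and are skipped; only non-null true values are counted.
fn count_true_non_null(mask: &BooleanArray) -> usize {
    mask.iter().filter(|v| *v == Some(true)).count()
}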

alamb (Contributor Author) replied:

I can file a ticket in arrow-rs if that would be helpful

alamb (Contributor Author) replied:

Thanks again @tustvold -- I filed apache/arrow-rs#2963 and will copy/paste your implementation here for now.

alamb (Contributor Author) commented Oct 27, 2022

cc @Ted-Jiang I am thinking we can use a similar approach to validate / verify the PageIndex pruning you are working on

Ted-Jiang (Member) replied:

> cc @Ted-Jiang I am thinking we can use a similar approach to validate / verify the PageIndex pruning you are working on

Sounds great!

Ted-Jiang (Member) left a review comment:

LGTM. One thought: if a query has more than one predicate, should we record each predicate's input record count so we can calculate its efficiency 🤔 That could guide users in rearranging their predicates.

alamb (Contributor Author) commented Oct 28, 2022

> LGTM. One thought: if a query has more than one predicate, should we record each predicate's input record count so we can calculate its efficiency 🤔 That could guide users in rearranging their predicates.

That is a great idea -- I filed #3998 and will work on it next.
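
(For context, the idea boils down to tracking a per-predicate selectivity; a hypothetical sketch, with names that are illustrative rather than taken from this PR or #3998:)

// Hypothetical per-predicate counters
struct PredicateMetrics {
    rows_in: usize,  // rows the predicate was evaluated against
    rows_out: usize, // rows that passed the predicate
}

impl PredicateMetrics {
    // Fraction of input rows that survive; lower means the predicate filters
    // more, so it should generally be evaluated earlier in the chain.
    fn selectivity(&self) -> f64 {
        if self.rows_in == 0 {
            0.0
        } else {
            self.rows_out as f64 / self.rows_in as f64
        }
    }
}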

alamb (Contributor Author) commented Oct 28, 2022

Unless there are objections, I am going to merge this shortly, as several other PRs depend on it: #3976 and one in IOx.

alamb merged commit afc299a into apache:master Oct 30, 2022
alamb deleted the alamb/parquet_predicate_pushdown_metrics branch October 30, 2022 11:32
alamb (Contributor Author) commented Oct 30, 2022

I am not sure when I will have time to add per-predicate metrics -- I'll see how my other projects go.

ursabot commented Oct 30, 2022

Benchmark runs are scheduled for baseline = 71f05a3 and contender = afc299a. afc299a is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

jimexist pushed a commit to jimexist/arrow-datafusion that referenced this pull request Oct 31, 2022
* Log error building row filters

Inspired by @liukun4515 at https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755

* Add parquet predicate pushdown metrics

* more efficient bit counting
Dandandan pushed a commit to yuuch/arrow-datafusion that referenced this pull request Nov 5, 2022
* Log error building row filters

Inspired by @liukun4515 at https://github.com/apache/arrow-datafusion/pull/3380/files#r970198755

* Add parquet predicate pushdown metrics

* more efficient bit counting