Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat] Support using offset index in ParquetRecordBatchStream when pu… #3616

Merged
merged 2 commits into from
Sep 28, 2022

Conversation

Ted-Jiang
Copy link
Member

@Ted-Jiang Ted-Jiang commented Sep 26, 2022

…shing down RowFilter.

Signed-off-by: yangjiang yangjiang@ebay.com

Which issue does this PR close?

Closes #3456

Rationale for this change

enable read page index in ParquetScanOptions
If true, the reader will read pageIndex, If exit:

  1. First we can use it create the RowSelector before read the file (like row-group pruning avoid I/O)
  2. Second with pageIndex it will accelerate skip records (avoid decode pageHeader) when reading values from chunk with RowSelector.

What changes are included in this PR?

update submodule testing and parquet-testing

Are there any user-facing changes?

@github-actions github-actions bot added the core Core datafusion crate label Sep 26, 2022
@codecov-commenter
Copy link

codecov-commenter commented Sep 26, 2022

Codecov Report

Merging #3616 (875542d) into master (b54a56f) will decrease coverage by 0.01%.
The diff coverage is 90.24%.

@@            Coverage Diff             @@
##           master    #3616      +/-   ##
==========================================
- Coverage   86.03%   86.02%   -0.01%     
==========================================
  Files         300      300              
  Lines       56253    56456     +203     
==========================================
+ Hits        48395    48564     +169     
- Misses       7858     7892      +34     
Impacted Files Coverage Δ
...sion/core/src/physical_plan/file_format/parquet.rs 94.33% <57.14%> (-0.32%) ⬇️
...afusion/core/src/datasource/file_format/parquet.rs 86.30% <97.05%> (+0.73%) ⬆️
datafusion/physical-expr/src/regex_expressions.rs 64.38% <0.00%> (-18.66%) ⬇️
datafusion/core/src/physical_plan/sorts/cursor.rs 57.14% <0.00%> (-7.15%) ⬇️
datafusion/expr/src/operator.rs 98.43% <0.00%> (-1.57%) ⬇️
datafusion/core/src/execution/context.rs 78.71% <0.00%> (-0.60%) ⬇️
datafusion/physical-expr/src/expressions/binary.rs 97.42% <0.00%> (-0.21%) ⬇️
datafusion/optimizer/src/simplify_expressions.rs 82.67% <0.00%> (-0.13%) ⬇️
datafusion/physical-expr/src/functions.rs 92.66% <0.00%> (-0.10%) ⬇️
datafusion/core/tests/dataframe.rs 98.56% <0.00%> (-0.06%) ⬇️
... and 43 more

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

@Ted-Jiang
Copy link
Member Author

@alamb @liukun4515 @thinkharderdev PTAL

@thinkharderdev
Copy link
Contributor

Thanks @Ted-Jiang. I think it may be better to just pass the option to ParquetRecordBatchStreamBuilder. It will fetch the pages indexes in concurrently and if users would like to cache the indexes (or fetch them from somewhere other than the file metadata they can do so with a custom AsyncFileReader)

@Ted-Jiang
Copy link
Member Author

Ted-Jiang commented Sep 26, 2022

pass the option to ParquetRecordBatchStreamBuilder

Thanks for your advice! @thinkharderdev , IMOP, if we want to read page_index we must get its location from metadata, so it must read after metadata. i think put it directly behind read metadata is better.

if users would like to cache the indexes other than the file metadata

the reason why i put the index in metadata is to reduce the code change in repo, if the user want to cache it anywhere else can use pub API in page_index::index_reader to read with metadata.

If i am wrong or misunderstand plz correct me 😊?

@thinkharderdev
Copy link
Contributor

pass the option to ParquetRecordBatchStreamBuilder

Thanks for your advice! @thinkharderdev , IMOP, if we want to read page_index we must get its location from metadata, so it must read after metadata. i think put it directly behind read metadata is better.

if users would like to cache the indexes other than the file metadata

the reason why i put the index in metadata is to reduce the code change in repo, if the user want to cache it anywhere else can use pub API in page_index::index_reader to read with metadata.

If i am wrong or misunderstand plz correct me 😊?

Correct, the advantage of what ParquetRecordBatchStreamBuilder does in arrow-rs is that it fill fetch all the indexes concurrently using AsyncFileReader::get_ranges. So if you don't read the metadata up front then it will fetch it automatically for you. If we do want to grab it up front in ParquetExec then we should copy the implementation in arrow-rs to grab everything concurrently.

@Ted-Jiang
Copy link
Member Author

Ted-Jiang commented Sep 27, 2022

AsyncFileReader

@thinkharderdev Thanks! If am right, we should make the read index api into async, so i will file a ticket to replace below into async base on AsyncFileReader::get_ranges add to arrow-rs to make it concurrently as you mentioned.

  // TODO add async version in arrow-rs avoid read whole file.
        let bytes = store.get_range(&meta.location, 0..meta.size).await?;
        let mut location_vec = vec![];
        let mut index_vec = vec![];
        for rg in result_meta.row_groups() {
            location_vec.push(index_reader::read_pages_locations(&bytes, rg.columns())?);
            index_vec.push(index_reader::read_columns_indexes(&bytes, rg.columns())?);
        }

I prefer keep read page_index in ParquetFileReader::get_metadata and save them in ParquetMetaData already define in arrow-rs

/// Global Parquet metadata.
#[derive(Debug, Clone)]
pub struct ParquetMetaData {
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    /// Page index for all pages in each column chunk
    page_indexes: Option<ParquetColumnIndex>,
    /// Offset index for all pages in each column chunk
    offset_indexes: Option<ParquetOffsetIndex>,
}

because it can reduce the code change ,
Secondly in parquet open file first thing we should do is read file metadata, following build_row_filter, build_selection_base_on_index(todo) should depend on this.🤔

@thinkharderdev
Copy link
Contributor

AsyncFileReader

@thinkharderdev Thanks! If am right, we should make the read index api into async, so i will file a ticket to replace below into async base on AsyncFileReader::get_ranges add to arrow-rs to make it concurrently as you mentioned.

  // TODO add async version in arrow-rs avoid read whole file.
        let bytes = store.get_range(&meta.location, 0..meta.size).await?;
        let mut location_vec = vec![];
        let mut index_vec = vec![];
        for rg in result_meta.row_groups() {
            location_vec.push(index_reader::read_pages_locations(&bytes, rg.columns())?);
            index_vec.push(index_reader::read_columns_indexes(&bytes, rg.columns())?);
        }

I prefer keep read page_index in ParquetFileReader::get_metadata and save them in ParquetMetaData already define in arrow-rs

/// Global Parquet metadata.
#[derive(Debug, Clone)]
pub struct ParquetMetaData {
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    /// Page index for all pages in each column chunk
    page_indexes: Option<ParquetColumnIndex>,
    /// Offset index for all pages in each column chunk
    offset_indexes: Option<ParquetOffsetIndex>,
}

because it can reduce the code change , Secondly in parquet open file first thing we should do is read file metadata, following build_row_filter, build_selection_base_on_index(todo) should depend on this.🤔

I think we may be talking about different things :).

I'm saying the code to fetch the indexes already exists in arrow-rs so we don't need to duplicate the code in datafusion. You can just construct the ArrowReadOptions to enable the page index and ParquetRecordBatchStreamBuilder will fetch the indexes (and do so concurrently) (see https://github.com/apache/arrow-rs/blob/a7cf274765945af4111fddaeec26d672715de9d0/parquet/src/arrow/async_reader.rs#L225).

let mut options = ArrowReaderOptions::new().with_page_index(true);

if enable_page_index {
   options = options.with_page_index(true);
}

let builder =
   ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
      .await?

@Ted-Jiang Ted-Jiang closed this Sep 27, 2022
@Ted-Jiang Ted-Jiang reopened this Sep 27, 2022
@Ted-Jiang
Copy link
Member Author

Ted-Jiang commented Sep 27, 2022

AsyncFileReader

@thinkharderdev Thanks! If am right, we should make the read index api into async, so i will file a ticket to replace below into async base on AsyncFileReader::get_ranges add to arrow-rs to make it concurrently as you mentioned.

  // TODO add async version in arrow-rs avoid read whole file.
        let bytes = store.get_range(&meta.location, 0..meta.size).await?;
        let mut location_vec = vec![];
        let mut index_vec = vec![];
        for rg in result_meta.row_groups() {
            location_vec.push(index_reader::read_pages_locations(&bytes, rg.columns())?);
            index_vec.push(index_reader::read_columns_indexes(&bytes, rg.columns())?);
        }

I prefer keep read page_index in ParquetFileReader::get_metadata and save them in ParquetMetaData already define in arrow-rs

/// Global Parquet metadata.
#[derive(Debug, Clone)]
pub struct ParquetMetaData {
    file_metadata: FileMetaData,
    row_groups: Vec<RowGroupMetaData>,
    /// Page index for all pages in each column chunk
    page_indexes: Option<ParquetColumnIndex>,
    /// Offset index for all pages in each column chunk
    offset_indexes: Option<ParquetOffsetIndex>,
}

because it can reduce the code change , Secondly in parquet open file first thing we should do is read file metadata, following build_row_filter, build_selection_base_on_index(todo) should depend on this.🤔

I think we may be talking about different things :).

I'm saying the code to fetch the indexes already exists in arrow-rs so we don't need to duplicate the code in datafusion. You can just construct the ArrowReadOptions to enable the page index and ParquetRecordBatchStreamBuilder will fetch the indexes (and do so concurrently) (see https://github.com/apache/arrow-rs/blob/a7cf274765945af4111fddaeec26d672715de9d0/parquet/src/arrow/async_reader.rs#L225).

let mut options = ArrowReaderOptions::new().with_page_index(true);

if enable_page_index {
   options = options.with_page_index(true);
}

let builder =
   ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
      .await?

oh! i miss this part 😂 , using the order version arrow-rs

@Ted-Jiang Ted-Jiang marked this pull request as draft September 27, 2022 13:25
…shing down RowFilter.

Signed-off-by: yangjiang <yangjiang@ebay.com>
@Ted-Jiang Ted-Jiang marked this pull request as ready for review September 27, 2022 16:04
@Ted-Jiang
Copy link
Member Author

@thinkharderdev Sorry for the mistake, i think its ready.

@tustvold
Copy link
Contributor

This is modifying submodules, is this intentional?

@Ted-Jiang
Copy link
Member Author

This is modifying submodules, is this intentional?

update the submodules for using the new added test parquet files alltypes_tiny_pages.parquet and alltypes_tiny_pages_plain.parquet with page index
apache/parquet-testing#25

Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, only a minor suggestion.

Thank you for working on this, do you intend to work on hooking up the column index as well?

datafusion/core/src/physical_plan/file_format/parquet.rs Outdated Show resolved Hide resolved
Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
@Ted-Jiang
Copy link
Member Author

Looks good to me, only a minor suggestion.

Thank you for working on this, do you intend to work on hooking up the column index as well?

yes, i plan to do using min/max stats and filter to create the RowSelector 🤔

@tustvold tustvold merged commit 87faf86 into apache:master Sep 28, 2022
@ursabot
Copy link

ursabot commented Sep 28, 2022

Benchmark runs are scheduled for baseline = 451e441 and contender = 87faf86. 87faf86 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great -- thank you @Ted-Jiang and @tustvold

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support using offset index in ParquetRecordBatchStream when pushing down RowFilter
6 participants