
Enable serialized_reader read specific Page by passing row ranges. #1977

Closed

Conversation

@Ted-Jiang (Member) commented Jun 30, 2022

Which issue does this PR close?

Closes #1976.

Rationale for this change

Part of the support for #1792.
If we want to use the page index to get row ranges, we first use SerializedFileReader to read the page index info, then use that index to compute row_ranges like below:

        //filter `x < 11`
        let filter =
            |page: &PageIndex<i32>| page.max.as_ref().map(|&x| x < 11).unwrap_or(false);

        let mask = index.indexes.iter().map(filter).collect::<Vec<_>>();

        let row_ranges = compute_row_ranges(&mask, locations, total_rows).unwrap();

Finally we can pass the row_ranges to a new API to read the parquet file (DataFusion already reads this way, but without row_ranges):

fn get_record_reader_by_columns_and_row_ranges(
    &mut self,
    mask: ProjectionMask,
    row_ranges: &RowRanges,
    batch_size: usize,
) -> Result<ParquetRecordBatchReader> {
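
Putting the two snippets together, a minimal sketch of the intended call path (hedged: `parquet_schema`, `locations`, `total_rows`, and `arrow_reader` are illustrative names, not APIs added by this PR):

    // Build the page mask from the column index and turn it into row ranges,
    // as in the rationale above.
    let mask = index.indexes.iter().map(filter).collect::<Vec<_>>();
    let row_ranges = compute_row_ranges(&mask, locations, total_rows)?;

    // Project the first two leaf columns and read only the selected rows.
    let projection = ProjectionMask::leaves(parquet_schema, [0, 1]);
    let mut record_reader = arrow_reader
        .get_record_reader_by_columns_and_row_ranges(projection, &row_ranges, 1024)?;
    while let Some(batch) = record_reader.next().transpose()? {
        println!("read {} rows", batch.num_rows());
    }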

What changes are included in this PR?

One example: if we read col1 and col2 and apply the filter, we get a result that only needs row range [20, 80].
For col1:
we need all the data from page1, page2, page3.
For col2:
after this PR, we will filter out page2 and keep page0, page1:
page1: we need all of its data;
page0: we need only part of its row range (row alignment is a TODO).

 * rows   col1   col2   col3
 *      ┌──────┬──────┬──────┐
 *   0  │  p0  │      │      │
 *      ╞══════╡  p0  │  p0  │
 *  20  │ p1(X)│------│------│
 *      ╞══════╪══════╡      │
 *  40  │ p2(X)│      │------│
 *      ╞══════╡ p1(X)╞══════╡
 *  60  │ p3(X)│      │------│
 *      ╞══════╪══════╡      │
 *  80  │  p4  │      │  p1  │
 *      ╞══════╡  p2  │      │
 * 100  │  p5  │      │      │
 *      └──────┴──────┴──────┘
 * 
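
The per-column pruning in the diagram reduces to an overlap test between each page's row span and the selected row range. A minimal sketch of that test, assuming each page's first row index is known from the offset index (`starts` stands in for the PageLocation first_row_index values; the col2 boundaries are illustrative):

    use std::ops::Range;

    /// Page i spans [starts[i], starts[i + 1]), with the last page running to
    /// `total_rows`; keep it if that span overlaps the selected range.
    fn keep_pages(starts: &[i64], total_rows: i64, selected: &Range<i64>) -> Vec<usize> {
        (0..starts.len())
            .filter(|&i| {
                let end = *starts.get(i + 1).unwrap_or(&total_rows);
                starts[i] < selected.end && end > selected.start
            })
            .collect()
    }

    // col2 above: pages starting at rows 0, 45, 80 (120 rows total); selecting
    // rows 20..80 keeps page0 (partially) and page1, and prunes page2.
    assert_eq!(keep_pages(&[0, 45, 80], 120, &(20..80)), vec![0, 1]);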

Are there any user-facing changes?

@Ted-Jiang Ted-Jiang marked this pull request as draft June 30, 2022 12:38
@github-actions github-actions bot added the parquet Changes to the parquet crate label Jun 30, 2022
 ) -> Result<Box<dyn ArrayReader>> {
     match field.field_type {
-        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Primitive { .. } => {
+            build_primitive_reader(field, row_groups, row_groups_filter_offset_index)
Ted-Jiang (Member Author):

For now, only the primitive reader is supported.

    i: usize,
    row_groups_filter_offset_index: Option<&Vec<FilterOffsetIndex>>,
) -> Result<Box<dyn PageIterator>> {
    // TODO: support page-level filter
Ted-Jiang (Member Author):

This will be supported in InMemoryReader.

-    page_indexes: Option<Vec<Index>>,
-    offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    page_indexes: Option<Vec<Vec<Index>>>,
+    offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,
Ted-Jiang (Member Author):

We need a 3-level Vec here: row_group -> column chunk -> page.
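
A small hedged illustration of the access pattern this nesting implies (direct field indexing shown for clarity; `rg`, `col`, and `p` are illustrative indices):

    // Hypothetical helper over the new nested layout:
    // row group `rg` -> column chunk `col` -> page `p`.
    fn page_location(
        offset_indexes: &[Vec<Vec<PageLocation>>],
        rg: usize,
        col: usize,
        p: usize,
    ) -> &PageLocation {
        &offset_indexes[rg][col][p]
    }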

@liukun4515 (Contributor) commented:

I'm confused about the design of the new API described above:

fn get_record_reader_by_columns_and_row_ranges(
    &mut self,
    mask: ProjectionMask,
    row_ranges: &RowRanges,
    batch_size: usize,
) -> Result<ParquetRecordBatchReader> {

I think the column index reader should be a feature of the parquet reader (parquet-rs) itself; anyone who calls the parquet reader with a filter should get the benefit of this optimization.

From your implementation, I see that users need to call the lower-level API and use the column index to calculate the ranges themselves. If so, any user who wants to use the parquet column index has to add complex custom logic to fit this lower-level interface.
What is your opinion?
@sunchao @tustvold @viirya

@sunchao (Member) commented Jul 1, 2022

Yes, I think the row ranges are internal to parquet-rs and should be calculated during the predicate pushdown.

@Ted-Jiang (Member Author) commented:

> should be calculated during the predicate pushdown.

Yes, I agree it's better to keep it private.
But the current code base does not have a filter struct (like the Java implementation's) to provide predicate pushdown.
DataFusion uses a 'mask' (column level); 'row_ranges' is like a page-level mask:

  fn get_record_reader_by_columns(
        &mut self,
        mask: ProjectionMask,
        batch_size: usize,
    ) -> Result<ParquetRecordBatchReader> {

I think for now we can make it pub; after full support for predicate pushdown, we can do it like the Java implementation.

@Ted-Jiang Ted-Jiang marked this pull request as ready for review July 1, 2022 08:02
@Ted-Jiang (Member Author) commented:

@tustvold @viirya and others who may be interested in this, PTAL 😊

    }
}

pub(crate) fn set_row_ranges(&mut self, row_ranges: RowRanges) {
    self.selected_row_ranges = Some(row_ranges);
Ted-Jiang (Member Author) commented Jul 1, 2022:

We need these row_ranges for row alignment in the future.

@@ -353,6 +392,31 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
Ok(Box::new(page_reader))
}

fn get_column_page_reader_with_offset_index(
Ted-Jiang (Member Author):

Because of a lack of test data in the parquet-testing subproject, I will add an end-to-end test after adding a test file there.


Contributor:

We can add a unit test using the writer API. #1935 has been merged, so parquet files written by the crate contain the column index by default.

let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;

fn column_chunks(
    &self,
    i: usize,
Contributor:

Suggested change:
-    i: usize,
+    column_index: usize,

let mut columns_indexes = vec![];
let mut offset_indexes = vec![];
for rg in &filtered_row_groups {
    let c = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
Contributor:

If a schema has col1, col2, col3, ..., col8 and we only need col1 and col3, do we need to load the index data for the other, unused columns?
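
For illustration, a hedged sketch of reading the index only for projected columns (`needed`, a set of column ordinals, is an assumption; note this breaks the positional column-to-index mapping, which would then need re-associating):

    // Hypothetical: restrict the index read to the projected column chunks.
    let projected: Vec<ColumnChunkMetaData> = rg
        .columns()
        .iter()
        .enumerate()
        .filter(|(i, _)| needed.contains(i))
        .map(|(_, c)| c.clone())
        .collect();
    let c = index_reader::read_columns_indexes(&chunk_reader, &projected)?;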

@tustvold (Contributor) left a review:

I've had a quick review; unfortunately I think this is missing a key detail. In particular, the arrow reader must read the same records from each of its columns. As written, this simply skips reading pruned pages from columns. There is no relationship between the page boundaries across columns within a parquet file, and therefore this will return different rows for each of the columns.

As described in #1791 (review), you will need to extract the row selection in addition to the page selection, and push this into RecordReader and ColumnValueDecoder. This will also make the API clearer, as we aren't going behind their backs and skipping pages at the block level.
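
To make the row-alignment point concrete, here is a sketch of the kind of run-length row selection #1791 describes (the struct is illustrative, not an API in this PR):

    /// Illustrative only: alternating read/skip runs that are applied
    /// identically to every projected column, independent of where each
    /// column's pages happen to break.
    struct RowSelector {
        row_count: usize,
        skip: bool,
    }

    // Selecting rows [20, 80) out of 120 as runs: skip 20, read 60, skip 40.
    // Because the same runs drive every column's RecordReader, the columns
    // stay row-aligned even though their page boundaries differ.
    let selection = vec![
        RowSelector { row_count: 20, skip: true },
        RowSelector { row_count: 60, skip: false },
        RowSelector { row_count: 40, skip: true },
    ];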

Comment on lines 102 to 103
        self.skip_arrow_metadata = skip_arrow_metadata;
        self
Contributor:

Suggested change:
-        self.skip_arrow_metadata = skip_arrow_metadata;
-        self
+        Self {
+            skip_arrow_metadata,
+            ..self
+        }

And the same below.

Comment on lines 148 to 149
        if self.options.selected_rows.is_some() {
            let ranges = &self.options.selected_rows.as_ref().unwrap().clone();
Contributor:

Suggested change:
-        if self.options.selected_rows.is_some() {
-            let ranges = &self.options.selected_rows.as_ref().unwrap().clone();
+        if let Some(ranges) = self.options.selected_rows.as_ref() {

@@ -68,11 +70,19 @@ pub trait ArrowReader {
        mask: ProjectionMask,
        batch_size: usize,
    ) -> Result<Self::RecordReader>;

    fn get_record_reader_by_columns_and_row_ranges(
Contributor:

Do we need this, or is the ArrowReaderOptions sufficient?

@@ -55,8 +55,8 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
-    page_indexes: Option<Vec<Index>>,
-    offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    page_indexes: Option<Vec<Vec<Index>>>,
Contributor:

Suggested change:
-    page_indexes: Option<Vec<Vec<Index>>>,
+    /// Page index for all pages in each column chunk
+    page_indexes: Option<Vec<Vec<Index>>>,

Or something like that; same for the below.


/// Get a serially readable slice of the current reader.
/// This should fail if the slice exceeds the current bounds.
fn get_multi_range_read(
Contributor:

As discussed on #1955, I'm not a fan of this; I would much rather the page reader read pages than skip byte ranges behind its back.

It also changes the semantics of how a column chunk is read, as it now buffers in memory an extra time.

for (start, length) in start_list.into_iter().zip(length_list.into_iter()) {
    combine_vec.extend(self.slice(start..start + length).to_vec());
}
let reader = Bytes::copy_from_slice(combine_vec.as_slice()).reader();
Contributor:

This adds an additional copy of all the page bytes, which is definitely not ideal...

// Read from the parquet file; this is located before the footer.
offset_index: Vec<PageLocation>,

// Used to keep the indexes of the needed pages.
Contributor:

Why is this necessary?

Ted-Jiang (Member Author):

We pass all the PageLocations and the RowRanges into this struct and then do the filtering logic.

If we have 5 pages and try_new filters out 2 of them, we keep the index numbers of the remaining 3 pages in this index_map for the final calculate_offset_range.
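
A hedged sketch of that flow (the `keep` predicate stands in for the RowRanges check; field and method names follow the comment above):

    struct FilterOffsetIndex {
        offset_index: Vec<PageLocation>, // all pages of the column chunk
        index_map: Vec<usize>,           // ordinals of the pages kept
    }

    impl FilterOffsetIndex {
        // E.g. 5 pages in, 2 filtered out: index_map keeps the 3 survivors.
        fn try_new(offset_index: Vec<PageLocation>, keep: impl Fn(&PageLocation) -> bool) -> Self {
            let index_map = offset_index
                .iter()
                .enumerate()
                .filter(|(_, loc)| keep(loc))
                .map(|(i, _)| i)
                .collect();
            Self { offset_index, index_map }
        }

        // The kept ordinals point back into offset_index to produce the byte
        // ranges that actually need to be read.
        fn calculate_offset_range(&self) -> Vec<(i64, i64)> {
            self.index_map
                .iter()
                .map(|&i| {
                    let loc = &self.offset_index[i];
                    (loc.offset, loc.offset + loc.compressed_page_size as i64)
                })
                .collect()
        }
    }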

@Ted-Jiang (Member Author) commented Jul 1, 2022

> I've had a quick review; unfortunately I think this is missing a key detail. In particular, the arrow reader must read the same records from each of its columns. As written, this simply skips reading pruned pages from columns. There is no relationship between the page boundaries across columns within a parquet file, and therefore this will return different rows for each of the columns.

Thanks @tustvold, you are right. Maybe I made the title confusing 😭. As you mentioned in #1791 (review):

  • Pass row selection down to RecordReader
  • Add a skip_next_page to PageReader
  • Add a skip_values to ColumnValueDecoder

This PR is only about the skip_next_page part: we only return the needed page metadata in the iterator. As for making each column read the same records (row alignment), I would prefer to support that in the next PR, to keep the PRs small and avoid conflicts. If you would rather combine them, I will mark this one as in progress and keep developing.

> As described in #1791 (review), you will need to extract the row selection in addition to the page selection, and push this into RecordReader and ColumnValueDecoder. This will also make the API clearer, as we aren't going behind their backs and skipping pages at the block level.

As noted above, we will need to pass the row_ranges down to ColumnValueDecoder in the future.

@tustvold (Contributor) commented Jul 1, 2022

I think small incremental PRs is a good approach. However, I have concerns with this specific PR:

  • It introduces public APIs that don't have clear semantics (the skipped rows are somewhat arbitrary)
  • I would prefer an approach that collocates the page and row skipping logic, instead of treating them as separate concerns. Once RecordReader is skipping rows it will be incredibly confusing if pages are being skipped somewhere else in addition

I wonder if a plan of attack akin to the following might work:

  • Add a skip page function to SerializedPageReader that uses the column index to skip the next page without reading it (we may need to change it to take ChunkReader instead of Read)
  • Same as the above for InMemoryPageReader
  • Add the ability to skip decoding rows from a page to ColumnValueDecoder, potentially each impl as a separate PR
  • Pass index and row selection down to RecordReader
  • Perform skipping
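
A minimal sketch of the trait shapes the first and third bullets suggest (signatures are assumptions, not final API; `Page` and `Result` stand in for the existing parquet types):

    trait PageReader {
        fn get_next_page(&mut self) -> Result<Option<Page>>;
        // New: use the offset index to advance past the next page without
        // reading or decompressing its bytes.
        fn skip_next_page(&mut self) -> Result<()>;
    }

    trait ColumnValueDecoder {
        // New: decode-and-discard (or seek past) `num_values` values,
        // returning how many were actually skipped.
        fn skip_values(&mut self, num_values: usize) -> Result<usize>;
    }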

Currently it feels like we're adding the high-level functionality before the necessary lower-level functionality exists, which means low-level details, like the page delimiting, leak out of the high-level APIs.

Edit: I'll try and stub out some APIs for what I mean over the next couple of days. This will also help me validate my mental model checks out 😅

@Ted-Jiang (Member Author) commented:

> Edit: I'll try and stub out some APIs for what I mean over the next couple of days. This will also help me validate my mental model checks out 😅

Got it, I will delete get_record_reader_by_columns_and_row_ranges and use the options instead, to avoid new public APIs.

Tomorrow I will start with:

  • Add a skip page function to SerializedPageReader that uses the column index to skip the next page without reading it (we may need to change it to take ChunkReader instead of Read)

After reading the code, I think that if we want to skip pages in SerializedPageReader we need the page metadata, but SerializedPageReader only cares about the decoding work; the page offsets are already applied to buf: T in SerializedRowGroupReader, so I tried passing the column index to SerializedRowGroupReader to change the page offsets.

So one question 🤔: if I want to add page skipping, do I need to add an owned page location and the selected rows to SerializedPageReader?

I'm worried about where I should pass the column index info.

@tustvold (Contributor) commented Jul 1, 2022

> I'm worried about where I should pass the column index info.

Give me a day or so and I'll get a PR up with some stuff stubbed out; I think this exercise will help us both 😄

@Ted-Jiang (Member Author) commented Jul 1, 2022

> > I'm worried about where I should pass the column index info.
>
> Give me a day or so and I'll get a PR up with some stuff stubbed out; I think this exercise will help us both 😄

Sure! This really needs some time! 💪

@tustvold (Contributor) commented Jul 5, 2022

Marking this as a draft, as I think the approach in #1998 is what we will take forward.

@tustvold tustvold marked this pull request as draft July 5, 2022 13:18
@tustvold (Contributor) commented:

@Ted-Jiang Can this be closed now?

@Ted-Jiang Ted-Jiang closed this Jul 15, 2022