
Use offset index in ParquetRecordBatchStream #2526

Merged (6 commits, Aug 20, 2022)

Conversation

@thinkharderdev (Contributor) commented Aug 19, 2022

Which issue does this PR close?

Closes #2430
Closes #2434

Rationale for this change

Leverage OffsetIndex (if available) to prune IO in ParquetRecordBatchStream

What changes are included in this PR?

Allow user to specify read options in ParquetRecordBatchStreamBuilder which will fetch index metadata when building.

Assorted bug fixes:

  • Bug in RowSelection::scan_ranges when skipping past the final page boundary
  • We were not fetching dictionary pages when creating SerializedPageReader from InMemoryColumnChunk if we had a page index.
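The read-options change described above can be sketched with a self-contained mock. The names `ArrowReaderOptions`, `with_page_index`, and `new_with_options` mirror the API shape discussed in this PR, but the types below are simplified stand-ins, not the crate's real implementation:

```rust
// Hypothetical mock of the options-based builder pattern discussed in this PR.
#[derive(Default, Clone)]
struct ArrowReaderOptions {
    page_index: bool,
}

impl ArrowReaderOptions {
    fn new() -> Self {
        Self::default()
    }

    // Request that the page/offset index be fetched when building.
    fn with_page_index(mut self, enabled: bool) -> Self {
        self.page_index = enabled;
        self
    }
}

struct StreamBuilder {
    options: ArrowReaderOptions,
}

impl StreamBuilder {
    fn new_with_options(options: ArrowReaderOptions) -> Self {
        // In the real builder, this is where the index metadata
        // would be fetched before constructing the stream.
        Self { options }
    }

    fn uses_page_index(&self) -> bool {
        self.options.page_index
    }
}

fn main() {
    let builder =
        StreamBuilder::new_with_options(ArrowReaderOptions::new().with_page_index(true));
    println!("page index enabled: {}", builder.uses_page_index());
}
```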

Are there any user-facing changes?

@github-actions github-actions bot added the parquet Changes to the parquet crate label Aug 19, 2022
@@ -34,8 +34,12 @@ pub fn read_columns_indexes<R: ChunkReader>(
let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
let length = lengths.iter().sum::<usize>();

if length == 0 {
Contributor Author:
Not sure if this is right or we should return an empty vec?

@github-actions github-actions bot added the object-store Object Store Interface label Aug 19, 2022

/// Provides asynchronous access to the page index for each column chunk in a
/// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups`
fn get_column_indexes(
Contributor Author:

Not sure if we should make this a separate constructor or just have it always use the index. If the index doesn't exist, the cost of determining that is minimal.

Contributor:

I'm not sure of the value of exposing these on AsyncFileReader, and not just handling the logic internally. Ultimately if the implementer wants to override the way the index is fetched, they can just return ParquetMetadata from get_metadata with the index information already loaded.

@tustvold (Contributor) left a comment:

I think I would opt to keep more of this logic hidden, i.e. not exposed on AsyncFileReader, and make use of get_byte_ranges to avoid making lots of separate small fetch requests.

@@ -1068,6 +1068,7 @@ mod tests {
integration.head(&path).await.unwrap();
}

#[ignore]
Contributor:
😄

@@ -218,6 +296,36 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}

pub async fn new_with_index(mut input: T) -> Result<Self> {
Contributor:
I think it would be more consistent to have a new_with_options accepting ArrowReaderOptions, which already has a field on it for the page index.

}
}

let metadata = Arc::new(ParquetMetaData::new_with_page_index(
@tustvold (Contributor) commented Aug 19, 2022:

Not part of this PR, but I still feel something is off with the way the index information is located on ParquetMetadata...

Edit: filed #2530

Contributor Author:

Yeah, I agree.

@@ -64,6 +68,10 @@ pub fn read_pages_locations<R: ChunkReader>(
) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
let (offset, total_length) = get_location_offset_and_total_length(chunks)?;

if total_length == 0 {
Contributor:

I think this might fix #2434

@@ -162,7 +162,12 @@ impl RowSelection {
current_selector = selectors.next();
}
} else {
break;
if !(selector.skip || current_page_included) {
Contributor:

What are the implications of this change?

Contributor Author:

Before, we would break if we were on the last page. So if you skipped from inside the second-to-last page into the last page, this would short-circuit and the last page range wouldn't be selected.
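A simplified, self-contained model of the fixed behavior (the `selected_pages` helper below is hypothetical; the real `RowSelection::scan_ranges` operates on byte ranges and `PageLocation`s): a skip selector that runs from the second-to-last page into the last page must not prevent a later select from reaching the last page.

```rust
// Given page start offsets (in rows), the total row count, and a list of
// (skip, row_count) selectors, return the indices of pages containing at
// least one selected row. Illustration only, not the crate's code.
fn selected_pages(
    page_starts: &[usize],
    total_rows: usize,
    selectors: &[(bool, usize)],
) -> Vec<usize> {
    let mut selected = vec![false; page_starts.len()];
    let mut row = 0usize;
    for &(skip, count) in selectors {
        if !skip {
            // Mark every page overlapping the selected row range [row, row + count).
            for (idx, &start) in page_starts.iter().enumerate() {
                let end = page_starts.get(idx + 1).copied().unwrap_or(total_rows);
                if row < end && row + count > start {
                    selected[idx] = true;
                }
            }
        }
        // Crucially, keep scanning even when a skip crosses a page boundary
        // near the end of the column chunk (the bug fixed above broke early).
        row += count;
    }
    selected
        .iter()
        .enumerate()
        .filter_map(|(i, &s)| if s { Some(i) } else { None })
        .collect()
}

fn main() {
    // Three pages of 100 rows: select 150, skip 100 (crossing into the last
    // page), then select 50. All three pages must be selected.
    let pages = selected_pages(&[0, 100, 200], 300, &[(false, 150), (true, 100), (false, 50)]);
    println!("{:?}", pages);
}
```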


// read all needed data into buffer
let data = self
.get_bytes(offset as usize..offset as usize + length)
Contributor:

This will perform a separate get_bytes request to fetch the page index and column index information for each column chunk. This is likely not a good idea, especially since these requests will be performed serially.

Ideally we would identify the ranges of all the index information and then call get_byte_ranges; this allows coalescing proximate requests to ObjectStore, parallel fetches, etc.

Contributor Author:

Yeah, that makes sense.
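The coalescing idea can be sketched with a hypothetical `coalesce_ranges` helper (in practice the merging lives inside `AsyncFileReader::get_byte_ranges` implementations such as `ObjectStore::get_ranges`): sorted byte ranges separated by small gaps are merged into fewer, larger fetches.

```rust
use std::ops::Range;

// Merge byte ranges whose gap is at most `max_gap` so that many small index
// reads become a few larger fetch requests. Assumes the input is sorted by
// start offset. Illustrative sketch, not the crate's implementation.
fn coalesce_ranges(ranges: &[Range<usize>], max_gap: usize) -> Vec<Range<usize>> {
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        match out.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a new fetch.
            Some(last) if r.start <= last.end + max_gap => {
                last.end = last.end.max(r.end);
            }
            _ => out.push(r.clone()),
        }
    }
    out
}

fn main() {
    // Two nearby index ranges merge; the distant one stays separate.
    let merged = coalesce_ranges(&[0..10, 12..20, 100..110], 5);
    println!("{:?}", merged);
}
```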

let mut offset_indexes = vec![];

for (idx, rg) in row_groups.iter_mut().enumerate() {
let column_index = input.get_column_indexes(metadata.clone(), idx).await?;
Contributor:

We should check whether the column index has already been fetched into the metadata, and not fetch it again if it is already present.


@github-actions github-actions bot removed the object-store Object Store Interface label Aug 19, 2022
// then we need to also fetch a dictionary page.
let mut ranges = vec![];
let (start, _len) = chunk_meta.byte_range();
match page_locations[idx].first() {
Contributor Author:

Discovered this bug as well: we weren't fetching the dictionary page if it existed.
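A minimal sketch of the fix, assuming the dictionary page (when present) occupies the gap between the column chunk's start offset and the first data page offset from the offset index. `fetch_ranges` is a hypothetical helper, not the crate's code:

```rust
use std::ops::Range;

// Build the byte ranges to fetch for a column chunk. `chunk_start` is the
// chunk's first byte; `page_offsets` holds (offset, length) pairs for the
// data pages taken from the offset index. If the first data page starts
// after `chunk_start`, the leading gap holds the dictionary page and must
// be fetched too -- that was the bug discussed above.
fn fetch_ranges(chunk_start: usize, page_offsets: &[(usize, usize)]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    if let Some(&(first_page, _)) = page_offsets.first() {
        if first_page > chunk_start {
            // Dictionary page occupies [chunk_start, first_page).
            ranges.push(chunk_start..first_page);
        }
    }
    for &(off, len) in page_offsets {
        ranges.push(off..off + len);
    }
    ranges
}

fn main() {
    // Chunk starts at byte 100, first data page at 140: the dictionary page
    // range 100..140 is included ahead of the data page ranges.
    let ranges = fetch_ranges(100, &[(140, 60), (200, 50)]);
    println!("{:?}", ranges);
}
```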

@@ -312,7 +312,7 @@ where

// If page has less rows than the remaining records to
// be skipped, skip entire page
if metadata.num_rows < remaining {
if metadata.num_rows <= remaining {
Contributor Author:

Another bug I uncovered while testing: this caused the reader to unnecessarily try to fetch pages which are not pre-fetched.
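The `<` vs `<=` change above is a classic off-by-one: when a page holds exactly as many rows as remain to be skipped, the whole page can be skipped; with the strict comparison it would be fetched and decoded for nothing. A hedged one-function sketch of the predicate (hypothetical helper name):

```rust
// If the page has no more rows than remain to be skipped, the entire page
// can be skipped without fetching it. Using `<` here (the old code) forced
// a needless fetch in the exact-boundary case.
fn can_skip_entire_page(page_num_rows: usize, remaining_to_skip: usize) -> bool {
    page_num_rows <= remaining_to_skip
}

fn main() {
    // Exact boundary: a 100-row page with 100 rows left to skip is skippable.
    println!("{}", can_skip_entire_page(100, 100));
}
```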

return Self::new_builder(AsyncReader(input), metadata, options);
}

fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
Member:

I think this will read one col_index and one page_location alternately, but they are written separately:
https://github.com/apache/parquet-format/blob/master/doc/images/PageIndexLayout.png

I think if we don't cache all the bytes in memory, we should read the whole col_index and then the page_location.
Why don't we use 🤔

/// Reads all column indexes of one row group and converts them into [`Index`].
/// If the format is not available, returns an empty vector.
pub fn read_columns_indexes<R: ChunkReader>(
    reader: &R,
    chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError> {

Contributor:

This can't use read_columns_indexes as this needs to asynchronously fetch the bytes

Member:

Oh! Thanks! But we can still try to combine all the col_index reads together and all the page_location reads together, separately.

Contributor:

The onus is on AsyncFileReader::get_ranges to handle this, e.g. ObjectStore::get_ranges does this already

index_reader::get_location_offset_and_total_length(rg.columns())?;

let (idx_offset, idx_lengths) =
index_reader::get_index_offset_and_lengths(rg.columns())?;
@tustvold (Contributor) commented Aug 20, 2022:

It occurs to me that this method is making a pretty strong assumption that the column index data is contiguous, I'm not sure this is actually guaranteed... Definitely a separate issue from this PR though

Comment on lines +278 to +279
rg.set_page_offset(offset_index.clone());
offset_indexes.push(offset_index);
Contributor:

Again this interface seems really confused - something for #2530

@tustvold (Contributor) left a comment:

Looks good to me, thank you. There is definitely some cleanup to do with the page index plumbing, but that is beyond the scope of this PR.

@tustvold tustvold merged commit 1eb6c45 into apache:master Aug 20, 2022
ursabot commented Aug 20, 2022

Benchmark runs are scheduled for baseline = 0f45932 and contender = 1eb6c45. 1eb6c45 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-rs-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Successfully merging this pull request may close these issues.

  • Error Reading Page Index When Not Available
  • Support Reading PageIndex with ParquetRecordBatchStream
4 participants