Permit parallel fetching of column chunks in ParquetRecordBatchStream #2110

Closed
Tracked by #3462
thinkharderdev opened this issue Jul 19, 2022 · 11 comments · Fixed by #2115
Labels: enhancement (Any new improvement worthy of an entry in the changelog), parquet (Changes to the parquet crate)

Comments

@thinkharderdev
Contributor

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

We recently rebased our project onto DataFusion's latest and we've seen a pretty big performance degradation. The issue is that we've lost the ability to prefetch entire files from object storage with the new ObjectStore interface. The buffered prefetch has been moved into ParquetRecordBatchStream but in a way that doesn't work particularly well for object storage (at least in our case).

The main issues that we've seen are:

  1. Before, we would optimistically read 64k from the end of the file when fetching the metadata, but now we issue a separate range request for the 8-byte footer and then another for the metadata itself. Fetching 8 bytes from S3 takes about 80-90ms, so when we are scanning a lot of objects this adds significantly to the execution time.
  2. We are prefetching entire column chunks (which is better than fetching page-by-page) but we are fetching the column chunks sequentially.

What we found was that (at least with parquet files on the order of 100-200MB) it was much more efficient to just fetch the entire object into memory. All else equal it is of course better to read less from object storage but if we can't do it in one shot (or maybe two shots) the cost of the extra GET requests is going to significantly outweigh the benefit of fetching less data.

Describe the solution you'd like

I think there are a couple of things we can do:

  1. Optimistically try to fetch the metadata in one shot. We could pass in a metadata_size_hint parameter to let users provide the expected size, defaulting to 64k as it was before (sketched after this list).
  2. Fetch the column chunks in parallel instead of sequentially. This could either be handled in ParquetRecordBatchStream or added to the ObjectStore trait as a get_ranges(&self, location: &Path, ranges: Vec<Range<usize>>) -> Result<Vec<Bytes>> method.
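
To make (1) concrete, here's a rough sketch of what the optimistic footer read could look like. fetch_suffix and fetch_range are hypothetical helpers standing in for whatever I/O layer is used; the only format fact relied on is that the last 8 bytes of a Parquet file are a 4-byte little-endian metadata length followed by the PAR1 magic:

```rust
use bytes::Bytes;

/// Hypothetical helper: one ranged GET for the last `n` bytes of the object.
async fn fetch_suffix(_n: usize) -> Result<Bytes, Box<dyn std::error::Error>> {
    unimplemented!("suffix range request")
}

/// Hypothetical helper: one ranged GET for the byte range [start, end).
async fn fetch_range(_start: usize, _end: usize) -> Result<Bytes, Box<dyn std::error::Error>> {
    unimplemented!("range request")
}

/// Read the Parquet footer metadata with at most two requests.
async fn fetch_metadata(
    file_size: usize,
    metadata_size_hint: usize, // e.g. 64 * 1024, as the old default
) -> Result<Bytes, Box<dyn std::error::Error>> {
    // Optimistically read the last `metadata_size_hint` bytes in one shot.
    let suffix = fetch_suffix(metadata_size_hint.min(file_size)).await?;

    // The last 8 bytes are a 4-byte little-endian metadata length + "PAR1" magic.
    let footer = &suffix[suffix.len() - 8..];
    assert_eq!(&footer[4..], b"PAR1");
    let metadata_len = u32::from_le_bytes(footer[..4].try_into()?) as usize;

    if metadata_len + 8 <= suffix.len() {
        // The hint was big enough: the metadata is already in the buffer.
        Ok(suffix.slice(suffix.len() - 8 - metadata_len..suffix.len() - 8))
    } else {
        // Hint too small: one more request for the full metadata.
        fetch_range(file_size - 8 - metadata_len, file_size - 8).await
    }
}
```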

Describe alternatives you've considered

We could leave things as they are

Additional context

@thinkharderdev added the enhancement label Jul 19, 2022
@thinkharderdev
Contributor Author

@tustvold @alamb

@tustvold
Contributor

I think prior to fetching in parallel I would suggest the following:

  • Add a min_fetch_bytes parameter, falling back to fetching the entire row group or file if it is smaller than this threshold
  • Add the metadata_size_hint you suggest
  • Coalesce adjacent byte ranges into a single request, potentially allowing gaps up to some configurable threshold (see the sketch below)
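
For the coalescing point, roughly this kind of merge logic is what I mean (just a sketch, not an existing API; the gap threshold in the example is illustrative):

```rust
use std::ops::Range;

/// Merge byte ranges, joining any two ranges whose gap is at most `max_gap`
/// bytes, so each merged range can be fetched with a single request.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<usize>> = Vec::with_capacity(ranges.len());
    for range in ranges {
        match out.last_mut() {
            // Close enough to the previous range: extend it instead of
            // starting a new request (the gap bytes are fetched and discarded).
            Some(prev) if range.start <= prev.end + max_gap => {
                prev.end = prev.end.max(range.end);
            }
            _ => out.push(range),
        }
    }
    out
}

// Example: with a 1 MiB gap threshold, two column chunks 100 KiB apart
// collapse into one GET:
// coalesce_ranges(vec![0..4_000_000, 4_100_000..8_000_000], 1 << 20)
//   => [0..8_000_000]
```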

@thinkharderdev
Contributor Author

Why would you fall back to fetching the entire row group (or file) instead of fetching in parallel? For object storage, fetching the entire file would probably be implemented as parallel range requests anyway.

@tustvold
Contributor

tustvold commented Jul 19, 2022

Mainly because there is a monetary cost associated with each S3 range request. I'm also not sure that fetching in parallel will actually be faster, as I would expect a single request to saturate IO fairly easily; fetching in parallel may just add complexity, cost, and potentially contention.

@thinkharderdev
Contributor Author

Right, there is definitely a cost, but it's typically well worth it. In our workloads we parallelize object GETs into multiple 8-16MB range requests, which is good for a 1-3x improvement in download speeds under real-world conditions.
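
For illustration, a rough sketch of that kind of parallel ranged download (fetch_range is a hypothetical helper, not a specific crate's API, and the 8 MiB part size and concurrency cap are just example numbers):

```rust
use bytes::Bytes;
use futures::stream::{self, StreamExt, TryStreamExt};

const PART_SIZE: usize = 8 * 1024 * 1024; // 8 MiB parts
const MAX_CONCURRENCY: usize = 8;

/// Hypothetical helper: one ranged GET for the byte range [start, end).
async fn fetch_range(_start: usize, _end: usize) -> Result<Bytes, Box<dyn std::error::Error>> {
    unimplemented!("range request")
}

/// Download `len` bytes starting at `offset` as concurrent part requests,
/// then reassemble them in order.
async fn fetch_parallel(offset: usize, len: usize) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    // Split the requested region into fixed-size parts.
    let parts: Vec<(usize, usize)> = (offset..offset + len)
        .step_by(PART_SIZE)
        .map(|start| (start, (start + PART_SIZE).min(offset + len)))
        .collect();

    // `buffered` bounds concurrency while preserving the original order.
    let chunks: Vec<Bytes> = stream::iter(parts)
        .map(|(start, end)| fetch_range(start, end))
        .buffered(MAX_CONCURRENCY)
        .try_collect()
        .await?;

    Ok(chunks.concat())
}
```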

@tustvold
Contributor

Provided the maximum concurrency is configurable, this sounds good to me. I look forward to experimenting with it and measuring its impact.

@thinkharderdev
Contributor Author

@tustvold Ok, I understand why you were worried about contention. Since we're dealing with AsyncFileReader we need a mutable reference, so if we were to do this in arrow-rs we would need to wrap it in an Arc<Mutex<>>, which entirely defeats the purpose...

Instead what if we add

```rust
pub trait AsyncFileReader {
    /// Fetch multiple byte ranges in one call; the implementation decides
    /// whether to issue the underlying requests serially or in parallel.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>>;
}
```

This makes things quite straightforward in ParquetRecordBatchStream and we can just delegate handling of parallelism (if any) to the implementer?

@alamb
Contributor

alamb commented Jul 20, 2022

Related DataFusion PR: apache/datafusion#2946

@tustvold
Contributor

tustvold commented Jul 20, 2022

I think adding a get_byte_ranges method with a default implementation that falls back to serial fetching sounds good to me. Then downstream implementations can override it if they wish, and we don't introduce a breaking API change 👍
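
Roughly, a non-breaking version could look like this (a sketch only: it assumes a single-range get_bytes method already exists on the trait, and uses a stand-in Result alias rather than the crate's real error type):

```rust
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use std::ops::Range;

// Stand-in result type for the sketch; the real trait would use the
// parquet crate's error type.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

pub trait AsyncFileReader: Send {
    /// Fetch a single byte range (assumed to already exist on the trait).
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Fetch several byte ranges. The default issues them one after another,
    /// so adding this method is not a breaking change; implementations backed
    /// by an object store can override it to issue the requests concurrently.
    fn get_byte_ranges(&mut self, ranges: Vec<Range<usize>>) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut out = Vec::with_capacity(ranges.len());
            for range in ranges {
                out.push(self.get_bytes(range).await?);
            }
            Ok(out)
        }
        .boxed()
    }
}
```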

@alamb
Contributor

alamb commented Jul 20, 2022

Sorry for being late to the party. Thanks for bringing this topic up @thinkharderdev

> Optimistically try and fetch the metadata in one shot. We can pass a metadata_size_hint in as a param maybe to allow users to provide information about the expected size. And maybe it defaults to 64k as it was before.

I think this is a great idea (and I think @thinkharderdev has already done it here): apache/datafusion#2946

> Fetch the column chunks in parallel instead of sequentially. This can either be handled in ParquetRecordBatchStream or maybe added into the ObjectStore trait by adding a get_ranges(&self, location: &Path, ranges: Vec<Range<usize>>) -> Result<Vec<Bytes>> method.

Given that the optimal prefetch strategy is likely to vary from project to project and from object store to object store, I feel like it will be challenging to put logic that works everywhere in DataFusion. Ideally, DataFusion would provide enough information to allow downstream consumers to efficiently implement whatever prefetch strategy they want.

For example, perhaps we could implement something like a PrefetchingObjectStore that wraps an existing dyn ObjectStore and implements the concurrent download approach suggested by @thinkharderdev.

If adding get_byte_ranges is needed to allow implementing the required prefetching algorithm efficiently I am all for it 👍

@thinkharderdev
Contributor Author

> Given that the optimal prefetch strategy is likely to vary from project to project and from object store to object store, I feel like it will be challenging to put logic that works everywhere in DataFusion. [...] If adding get_byte_ranges is needed to allow implementing the required prefetching algorithm efficiently I am all for it 👍

Yeah, I think from the point of view of arrow-rs this is the sensible way to go. It will, however, just push the problem one layer up the stack :). So in DataFusion we'll have to again decide whether to implement the parallel fetching directly in ParquetFileReader (which has many of the same problems as doing it here, in that we have to create a one-size-fits-all solution or add extra configuration for flexibility) or move it into the ObjectStore trait itself. I'll create another issue in DataFusion to discuss that, though.

@alamb added the parquet label Jul 21, 2022
@alamb changed the title from "Parallel fetching of column chunks in ParquetRecordBatchStream" to "Permit Parallel fetching of column chunks in ParquetRecordBatchStream" Jul 21, 2022
@alamb changed the title from "Permit Parallel fetching of column chunks in ParquetRecordBatchStream" to "Permit parallel fetching of column chunks in ParquetRecordBatchStream" Jul 21, 2022