Support get_row_group in AsyncFileReader #3851

Closed · Ted-Jiang opened this issue Mar 13, 2023 · 9 comments
Labels
enhancement Any new improvement worthy of an entry in the changelog

Comments

Ted-Jiang (Member) commented Mar 13, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
While implementing apache/datafusion#4512, I found that AsyncFileReader (which DataFusion uses) cannot get a specific RowGroupReader.

If I had the RowGroupReader, calling get_column_bloom_filter on it would return the bloom filter.

```rust
pub trait FileReader: Send + Sync {
    /// Get metadata information about this file.
    fn metadata(&self) -> &ParquetMetaData;

    /// Get the total number of row groups for this file.
    fn num_row_groups(&self) -> usize;

    /// Get the `i`th row group reader. Note this doesn't do bound check.
    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;

    /// Get full iterator of `Row`s from a file (over all row groups).
    ///
    /// Iterator will automatically load the next row group to advance.
    ///
    /// Projected schema can be a subset of or equal to the file schema, when it is None,
    /// full file schema is assumed.
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}
```
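
For reference, this is roughly how the synchronous path can read a bloom filter today (a minimal sketch, assuming the sync RowGroupReader::get_column_bloom_filter and Sbbf::check APIs; the file path, indices, and probed value are placeholders):

```rust
use std::fs::File;
use parquet::file::reader::{FileReader, RowGroupReader, SerializedFileReader};

fn probe_bloom_filter(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let reader = SerializedFileReader::new(file)?;

    // Row group readers come from `FileReader::get_row_group`.
    let row_group = reader.get_row_group(0)?;

    // `get_column_bloom_filter` returns the decoded split-block bloom filter,
    // if one was written for this column.
    if let Some(sbbf) = row_group.get_column_bloom_filter(0) {
        // A miss rules the value out; a hit only means "maybe present".
        println!("may contain value: {}", sbbf.check("some_value"));
    }
    Ok(())
}
```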

The async version:

```rust
pub trait AsyncFileReader: Send {
    /// Retrieve the bytes in `range`
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

    /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<usize>>,
    ) -> BoxFuture<'_, Result<Vec<Bytes>>> {
        async move {
            let mut result = Vec::with_capacity(ranges.len());
            for range in ranges.into_iter() {
                let data = self.get_bytes(range).await?;
                result.push(data);
            }
            Ok(result)
        }
        .boxed()
    }
}
```

I think they should be consistent. Is there any other reason this is not supported?
Describe the solution you'd like

So I am trying to create a new struct, AsyncRowGroupReader, sketched below.
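
A hypothetical sketch of what that could look like; neither AsyncRowGroupReader nor these signatures exist in the crate today, this only mirrors the sync RowGroupReader with a future-returning bloom filter accessor:

```rust
// Hypothetical only: an async counterpart to `RowGroupReader`, where the bloom
// filter IO happens lazily, when the filter is actually requested.
use futures::future::BoxFuture;
use parquet::bloom_filter::Sbbf;
use parquet::errors::Result;

pub trait AsyncRowGroupReader: Send {
    /// Number of columns in this row group.
    fn num_columns(&self) -> usize;

    /// Fetch and decode the bloom filter for column `i`, if one was written.
    fn get_column_bloom_filter(&mut self, i: usize) -> BoxFuture<'_, Result<Option<Sbbf>>>;
}
```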

Describe alternatives you've considered

Additional context

Ted-Jiang added the enhancement label on Mar 13, 2023
Ted-Jiang (Member, Author) commented:

@tustvold @alamb Could you take a look? Is this a reasonable change 🤔?

tustvold (Contributor) commented:

I don't think it is that simple, as the IO to fetch the bloom filter data needs to be done ahead of time. I'll have a think over the next couple of days and write up how to support this.

Ted-Jiang (Member, Author) commented Mar 13, 2023

By the way, I think we should support reading a specific bloom filter by row_group_id and col_id, unlike the page index.

madgene commented Apr 9, 2023

@Ted-Jiang If we use SerializedFileReader to first read all of the bloom filter data out of the parquet file and then call the prune_row_groups_by_bloom_filter method to prune, then in PruningPredicate::prune a RecordBatch is first created and evaluated via the self.predicate_expr.evaluate(&statistics_batch) method. Could you advise how to construct a RecordBatch from bloom filter data and then evaluate it, or is there some other way to evaluate?

Ted-Jiang (Member, Author) commented:

Sorry, please communicate in English.

madgene commented Apr 11, 2023

@Ted-Jiang Perhaps we can prune row groups by invoking the prune_row_groups_by_bloom_filter method with all the bloom filter data, read via SerializedFileReader. In PruningPredicate::prune, a RecordBatch is first created and then evaluated using the self.predicate_expr.evaluate(&statistics_batch) method. Could you please advise how to construct a RecordBatch using bloom filter data and then evaluate it, or is there any other way to evaluate?

hengfeiyang (Contributor) commented:

@tustvold Any suggestions for this? We want to read the bloom filter in DataFusion: apache/datafusion#4512

tustvold (Contributor) commented Sep 20, 2023

The simplest option is likely to add an async function to ParquetRecordBatchStreamBuilder that fetches and decodes the bloom filters for a given column and row group, and returns this.

We likely don't want to eagerly fetch and decode bloom filters as the formulation parquet uses tends to be very large, and so we should only pay for this after other forms of pruning and if we have an equality predicate on that column.
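
For context, a sketch of how such a builder-level API might be used; the method name get_row_group_column_bloom_filter and its signature are an assumption here, not something spelled out in this thread:

```rust
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

// Sketch only: assumes a builder method along the lines of
// `get_row_group_column_bloom_filter(row_group_idx, column_idx)` that fetches
// and decodes the filter. Path, indices, and the probed value are placeholders.
async fn row_group_may_contain(path: &str, value: &str) -> Result<bool, Box<dyn std::error::Error>> {
    let file = File::open(path).await?;
    let mut builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Fetch the filter lazily, only after cheaper pruning and only when an
    // equality predicate exists on this column.
    if let Some(sbbf) = builder.get_row_group_column_bloom_filter(0, 0).await? {
        // A bloom filter miss definitively excludes the value; a hit is only "maybe".
        return Ok(sbbf.check(value));
    }
    // No bloom filter written for this column, so the value cannot be ruled out.
    Ok(true)
}
```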

tustvold (Contributor) commented:

Closed by #4917
