Commit

Add get_byte_ranges method to AsyncFileReader trait (#2115)
* Add get_byte_ranges method to AsyncFileReader trait

* Remove overhead

* linting

* pr comments
thinkharderdev committed Jul 21, 2022
1 parent 3096591 commit be0d34d
Showing 1 changed file with 48 additions and 12 deletions.
parquet/src/arrow/async_reader.rs (60 changes: 48 additions & 12 deletions)
@@ -77,6 +77,7 @@

use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
@@ -111,6 +112,27 @@ pub trait AsyncFileReader {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;

/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, Result<Vec<Bytes>>>
where
Self: Send,
{
async move {
let mut result = Vec::with_capacity(ranges.len());

for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
result.push(data);
}

Ok(result)
}
.boxed()
}

/// Provides asynchronous access to the [`ParquetMetaData`] of a parquet file,
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
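
Not part of this diff: a minimal sketch of how an implementation might override get_byte_ranges to fetch the ranges concurrently instead of falling back to the sequential default. Because get_bytes borrows &mut self, the default implementation can only await one range at a time; a reader that owns a cheaply clonable client can fan the requests out. RemoteReader, RangeClient, and read_range are hypothetical names, and the metadata is assumed to have been fetched and parsed up front.

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
use parquet::arrow::async_reader::AsyncFileReader;
use parquet::errors::Result;
use parquet::file::metadata::ParquetMetaData;

/// Hypothetical, cheaply clonable handle to remote storage (e.g. an object store client).
#[derive(Clone)]
struct RangeClient;

impl RangeClient {
    async fn read_range(&self, _range: Range<usize>) -> Result<Bytes> {
        unimplemented!("issue a ranged GET against remote storage")
    }
}

struct RemoteReader {
    client: RangeClient,
    /// Assumed to be fetched and parsed once, ahead of time.
    metadata: Arc<ParquetMetaData>,
}

impl AsyncFileReader for RemoteReader {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
        let client = self.client.clone();
        async move { client.read_range(range).await }.boxed()
    }

    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<usize>>,
    ) -> BoxFuture<'_, Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        // Fan the requests out instead of awaiting them one by one,
        // which is the reason to override the sequential default.
        let client = self.client.clone();
        async move {
            futures::future::try_join_all(ranges.into_iter().map(|range| {
                let client = client.clone();
                async move { client.read_range(range).await }
            }))
            .await
        }
        .boxed()
    }

    fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
        let metadata = Arc::clone(&self.metadata);
        async move { Ok(metadata) }.boxed()
    }
}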
@@ -367,24 +389,38 @@ where
vec![None; row_group_metadata.columns().len()];

// TODO: Combine consecutive ranges
let fetch_ranges = (0..column_chunks.len())
.into_iter()
.filter_map(|idx| {
if !projection.leaf_included(idx) {
None
} else {
let column = row_group_metadata.column(idx);
let (start, length) = column.byte_range();

Some(start as usize..(start + length) as usize)
}
})
.collect();

let mut chunk_data =
input.get_byte_ranges(fetch_ranges).await?.into_iter();

for (idx, chunk) in column_chunks.iter_mut().enumerate() {
if !projection.leaf_included(idx) {
continue;
}

let column = row_group_metadata.column(idx);
let (start, length) = column.byte_range();

let data = input
.get_bytes(start as usize..(start + length) as usize)
.await?;

*chunk = Some(InMemoryColumnChunk {
num_values: column.num_values(),
compression: column.compression(),
physical_type: column.column_type(),
data,
});

if let Some(data) = chunk_data.next() {
*chunk = Some(InMemoryColumnChunk {
num_values: column.num_values(),
compression: column.compression(),
physical_type: column.column_type(),
data,
});
}
}

Ok((
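
A sketch of the "TODO: Combine consecutive ranges" noted in the hunk above, not something this commit implements: adjacent or overlapping column-chunk ranges could be merged before being passed to get_byte_ranges, reducing the number of requests a remote reader has to issue. The loop above pairs each fetched buffer with a projected column one-to-one, so a real implementation would also need to slice the merged buffers back apart; coalesce_ranges below (an illustrative name) covers only the merging half.

use std::ops::Range;

/// Merge ranges that touch or overlap, assuming nothing about input order.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<usize>> = Vec::with_capacity(ranges.len());
    for range in ranges {
        match merged.last_mut() {
            // The next range starts at or before the end of the previous one:
            // grow the previous range instead of starting a new request.
            Some(prev) if range.start <= prev.end => prev.end = prev.end.max(range.end),
            _ => merged.push(range),
        }
    }
    merged
}

#[test]
fn adjacent_and_overlapping_ranges_merge() {
    let ranges = vec![40..50, 0..10, 10..20, 45..60];
    assert_eq!(coalesce_ranges(ranges), vec![0..20, 40..60]);
}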

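For context, a short usage sketch of the async reader defined in this file, not taken from this page. It assumes the parquet crate's async feature is enabled and that a local file named data.parquet exists; tokio::fs::File satisfies AsyncFileReader through the crate's implementation for tokio AsyncRead + AsyncSeek readers.

use futures::TryStreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open the file with tokio; the parquet crate reads it asynchronously.
    let file = tokio::fs::File::open("data.parquet").await?;

    // Build a stream of Arrow RecordBatches from the Parquet file.
    let stream = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;

    // Drain the stream, collecting all batches into memory.
    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}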