Use offset index in ParquetRecordBatchStream #2526
Changes from 4 commits
```diff
@@ -78,7 +78,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;

-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
```
```diff
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
+use parquet_format::{OffsetIndex, PageLocation};
+use thrift::protocol::TCompactInputProtocol;

 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
```
```diff
@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;

 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
-    RowFilter, RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
```
```diff
@@ -108,6 +110,8 @@ use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};

+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader;
 use crate::file::FOOTER_SIZE;

 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
```
```diff
@@ -139,6 +143,80 @@ pub trait AsyncFileReader: Send {
     /// allowing fine-grained control over how metadata is sourced, in particular allowing
     /// for caching, pre-fetching, catalog metadata, etc...
     fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

+    /// Provides asynchronous access to the page index for each column chunk in a
+    /// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups`
+    fn get_column_indexes(
+        &mut self,
+        metadata: Arc<ParquetMetaData>,
+        row_group_idx: usize,
+    ) -> BoxFuture<'_, Result<Vec<Index>>> {
+        async move {
+            let chunks = metadata.row_group(row_group_idx).columns();
+
+            let (offset, lengths) = index_reader::get_index_offset_and_lengths(chunks)?;
+            let length = lengths.iter().sum::<usize>();
+
+            if length == 0 {
+                return Ok(vec![Index::NONE; chunks.len()]);
+            }
+
+            // Read all the needed data into a single buffer
+            let data = self
+                .get_bytes(offset as usize..offset as usize + length)
+                .await?;
+
+            let mut start = 0;
+            let data = lengths.into_iter().map(|length| {
+                let r = data.slice(start..start + length);
+                start += length;
+                r
+            });
+
+            chunks
+                .iter()
+                .zip(data)
+                .map(|(chunk, data)| {
+                    let column_type = chunk.column_type();
+                    index_reader::deserialize_column_index(&data, column_type)
+                })
+                .collect()
+        }
+        .boxed()
+    }
```

> Review comment (on `get_column_indexes`): Not sure if we should make this a separate constructor or just have it always use the index. If the index doesn't exist, the cost of determining that is minimal.
>
> Review comment: I'm not sure of the value of exposing these on
>
> Review comment (on the `get_bytes` call): This will perform separate `get_bytes` requests to fetch the page index and column index information for each column chunk. This is likely not a good idea, especially since this will be performed serially. Ideally we would identify the ranges of all the index information, and then call `get_byte_ranges`; this will allow coalescing proximate requests to ObjectStore, parallel fetch, etc...
>
> Reply: Yeah, that makes sense.
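The new `get_column_indexes` above fetches all column indexes for a row group in one `get_bytes` call and then carves the returned buffer into per-column slices by length. A standalone sketch of that slicing step (plain `&[u8]` instead of `Bytes`; the helper name is hypothetical):

```rust
// Hypothetical sketch: split one contiguous buffer into per-column
// slices, given each column's serialized index length. Mirrors the
// `lengths.into_iter().map(...)` logic in the diff above.
fn split_by_lengths(data: &[u8], lengths: &[usize]) -> Vec<Vec<u8>> {
    let mut start = 0;
    lengths
        .iter()
        .map(|&len| {
            // Each column's index occupies the next `len` bytes
            let slice = data[start..start + len].to_vec();
            start += len;
            slice
        })
        .collect()
}

fn main() {
    // Buffer holding three column indexes of lengths 2, 3 and 1
    let data = [1u8, 2, 3, 4, 5, 6];
    let parts = split_by_lengths(&data, &[2, 3, 1]);
    assert_eq!(parts, vec![vec![1u8, 2], vec![3, 4, 5], vec![6]]);
}
```

Fetching once and slicing locally keeps the I/O to a single request per row group, at the cost of reading any bytes between index structures.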
```diff
+
+    /// Provides asynchronous access to the offset index for each column chunk in a
+    /// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups`
+    fn get_page_locations(
+        &mut self,
+        metadata: Arc<ParquetMetaData>,
+        row_group_idx: usize,
+    ) -> BoxFuture<'_, Result<Option<Vec<Vec<PageLocation>>>>> {
+        async move {
+            let chunks = metadata.row_group(row_group_idx).columns();
+
+            let (offset, total_length) =
+                index_reader::get_location_offset_and_total_length(chunks)?;
+
+            if total_length == 0 {
+                return Ok(None);
+            }
+
+            let data = self
+                .get_bytes(offset as usize..offset as usize + total_length)
+                .await?;
+            let mut d = Cursor::new(data);
+            let mut result = vec![];
+
+            for _ in 0..chunks.len() {
+                let mut prot = TCompactInputProtocol::new(&mut d);
+                let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+                result.push(offset.page_locations);
+            }
+            Ok(Some(result))
+        }
+        .boxed()
+    }
 }

 impl AsyncFileReader for Box<dyn AsyncFileReader> {
```
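The review suggestion above is to collect the byte ranges of all index structures and issue a single `get_byte_ranges` call, so proximate requests can be coalesced and fetched in parallel. The coalescing idea can be sketched in isolation (an illustrative helper under assumed semantics, not the crate's actual implementation):

```rust
use std::ops::Range;

// Hypothetical sketch: merge byte ranges whose gap is at most `max_gap`,
// so nearby fetches against an object store become one request.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut merged: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        match merged.last_mut() {
            // Extend the previous range when the gap is small enough
            Some(last) if r.start <= last.end + max_gap => {
                last.end = last.end.max(r.end);
            }
            _ => merged.push(r),
        }
    }
    merged
}

fn main() {
    // Two proximate index ranges merge; the distant one stays separate
    let merged = coalesce_ranges(vec![0..10, 12..20, 100..110], 4);
    assert_eq!(merged, vec![0..20, 100..110]);
}
```

The trade-off is reading a few gap bytes in exchange for fewer round trips, which is usually a clear win against remote storage.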
```diff
@@ -218,6 +296,36 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
         Self::new_builder(AsyncReader(input), metadata, Default::default())
     }

+    pub async fn new_with_index(mut input: T) -> Result<Self> {
+        let metadata = input.get_metadata().await?;
+
+        let mut row_groups = metadata.row_groups().to_vec();
+
+        let mut columns_indexes = vec![];
+        let mut offset_indexes = vec![];
+
+        for (idx, rg) in row_groups.iter_mut().enumerate() {
+            let column_index = input.get_column_indexes(metadata.clone(), idx).await?;
+            columns_indexes.push(column_index);
+
+            if let Some(offset_index) =
+                input.get_page_locations(metadata.clone(), idx).await?
+            {
+                rg.set_page_offset(offset_index.clone());
+                offset_indexes.push(offset_index);
+            }
+        }
+
+        let metadata = Arc::new(ParquetMetaData::new_with_page_index(
+            metadata.file_metadata().clone(),
+            row_groups,
+            Some(columns_indexes),
+            Some(offset_indexes),
+        ));
+
+        Self::new_builder(AsyncReader(input), metadata, ArrowReaderOptions::new())
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
```

> Review comment (on `new_with_index`): I think it would be more consistent to have
>
> Review comment (on the `get_column_indexes` call): We should check if the column index has already been fetched to the metadata, and not fetch it again if it is already present.
>
> Review comment (on lines +278 to +279): Again this interface seems really confused - something for #2530
>
> Review comment (on `new_with_page_index`): Not part of this PR, but I still feel something is off with the way the index information is located on ParquetMetadata... Edit: filed #2530
>
> Reply: Yeah, I agree.
```diff
@@ -493,13 +601,26 @@ impl<'a> InMemoryRowGroup<'a> {
             let fetch_ranges = self
                 .column_chunks
                 .iter()
+                .zip(self.metadata.columns())
                 .enumerate()
-                .into_iter()
-                .filter_map(|(idx, chunk)| {
+                .filter_map(|(idx, (chunk, chunk_meta))| {
                     (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                        let ranges = selection.scan_ranges(&page_locations[idx]);
+                        // If the first page does not start at the beginning of the column,
+                        // then we need to also fetch a dictionary page.
+                        let mut ranges = vec![];
+                        let (start, _len) = chunk_meta.byte_range();
+                        match page_locations[idx].first() {
+                            Some(first) if first.offset as u64 != start => {
+                                ranges.push(start as usize..first.offset as usize);
+                            }
+                            _ => (),
+                        }
+
+                        ranges.extend(selection.scan_ranges(&page_locations[idx]));
                         page_start_offsets
                             .push(ranges.iter().map(|range| range.start).collect());

                         ranges
                     })
                 })
```

> Review comment (on the dictionary-page check): Discovered this bug as well. We weren't fetching a dictionary page if it existed.
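The dictionary-page fix above prepends the byte range between the column chunk's start offset and the first data page's offset, on the assumption that a dictionary page occupies those bytes. The check itself reduces to the following (simplified types; the helper name is hypothetical):

```rust
use std::ops::Range;

// Hypothetical sketch of the dictionary-page check: if the first data
// page does not begin at the column chunk's start offset, the bytes in
// between hold a dictionary page and must be fetched as well.
fn dictionary_range(chunk_start: u64, first_page_offset: Option<u64>) -> Option<Range<usize>> {
    match first_page_offset {
        Some(first) if first != chunk_start => {
            Some(chunk_start as usize..first as usize)
        }
        _ => None, // no pages, or first page starts at the chunk start
    }
}

fn main() {
    // Dictionary page occupies bytes 100..150 before the first data page
    assert_eq!(dictionary_range(100, Some(150)), Some(100..150));
    // First page starts at the chunk start: no dictionary page
    assert_eq!(dictionary_range(100, Some(100)), None);
    // No page locations at all
    assert_eq!(dictionary_range(100, None), None);
}
```

Without this range, a dictionary-encoded column read through the page index would be missing the dictionary needed to decode its data pages.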
```diff
@@ -687,7 +808,6 @@ mod tests {
     use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
-
     use futures::TryStreamExt;
     use std::sync::Mutex;
```
```diff
@@ -763,6 +883,68 @@
     );
 }

+#[tokio::test]
+async fn test_async_reader_with_index() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+    let data = Bytes::from(std::fs::read(path).unwrap());
+
+    let metadata = parse_metadata(&data).unwrap();
+    let metadata = Arc::new(metadata);
+
+    assert_eq!(metadata.num_row_groups(), 1);
+
+    let async_reader = TestReader {
+        data: data.clone(),
+        metadata: metadata.clone(),
+        requests: Default::default(),
+    };
+
+    let builder = ParquetRecordBatchStreamBuilder::new_with_index(async_reader)
+        .await
+        .unwrap();
+
+    // The builder should have page and offset indexes loaded now
+    let metadata_with_index = builder.metadata();
+
+    // Check offset indexes are present for all columns
+    for rg in metadata_with_index.row_groups() {
+        let page_locations = rg
+            .page_offset_index()
+            .as_ref()
+            .expect("expected page offset index");
+        assert_eq!(page_locations.len(), rg.columns().len())
+    }
+
+    // Check page indexes are present for all columns
+    let page_indexes = metadata_with_index
+        .page_indexes()
+        .expect("expected page indexes");
+    for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
+        assert_eq!(page_indexes[idx].len(), rg.columns().len())
+    }
+
+    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+    let stream = builder
+        .with_projection(mask.clone())
+        .with_batch_size(1024)
+        .build()
+        .unwrap();
+
+    let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+    let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
+        .unwrap()
+        .with_projection(mask)
+        .with_batch_size(1024)
+        .build()
+        .unwrap()
+        .collect::<ArrowResult<Vec<_>>>()
+        .unwrap();
+
+    assert_eq!(async_batches, sync_batches);
+}
+
 #[tokio::test]
 async fn test_row_filter() {
     let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
```
```diff
@@ -832,6 +1014,54 @@
     assert_eq!(requests.lock().unwrap().len(), 3);
 }

+#[tokio::test]
+async fn test_row_filter_with_index() {
+    let testdata = arrow::util::test_util::parquet_test_data();
+    let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+    let data = Bytes::from(std::fs::read(path).unwrap());
+
+    let metadata = parse_metadata(&data).unwrap();
+    let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+    let metadata = Arc::new(metadata);
+
+    assert_eq!(metadata.num_row_groups(), 1);
+
+    let async_reader = TestReader {
+        data: data.clone(),
+        metadata: metadata.clone(),
+        requests: Default::default(),
+    };
+
+    let a_filter = ArrowPredicateFn::new(
+        ProjectionMask::leaves(&parquet_schema, vec![1]),
+        |batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
+    );
+
+    let b_filter = ArrowPredicateFn::new(
+        ProjectionMask::leaves(&parquet_schema, vec![2]),
+        |batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
+    );
+
+    let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+    let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+
+    let stream = ParquetRecordBatchStreamBuilder::new_with_index(async_reader)
+        .await
+        .unwrap()
+        .with_projection(mask.clone())
+        .with_batch_size(1024)
+        .with_row_filter(filter)
+        .build()
+        .unwrap();
+
+    let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+    let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+    assert_eq!(total_rows, 730);
+}
+
 #[tokio::test]
 async fn test_in_memory_row_group_sparse() {
     let testdata = arrow::util::test_util::parquet_test_data();
```
```diff
@@ -882,7 +1112,7 @@
     let mut skip = true;
     let mut pages = offset_index[0].iter().peekable();

-    // Setup `RowSelection` so that we can skip every other page
+    // Setup `RowSelection` so that we can skip every other page, selecting the last page
     let mut selectors = vec![];
     let mut expected_page_requests: Vec<Range<usize>> = vec![];
     while let Some(page) = pages.next() {
```
```diff
@@ -906,7 +1136,7 @@
     let selection = RowSelection::from(selectors);

     let (_factory, _reader) = reader_factory
-        .read_row_group(0, Some(selection), projection, 48)
+        .read_row_group(0, Some(selection), projection.clone(), 48)
         .await
         .expect("reading row group");
```
> Review comment: What are the implications of this change?
>
> Reply: Before, we would break if we were on the last page. So if you skipped from inside the second-to-last page into the last page, the loop would short-circuit and the last page's range wouldn't be selected.
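The last-page behaviour described above follows a common `peekable` pattern: each page's byte range ends where the next page begins, and the final page must be bounded explicitly (here by the total column length) rather than silently dropped. A minimal sketch of that pattern (a hypothetical helper, not the crate's actual test logic):

```rust
use std::ops::Range;

// Hypothetical sketch: derive each page's byte range from page start
// offsets. The end of a page is the next page's offset; the last page
// falls back to `total_len` instead of being skipped.
fn page_byte_ranges(offsets: &[usize], total_len: usize) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut iter = offsets.iter().peekable();
    while let Some(&start) = iter.next() {
        let end = match iter.peek() {
            Some(next) => **next,     // bounded by the following page
            None => total_len,        // last page: bound it explicitly
        };
        ranges.push(start..end);
    }
    ranges
}

fn main() {
    // Three pages starting at 0, 40 and 90 in a 120-byte column chunk
    assert_eq!(page_byte_ranges(&[0, 40, 90], 120), vec![0..40, 40..90, 90..120]);
}
```

Breaking out of such a loop when `peek()` returns `None`, instead of emitting the final range, is exactly the bug the comment describes.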