# Use offset index in ParquetRecordBatchStream #2526
```diff
@@ -78,7 +78,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;

-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
+use parquet_format::OffsetIndex;
+use thrift::protocol::TCompactInputProtocol;

 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
-    RowFilter, RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;

@@ -108,6 +110,7 @@ use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};

+use crate::file::page_index::index_reader;
 use crate::file::FOOTER_SIZE;

 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
```
```diff
@@ -218,6 +221,96 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
         Self::new_builder(AsyncReader(input), metadata, Default::default())
     }

+    pub async fn new_with_options(
+        mut input: T,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let mut metadata = input.get_metadata().await?;
+
+        if options.page_index
+            && metadata
+                .page_indexes()
+                .zip(metadata.offset_indexes())
+                .is_none()
+        {
+            let mut fetch_ranges = vec![];
+            let mut index_lengths: Vec<Vec<usize>> = vec![];
+
+            for rg in metadata.row_groups() {
+                let (loc_offset, loc_length) =
+                    index_reader::get_location_offset_and_total_length(rg.columns())?;
+
+                let (idx_offset, idx_lengths) =
+                    index_reader::get_index_offset_and_lengths(rg.columns())?;
```
**Review comment:** It occurs to me that this method is making a pretty strong assumption that the column index data is contiguous; I'm not sure this is actually guaranteed... Definitely a separate issue from this PR, though.
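The contiguity concern can be made concrete: collapsing per-column index locations into one fetch range is only sound if each column's index starts exactly where the previous one ends. A hypothetical standalone sketch; the `(offset, length)` tuples and the function are illustrative, not the crate's API:

```rust
use std::ops::Range;

/// Collapse per-column (offset, length) pairs into a single fetch range,
/// returning None if the index data is not laid out contiguously.
fn combined_range(locations: &[(usize, usize)]) -> Option<Range<usize>> {
    let (first_offset, _) = *locations.first()?;
    let mut end = first_offset;
    for &(offset, len) in locations {
        if offset != end {
            return None; // a gap or reordering breaks the single-range read
        }
        end = offset + len;
    }
    Some(first_offset..end)
}

fn main() {
    // Three column indexes laid out back to back collapse into one range
    assert_eq!(combined_range(&[(100, 10), (110, 5), (115, 20)]), Some(100..135));
    // A gap between columns means a single range would cover unrelated bytes
    assert_eq!(combined_range(&[(100, 10), (120, 5)]), None);
}
```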
```diff
+
+                let idx_length = idx_lengths.iter().sum::<usize>();
+
+                // If index data is missing, return without any indexes
+                if loc_length == 0 || idx_length == 0 {
+                    return Self::new_builder(AsyncReader(input), metadata, options);
+                }
+
+                fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
```
**Review comment:** I think this will read one col_index and one page_location alternately, but they are written separately. If we don't cache all the bytes in memory, I think we should read the whole col_index first, then the page_location:

```rust
/// Read all of one row group's column indexes and convert them into [`Index`].
/// If the format is not available, return an empty vector.
pub fn read_columns_indexes<R: ChunkReader>(
    reader: &R,
    chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError> {
```

**Reply:** This can't use `read_columns_indexes`, as this needs to asynchronously fetch the bytes.

**Reply:** Oh! Thanks! But we can still try to combine them all.

**Reply:** The onus is on ...
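One way to mitigate the alternating reads without caching everything is for the `AsyncFileReader` implementation to coalesce nearby ranges before issuing I/O. This is a sketch of that idea only, not part of this PR; `merge_gap` is a made-up tuning knob:

```rust
use std::ops::Range;

/// Merge byte ranges that overlap or sit within `merge_gap` bytes of each
/// other, so alternating column-index/page-location reads become fewer fetches.
fn coalesce(mut ranges: Vec<Range<usize>>, merge_gap: usize) -> Vec<Range<usize>> {
    ranges.sort_by_key(|r| r.start);
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        match out.last_mut() {
            // Extend the previous range instead of starting a new read
            Some(last) if r.start <= last.end + merge_gap => {
                last.end = last.end.max(r.end)
            }
            _ => out.push(r),
        }
    }
    out
}

fn main() {
    // Two adjacent index reads and one distant page-location read
    assert_eq!(coalesce(vec![0..10, 10..20, 40..50], 0), vec![0..20, 40..50]);
}
```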
```diff
+                fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
+                index_lengths.push(idx_lengths);
+            }
+
+            let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut index_lengths = index_lengths.into_iter();
+
+            let mut row_groups = metadata.row_groups().to_vec();
+
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+
+            for rg in row_groups.iter_mut() {
+                let columns = rg.columns();
+
+                let location_data = chunks.next().unwrap();
+                let mut cursor = Cursor::new(location_data);
+                let mut offset_index = vec![];
+
+                for _ in 0..columns.len() {
+                    let mut prot = TCompactInputProtocol::new(&mut cursor);
+                    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+                    offset_index.push(offset.page_locations);
+                }
+
+                rg.set_page_offset(offset_index.clone());
+                offset_indexes.push(offset_index);
```
**Review comment** (on lines +278 to +279)**:** Again, this interface seems really confused - something for #2530.
```diff
+
+                let index_data = chunks.next().unwrap();
+                let index_lengths = index_lengths.next().unwrap();
+
+                let mut start = 0;
+                let data = index_lengths.into_iter().map(|length| {
+                    let r = index_data.slice(start..start + length);
+                    start += length;
+                    r
+                });
+
+                let indexes = rg
+                    .columns()
+                    .iter()
+                    .zip(data)
+                    .map(|(column, data)| {
+                        let column_type = column.column_type();
+                        index_reader::deserialize_column_index(&data, column_type)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                columns_indexes.push(indexes);
+            }
+
+            metadata = Arc::new(ParquetMetaData::new_with_page_index(
+                metadata.file_metadata().clone(),
+                row_groups,
+                Some(columns_indexes),
+                Some(offset_indexes),
+            ));
+        }
+
+        Self::new_builder(AsyncReader(input), metadata, options)
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
```
```diff
@@ -493,13 +586,26 @@ impl<'a> InMemoryRowGroup<'a> {
         let fetch_ranges = self
             .column_chunks
             .iter()
+            .zip(self.metadata.columns())
             .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
+            .filter_map(|(idx, (chunk, chunk_meta))| {
                 (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let ranges = selection.scan_ranges(&page_locations[idx]);
+                    // If the first page does not start at the beginning of the column,
+                    // then we need to also fetch a dictionary page.
+                    let mut ranges = vec![];
+                    let (start, _len) = chunk_meta.byte_range();
+                    match page_locations[idx].first() {
+                        Some(first) if first.offset as u64 != start => {
+                            ranges.push(start as usize..first.offset as usize);
+                        }
+                        _ => (),
+                    }
+
+                    ranges.extend(selection.scan_ranges(&page_locations[idx]));
                     page_start_offsets
                         .push(ranges.iter().map(|range| range.start).collect());
+
                     ranges
                 })
             })
```

**Review comment** (on the dictionary-page fetch)**:** Discovered this bug as well. We weren't fetching a dictionary page if it existed.
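To isolate the dictionary-page fix: if the first data page's offset sits past the start of the column chunk, the bytes in between hold the dictionary page, which must be fetched too. A standalone sketch with a stand-in `PageLocation` type (not the crate's):

```rust
use std::ops::Range;

/// Stand-in for an offset index entry; only the field this logic needs.
struct PageLocation {
    offset: i64,
}

/// Prepend the dictionary-page range when the first data page does not start
/// at the beginning of the column chunk.
fn ranges_with_dictionary(
    chunk_start: u64,
    pages: &[PageLocation],
    mut data_ranges: Vec<Range<usize>>,
) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    if let Some(first) = pages.first() {
        if first.offset as u64 != chunk_start {
            // The gap between the chunk start and the first data page
            ranges.push(chunk_start as usize..first.offset as usize);
        }
    }
    ranges.append(&mut data_ranges);
    ranges
}

fn main() {
    // Chunk starts at byte 100 but the first data page starts at 180:
    // bytes 100..180 hold the dictionary page and are fetched as well.
    let pages = vec![PageLocation { offset: 180 }];
    let ranges = ranges_with_dictionary(100, &pages, vec![180..400]);
    assert_eq!(ranges, vec![100..180, 180..400]);
}
```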
```diff
@@ -687,7 +793,6 @@ mod tests {
     use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
-
     use futures::TryStreamExt;
     use std::sync::Mutex;
```
```diff
@@ -763,6 +868,70 @@ mod tests {
         );
     }

+    #[tokio::test]
+    async fn test_async_reader_with_index() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+                .await
+                .unwrap();
+
+        // The builder should have page and offset indexes loaded now
+        let metadata_with_index = builder.metadata();
+
+        // Check offset indexes are present for all columns
+        for rg in metadata_with_index.row_groups() {
+            let page_locations = rg
+                .page_offset_index()
+                .as_ref()
+                .expect("expected page offset index");
+            assert_eq!(page_locations.len(), rg.columns().len())
+        }
+
+        // Check page indexes are present for all columns
+        let page_indexes = metadata_with_index
+            .page_indexes()
+            .expect("expected page indexes");
+        for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
+            assert_eq!(page_indexes[idx].len(), rg.columns().len())
+        }
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+        let stream = builder
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+
+        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
+            .unwrap()
+            .with_projection(mask)
+            .with_batch_size(1024)
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(async_batches, sync_batches);
+    }
+
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
```
```diff
@@ -832,6 +1001,56 @@
         assert_eq!(requests.lock().unwrap().len(), 3);
     }

+    #[tokio::test]
+    async fn test_row_filter_with_index() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![1]),
+            |batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
+        );
+
+        let b_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![2]),
+            |batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let stream =
+            ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+                .await
+                .unwrap()
+                .with_projection(mask.clone())
+                .with_batch_size(1024)
+                .with_row_filter(filter)
+                .build()
+                .unwrap();
+
+        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+        assert_eq!(total_rows, 730);
+    }
+
     #[tokio::test]
     async fn test_in_memory_row_group_sparse() {
         let testdata = arrow::util::test_util::parquet_test_data();
```
```diff
@@ -882,7 +1101,7 @@
         let mut skip = true;
         let mut pages = offset_index[0].iter().peekable();

-        // Setup `RowSelection` so that we can skip every other page
+        // Setup `RowSelection` so that we can skip every other page, selecting the last page
        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
```
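As an aside, the alternating skip/select pattern this test builds can be sketched generically from per-page row counts using the crate's `RowSelector` API; the helper below is illustrative only and not part of this PR:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Build a selection that skips even-numbered pages and selects odd-numbered
/// ones, given the number of rows in each page (invented for illustration).
fn alternating_selection(page_row_counts: &[usize]) -> RowSelection {
    let selectors: Vec<RowSelector> = page_row_counts
        .iter()
        .enumerate()
        .map(|(i, &rows)| {
            if i % 2 == 0 {
                RowSelector::skip(rows)
            } else {
                RowSelector::select(rows)
            }
        })
        .collect();
    RowSelection::from(selectors)
}
```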
```diff
@@ -906,7 +1125,7 @@
         let selection = RowSelection::from(selectors);

         let (_factory, _reader) = reader_factory
-            .read_row_group(0, Some(selection), projection, 48)
+            .read_row_group(0, Some(selection), projection.clone(), 48)
             .await
             .expect("reading row group");
```
In the second file touched by this PR:
```diff
@@ -312,7 +312,7 @@ where
             // If page has less rows than the remaining records to
             // be skipped, skip entire page
-            if metadata.num_rows < remaining {
+            if metadata.num_rows <= remaining {
                 self.page_reader.skip_next_page()?;
                 remaining -= metadata.num_rows;
                 continue;
```

**Review comment:** Another bug I uncovered while testing. This caused the reader to unnecessarily try to fetch pages that had not been pre-fetched.
**Review comment:** What are the implications of this change?

**Reply:** Before, we would break if we were on the last page. So if you skipped from inside the second-to-last page into the last page, this would short-circuit and the last page's range wouldn't be selected.
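A tiny standalone model of the off-by-one (not the reader's actual code): with the old `<` comparison, a page holding exactly `remaining` rows was decoded just to be discarded instead of being skipped wholesale, which is what triggered fetches of pages that were never pre-fetched.

```rust
/// Count how many whole pages can be skipped when `remaining` rows remain to skip.
fn pages_to_skip(page_row_counts: &[usize], mut remaining: usize) -> usize {
    let mut skipped = 0;
    for &num_rows in page_row_counts {
        // With `<` instead of `<=`, a page of exactly `remaining` rows would
        // not be skipped here and would have to be decoded instead.
        if num_rows <= remaining {
            remaining -= num_rows;
            skipped += 1;
        } else {
            break;
        }
    }
    skipped
}

fn main() {
    // Skipping exactly one full 100-row page: `<=` skips it outright
    assert_eq!(pages_to_skip(&[100, 100], 100), 1);
}
```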