Use offset index in ParquetRecordBatchStream #2526

Merged: 6 commits, Aug 20, 2022
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/mod.rs
@@ -207,7 +207,7 @@ pub trait ArrowReader {
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
-    page_index: bool,
+    pub(crate) page_index: bool,
}

impl ArrowReaderOptions {
33 changes: 32 additions & 1 deletion parquet/src/arrow/arrow_reader/selection.rs
@@ -162,7 +162,12 @@ impl RowSelection {
current_selector = selectors.next();
}
} else {
-            break;
+            if !(selector.skip || current_page_included) {
Contributor: What are the implications of this change?

Contributor Author: Before this change, we would break if we were on the last page. So if you skipped from inside the second-to-last page into the last page, the loop would short-circuit and the last page's range wouldn't be selected.

let start = page.offset as usize;
let end = start + page.compressed_page_size as usize;
ranges.push(start..end);
}
current_selector = selectors.next()
}
}
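The bug described above is easy to model in isolation. Below is a self-contained sketch of the range-selection idea; `Page` and `RowSelector` here are simplified stand-ins for the parquet crate's `PageLocation` and `RowSelector` types, not the real implementation. It shows that a selection whose skip crosses into the last page must still emit that page's byte range:

```rust
use std::ops::Range;

// Simplified stand-ins for the parquet crate's types; field shapes are assumptions.
struct Page {
    offset: usize,               // byte offset of the page in the file
    compressed_page_size: usize, // page length in bytes
    first_row: usize,            // index of the page's first row
}

struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Return the byte range of every page that contains at least one selected row.
fn scan_ranges(pages: &[Page], selectors: &[RowSelector], total_rows: usize) -> Vec<Range<usize>> {
    let mut ranges: Vec<Range<usize>> = Vec::new();
    let mut row = 0;
    for sel in selectors {
        if !sel.skip {
            for (i, page) in pages.iter().enumerate() {
                // Rows covered by this page: [first_row, next page's first_row)
                let end = pages.get(i + 1).map(|n| n.first_row).unwrap_or(total_rows);
                // Overlap test against the selected rows [row, row + sel.row_count)
                if page.first_row < row + sel.row_count && end > row {
                    let r = page.offset..page.offset + page.compressed_page_size;
                    if ranges.last() != Some(&r) {
                        ranges.push(r);
                    }
                }
            }
        }
        row += sel.row_count;
    }
    ranges
}

fn main() {
    // Three 10-row pages; skip from inside the middle page into the last one.
    let pages = vec![
        Page { offset: 0, compressed_page_size: 100, first_row: 0 },
        Page { offset: 100, compressed_page_size: 100, first_row: 10 },
        Page { offset: 200, compressed_page_size: 100, first_row: 20 },
    ];
    let selectors = vec![
        RowSelector { row_count: 18, skip: true },
        RowSelector { row_count: 4, skip: false },
    ];
    // The selected rows 18..22 straddle the final page boundary, so both the
    // middle and the last page ranges must be fetched.
    assert_eq!(scan_ranges(&pages, &selectors, 30), vec![100..200, 200..300]);
}
```

Breaking out of the loop once the last page is reached would drop the `200..300` range here, which is the short-circuit the comment thread describes.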

@@ -564,5 +569,31 @@ mod tests {

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);

let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select to final page boundary
RowSelector::select(12),
RowSelector::skip(1),
// Skip across final page boundary
RowSelector::skip(8),
// Select from final page
RowSelector::select(4),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
}
}
235 changes: 227 additions & 8 deletions parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;

-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use parquet_format::OffsetIndex;
use thrift::protocol::TCompactInputProtocol;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;

use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
-    RowFilter, RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

@@ -108,6 +110,7 @@ use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};

use crate::file::page_index::index_reader;
use crate::file::FOOTER_SIZE;

use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -218,6 +221,96 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}

pub async fn new_with_options(
mut input: T,
options: ArrowReaderOptions,
) -> Result<Self> {
let mut metadata = input.get_metadata().await?;

if options.page_index
&& metadata
.page_indexes()
.zip(metadata.offset_indexes())
.is_none()
{
let mut fetch_ranges = vec![];
let mut index_lengths: Vec<Vec<usize>> = vec![];

for rg in metadata.row_groups() {
let (loc_offset, loc_length) =
index_reader::get_location_offset_and_total_length(rg.columns())?;

let (idx_offset, idx_lengths) =
index_reader::get_index_offset_and_lengths(rg.columns())?;
tustvold (Contributor), Aug 20, 2022: It occurs to me that this method is making a pretty strong assumption that the column index data is contiguous; I'm not sure this is actually guaranteed. Definitely a separate issue from this PR, though.

let idx_length = idx_lengths.iter().sum::<usize>();

// If index data is missing, return without any indexes
if loc_length == 0 || idx_length == 0 {
return Self::new_builder(AsyncReader(input), metadata, options);
}

fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
Member: I think this will read one col_index and one page_location alternately, but they are written separately:
https://github.com/apache/parquet-format/blob/master/doc/images/PageIndexLayout.png

If we are not caching all the bytes in memory, I think we should read the whole col_index and then the whole page_location. Why don't we use:

/// Read all column indexes of one row group and convert them into [`Index`].
/// If the format is not available, return an empty vector.
pub fn read_columns_indexes<R: ChunkReader>(
    reader: &R,
    chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError>

Contributor: This can't use read_columns_indexes, as this needs to fetch the bytes asynchronously.

Member: Oh, thanks! But we could still combine all the col_index reads together and all the page_location reads together, separately.

Contributor: The onus is on AsyncFileReader::get_ranges to handle this; e.g. ObjectStore::get_ranges does this already.

fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
index_lengths.push(idx_lengths);
}

let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut index_lengths = index_lengths.into_iter();

let mut row_groups = metadata.row_groups().to_vec();

let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in row_groups.iter_mut() {
let columns = rg.columns();

let location_data = chunks.next().unwrap();
let mut cursor = Cursor::new(location_data);
let mut offset_index = vec![];

for _ in 0..columns.len() {
let mut prot = TCompactInputProtocol::new(&mut cursor);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
offset_index.push(offset.page_locations);
}

rg.set_page_offset(offset_index.clone());
offset_indexes.push(offset_index);
Contributor (on lines +278 to +279): Again, this interface seems really confused; something for #2530.


let index_data = chunks.next().unwrap();
let index_lengths = index_lengths.next().unwrap();

let mut start = 0;
let data = index_lengths.into_iter().map(|length| {
let r = index_data.slice(start..start + length);
start += length;
r
});

let indexes = rg
.columns()
.iter()
.zip(data)
.map(|(column, data)| {
let column_type = column.column_type();
index_reader::deserialize_column_index(&data, column_type)
})
.collect::<Result<Vec<_>>>()?;
columns_indexes.push(indexes);
}

metadata = Arc::new(ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
row_groups,
Some(columns_indexes),
Some(offset_indexes),
));
}

Self::new_builder(AsyncReader(input), metadata, options)
}

/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -493,13 +586,26 @@ impl<'a> InMemoryRowGroup<'a> {
let fetch_ranges = self
.column_chunks
.iter()
+ .zip(self.metadata.columns())
.enumerate()
- .into_iter()
- .filter_map(|(idx, chunk)| {
+ .filter_map(|(idx, (chunk, chunk_meta))| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
- let ranges = selection.scan_ranges(&page_locations[idx]);
+ // If the first page does not start at the beginning of the column,
+ // then we need to also fetch a dictionary page.
+ let mut ranges = vec![];
+ let (start, _len) = chunk_meta.byte_range();
+ match page_locations[idx].first() {
Contributor Author: Discovered this bug as well; we weren't fetching the dictionary page if one existed.

+ Some(first) if first.offset as u64 != start => {
+ ranges.push(start as usize..first.offset as usize);
+ }
+ _ => (),
+ }
+
+ ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets
.push(ranges.iter().map(|range| range.start).collect());

ranges
})
})
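The dictionary-page fix boils down to prepending one extra byte range: if the first data page does not begin at the column chunk's byte start, the gap before it can only hold the dictionary page. A minimal self-contained sketch of that range computation, with a hypothetical `PageLocation` stand-in for the parquet crate's type:

```rust
use std::ops::Range;

// Hypothetical stand-in for the parquet crate's PageLocation.
struct PageLocation {
    offset: i64, // byte offset of the data page in the file
}

/// Prepend the dictionary-page range when the first data page does not start
/// at the column chunk's byte start.
fn with_dictionary_range(
    chunk_start: u64,
    page_locations: &[PageLocation],
    data_ranges: Vec<Range<usize>>,
) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    match page_locations.first() {
        Some(first) if first.offset as u64 != chunk_start => {
            // Bytes between the chunk start and the first data page hold the
            // dictionary page; fetch them too.
            ranges.push(chunk_start as usize..first.offset as usize);
        }
        _ => (),
    }
    ranges.extend(data_ranges);
    ranges
}

fn main() {
    // Column chunk starts at byte 100; first data page at 140, so bytes
    // 100..140 hold the dictionary page and must be fetched as well.
    let pages = vec![PageLocation { offset: 140 }];
    let ranges = with_dictionary_range(100, &pages, vec![140..200]);
    assert_eq!(ranges, vec![100..140, 140..200]);
}
```

Without this, a selection-driven fetch would pull only the data-page ranges and page decoding would fail to find the dictionary.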
@@ -687,7 +793,6 @@ mod tests {
use crate::file::page_index::index_reader;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;

use futures::TryStreamExt;
use std::sync::Mutex;

@@ -763,6 +868,70 @@
);
}

#[tokio::test]
async fn test_async_reader_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let options = ArrowReaderOptions::new().with_page_index(true);
let builder =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap();

// The builder should have page and offset indexes loaded now
let metadata_with_index = builder.metadata();

// Check offset indexes are present for all columns
for rg in metadata_with_index.row_groups() {
let page_locations = rg
.page_offset_index()
.as_ref()
.expect("expected page offset index");
assert_eq!(page_locations.len(), rg.columns().len())
}

// Check page indexes are present for all columns
let page_indexes = metadata_with_index
.page_indexes()
.expect("expected page indexes");
for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
assert_eq!(page_indexes[idx].len(), rg.columns().len())
}

let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();

let async_batches: Vec<_> = stream.try_collect().await.unwrap();

let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(1024)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();

assert_eq!(async_batches, sync_batches);
}

#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -832,6 +1001,56 @@
assert_eq!(requests.lock().unwrap().len(), 3);
}

#[tokio::test]
async fn test_row_filter_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let a_filter = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![1]),
|batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
);

let b_filter = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![2]),
|batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
);

let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

let options = ArrowReaderOptions::new().with_page_index(true);
let stream =
ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
.await
.unwrap()
.with_projection(mask.clone())
.with_batch_size(1024)
.with_row_filter(filter)
.build()
.unwrap();

let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

assert_eq!(total_rows, 730);
}

#[tokio::test]
async fn test_in_memory_row_group_sparse() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -882,7 +1101,7 @@
let mut skip = true;
let mut pages = offset_index[0].iter().peekable();

- // Setup `RowSelection` so that we can skip every other page
+ // Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
let mut expected_page_requests: Vec<Range<usize>> = vec![];
while let Some(page) = pages.next() {
@@ -906,7 +1125,7 @@
let selection = RowSelection::from(selectors);

let (_factory, _reader) = reader_factory
- .read_row_group(0, Some(selection), projection, 48)
+ .read_row_group(0, Some(selection), projection.clone(), 48)
.await
.expect("reading row group");

2 changes: 1 addition & 1 deletion parquet/src/column/reader.rs
@@ -312,7 +312,7 @@ where

// If page has less rows than the remaining records to
// be skipped, skip entire page
- if metadata.num_rows < remaining {
+ if metadata.num_rows <= remaining {
Contributor Author: Another bug I uncovered while testing; the old `<` comparison causes the reader to try to fetch pages unnecessarily which are not pre-fetched.

self.page_reader.skip_next_page()?;
remaining -= metadata.num_rows;
continue;
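The off-by-one is easy to reproduce in a toy model. Here is a self-contained sketch contrasting `<` with `<=`; `PageMeta` is a hypothetical stand-in for the real page metadata, not the crate's type:

```rust
// Hypothetical stand-in for the page metadata used by the column reader.
struct PageMeta {
    num_rows: usize,
}

/// Count how many whole pages get skipped when `remaining` records are to be
/// skipped. `inclusive` selects between the old `<` and the fixed `<=` test.
fn whole_pages_skipped(pages: &[PageMeta], mut remaining: usize, inclusive: bool) -> usize {
    let mut skipped = 0;
    for meta in pages {
        let skip_whole = if inclusive {
            meta.num_rows <= remaining
        } else {
            meta.num_rows < remaining
        };
        if skip_whole && remaining > 0 {
            remaining -= meta.num_rows;
            skipped += 1;
        } else {
            // The reader would decode records from this page instead,
            // forcing the page to be fetched.
            break;
        }
    }
    skipped
}

fn main() {
    let pages = [
        PageMeta { num_rows: 10 },
        PageMeta { num_rows: 10 },
        PageMeta { num_rows: 10 },
    ];
    // Skipping exactly two pages' worth of records (20 rows):
    // `<` stops short and decodes the second page; `<=` skips it entirely.
    assert_eq!(whole_pages_skipped(&pages, 20, false), 1);
    assert_eq!(whole_pages_skipped(&pages, 20, true), 2);
}
```

With the exclusive test, a page whose row count exactly equals the remaining skip is decoded rather than skipped, which is why the reader fetched pages that the selection-driven prefetch had deliberately left out.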