Use offset index in ParquetRecordBatchStream #2526

Merged: 6 commits, Aug 20, 2022
1 change: 1 addition & 0 deletions object_store/src/local.rs
@@ -1068,6 +1068,7 @@ mod tests {
integration.head(&path).await.unwrap();
}

#[ignore]
Contributor: 😄

#[tokio::test]
async fn test_list_root() {
let integration = LocalFileSystem::new();
33 changes: 32 additions & 1 deletion parquet/src/arrow/arrow_reader/selection.rs
@@ -162,7 +162,12 @@ impl RowSelection {
current_selector = selectors.next();
}
} else {
break;
if !(selector.skip || current_page_included) {
Contributor: What are the implications of this change?

Contributor (author): Previously we would break as soon as we reached the last page. So if a skip started inside the second-to-last page and crossed into the last page, the loop short-circuited and the last page's range was never selected.

let start = page.offset as usize;
let end = start + page.compressed_page_size as usize;
ranges.push(start..end);
}
current_selector = selectors.next()
}
}
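The fix can be illustrated with a simplified model of how a row selection maps to page byte ranges. The types and names below are hypothetical stand-ins, not the crate's internals:

```rust
use std::ops::Range;

// Simplified stand-ins for the page metadata and row selectors consumed by
// RowSelection::scan_ranges; all names here are hypothetical.
struct PageMeta {
    offset: usize,          // byte offset of the page in the file
    compressed_size: usize, // compressed page size in bytes
    first_row: usize,       // row index of the first row in the page
}

struct Selector {
    row_count: usize,
    skip: bool,
}

// Return the byte range of every page containing at least one selected row.
// Crucially, a skip that starts in one page and ends inside the last page
// must not prevent the last page from being selected afterwards.
fn scan_ranges(pages: &[PageMeta], selectors: &[Selector]) -> Vec<Range<usize>> {
    let mut included = vec![false; pages.len()];
    let mut row = 0usize;
    for s in selectors {
        if !s.skip {
            for r in row..row + s.row_count {
                // Find the page containing row `r` (pages sorted by first_row)
                if let Some(idx) = pages.iter().rposition(|p| p.first_row <= r) {
                    included[idx] = true;
                }
            }
        }
        row += s.row_count;
    }
    pages
        .iter()
        .zip(included)
        .filter_map(|(p, inc)| inc.then(|| p.offset..p.offset + p.compressed_size))
        .collect()
}
```

With three 10-row pages at byte offsets 0, 100, and 200, skipping 25 rows (ending inside the last page) and then selecting 5 yields only the last page's range, which is exactly the case the old `break` lost.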

@@ -564,5 +569,31 @@ mod tests {

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);

let selection = RowSelection::from(vec![
// Skip first page
RowSelector::skip(10),
// Multiple selects in same page
RowSelector::select(3),
RowSelector::skip(3),
RowSelector::select(4),
// Select to page boundary
RowSelector::skip(5),
RowSelector::select(5),
// Skip full page past page boundary
RowSelector::skip(12),
// Select to final page boundary
RowSelector::select(12),
RowSelector::skip(1),
// Skip across final page boundary
RowSelector::skip(8),
// Select from final page
RowSelector::select(4),
]);

let ranges = selection.scan_ranges(&index);

// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
}
}
229 changes: 223 additions & 6 deletions parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::SeekFrom;
use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use parquet_format::{OffsetIndex, PageLocation};
use thrift::protocol::TCompactInputProtocol;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;

use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
RowFilter, RowSelection,
evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;

@@ -108,6 +110,8 @@ use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};

use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader;
use crate::file::FOOTER_SIZE;

use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -139,6 +143,80 @@ pub trait AsyncFileReader: Send {
/// allowing fine-grained control over how metadata is sourced, in particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;

/// Provides asynchronous access to the column index for each column chunk in a
/// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups`.
fn get_column_indexes(
Contributor (author): Not sure if we should make this a separate constructor or just have it always use the index. If the index doesn't exist, the cost of determining that is minimal.

Contributor: I'm not sure of the value of exposing these on AsyncFileReader rather than handling the logic internally. Ultimately, if the implementer wants to override how the index is fetched, they can return ParquetMetaData from get_metadata with the index information already loaded.

&mut self,
metadata: Arc<ParquetMetaData>,
row_group_idx: usize,
) -> BoxFuture<'_, Result<Vec<Index>>> {
async move {
let chunks = metadata.row_group(row_group_idx).columns();

let (offset, lengths) = index_reader::get_index_offset_and_lengths(chunks)?;
let length = lengths.iter().sum::<usize>();

if length == 0 {
return Ok(vec![Index::NONE; chunks.len()]);
}

// read all needed data into a buffer
let data = self
.get_bytes(offset as usize..offset as usize + length)
Contributor: This will perform separate get_bytes requests to fetch the page index and column index information for each column chunk. This is likely not a good idea, especially since the requests will be performed serially.

Ideally we would identify the ranges of all the index information and then call get_byte_ranges; this would allow coalescing proximate requests to ObjectStore, parallel fetches, etc.

Contributor (author): Yeah, that makes sense.

.await?;

let mut start = 0;
let data = lengths.into_iter().map(|length| {
let r = data.slice(start..start + length);
start += length;
r
});

chunks
.iter()
.zip(data)
.map(|(chunk, data)| {
let column_type = chunk.column_type();
index_reader::deserialize_column_index(&data, column_type)
})
.collect()
}
.boxed()
}
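The coalescing the reviewer suggests could look roughly like the following sketch. `coalesce_ranges` is a hypothetical helper, not the crate's `get_byte_ranges` implementation; the idea is to merge byte ranges whose gap is small enough that over-reading is cheaper than an extra request:

```rust
use std::ops::Range;

/// Merge byte ranges separated by at most `max_gap` bytes, so that proximate
/// index structures can be fetched with fewer requests to the object store.
/// Assumes `ranges` is sorted by start offset.
fn coalesce_ranges(ranges: &[Range<usize>], max_gap: usize) -> Vec<Range<usize>> {
    let mut out: Vec<Range<usize>> = Vec::new();
    for r in ranges {
        match out.last_mut() {
            // Close enough to the previous range: extend it instead of
            // issuing a separate fetch
            Some(last) if r.start <= last.end + max_gap => {
                last.end = last.end.max(r.end);
            }
            _ => out.push(r.clone()),
        }
    }
    out
}
```

For example, with a 5-byte gap tolerance, `[0..10, 12..20, 100..110]` collapses to two fetches instead of three.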

/// Provides asynchronous access to the offset index for each column chunk in a
/// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups`.
fn get_page_locations(
&mut self,
metadata: Arc<ParquetMetaData>,
row_group_idx: usize,
) -> BoxFuture<'_, Result<Option<Vec<Vec<PageLocation>>>>> {
async move {
let chunks = metadata.row_group(row_group_idx).columns();

let (offset, total_length) =
index_reader::get_location_offset_and_total_length(chunks)?;

if total_length == 0 {
return Ok(None);
}

let data = self
.get_bytes(offset as usize..offset as usize + total_length)
.await?;
let mut d = Cursor::new(data);
let mut result = vec![];

for _ in 0..chunks.len() {
let mut prot = TCompactInputProtocol::new(&mut d);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
result.push(offset.page_locations);
}
Ok(Some(result))
}
.boxed()
}
}

impl AsyncFileReader for Box<dyn AsyncFileReader> {
@@ -218,6 +296,36 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}

pub async fn new_with_index(mut input: T) -> Result<Self> {
Contributor: I think it would be more consistent to have new_with_options accepting ArrowReaderOptions, which already has a field for the page index.

let metadata = input.get_metadata().await?;

let mut row_groups = metadata.row_groups().to_vec();

let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for (idx, rg) in row_groups.iter_mut().enumerate() {
let column_index = input.get_column_indexes(metadata.clone(), idx).await?;
Contributor: We should check whether the column index has already been fetched into the metadata, and avoid fetching it again if it is present.


columns_indexes.push(column_index);
if let Some(offset_index) =
input.get_page_locations(metadata.clone(), idx).await?
{
rg.set_page_offset(offset_index.clone());
offset_indexes.push(offset_index);
Contributor (on lines +278 to +279): Again, this interface seems really confused; something for #2530.

}
}

let metadata = Arc::new(ParquetMetaData::new_with_page_index(
Contributor (@tustvold, Aug 19, 2022): Not part of this PR, but I still feel something is off with the way the index information is located on ParquetMetaData. Edit: filed #2530.

Contributor (author): Yeah, I agree.

metadata.file_metadata().clone(),
row_groups,
Some(columns_indexes),
Some(offset_indexes),
));

Self::new_builder(AsyncReader(input), metadata, ArrowReaderOptions::new())
}

/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -687,7 +795,6 @@ mod tests {
use crate::file::page_index::index_reader;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;

use futures::TryStreamExt;
use std::sync::Mutex;

@@ -763,6 +870,68 @@
);
}

#[tokio::test]
async fn test_async_reader_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let builder = ParquetRecordBatchStreamBuilder::new_with_index(async_reader)
.await
.unwrap();

// The builder should have page and offset indexes loaded now
let metadata_with_index = builder.metadata();

// Check offset indexes are present for all columns
for rg in metadata_with_index.row_groups() {
let page_locations = rg
.page_offset_index()
.as_ref()
.expect("expected page offset index");
assert_eq!(page_locations.len(), rg.columns().len())
}

// Check page indexes are present for all columns
let page_indexes = metadata_with_index
.page_indexes()
.expect("expected page indexes");
for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
assert_eq!(page_indexes[idx].len(), rg.columns().len())
}

let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
let stream = builder
.with_projection(mask.clone())
.with_batch_size(1024)
.build()
.unwrap();

let async_batches: Vec<_> = stream.try_collect().await.unwrap();

let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
.unwrap()
.with_projection(mask)
.with_batch_size(1024)
.build()
.unwrap()
.collect::<ArrowResult<Vec<_>>>()
.unwrap();

assert_eq!(async_batches, sync_batches);
}

#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -832,6 +1001,54 @@
assert_eq!(requests.lock().unwrap().len(), 3);
}

#[tokio::test]
async fn test_row_filter_with_index() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).unwrap();
let parquet_schema = metadata.file_metadata().schema_descr_ptr();
let metadata = Arc::new(metadata);

assert_eq!(metadata.num_row_groups(), 1);

let async_reader = TestReader {
data: data.clone(),
metadata: metadata.clone(),
requests: Default::default(),
};

let a_filter = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![1]),
|batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
);

let b_filter = ArrowPredicateFn::new(
ProjectionMask::leaves(&parquet_schema, vec![2]),
|batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
);

let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);

let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);

let stream = ParquetRecordBatchStreamBuilder::new_with_index(async_reader)
.await
.unwrap()
.with_projection(mask.clone())
.with_batch_size(1024)
.with_row_filter(filter)
.build()
.unwrap();

let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();

assert_eq!(total_rows, 730);
}

#[tokio::test]
async fn test_in_memory_row_group_sparse() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -882,7 +1099,7 @@
let mut skip = true;
let mut pages = offset_index[0].iter().peekable();

// Setup `RowSelection` so that we can skip every other page
// Set up `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
let mut expected_page_requests: Vec<Range<usize>> = vec![];
while let Some(page) = pages.next() {
@@ -906,7 +1123,7 @@
let selection = RowSelection::from(selectors);

let (_factory, _reader) = reader_factory
.read_row_group(0, Some(selection), projection, 48)
.read_row_group(0, Some(selection), projection.clone(), 48)
.await
.expect("reading row group");
