From 1eabcab7d95d27bb090743c3599274e31269bc25 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 19 Aug 2022 09:24:10 -0400 Subject: [PATCH 1/6] Use offset index in ParquetRecordBatchStream --- parquet/src/arrow/arrow_reader/selection.rs | 33 ++- parquet/src/arrow/async_reader.rs | 242 +++++++++++++++++++- parquet/src/file/page_index/index_reader.rs | 16 +- parquet/src/file/serialized_reader.rs | 2 +- 4 files changed, 280 insertions(+), 13 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index f3d11d92595..b6ee273ab56 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -162,7 +162,12 @@ impl RowSelection { current_selector = selectors.next(); } } else { - break; + if !(selector.skip || current_page_included) { + let start = page.offset as usize; + let end = start + page.compressed_page_size as usize; + ranges.push(start..end); + } + current_selector = selectors.next() } } @@ -564,5 +569,31 @@ mod tests { // assert_eq!(mask, vec![false, true, true, false, true, true, true]); assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]); + + let selection = RowSelection::from(vec![ + // Skip first page + RowSelector::skip(10), + // Multiple selects in same page + RowSelector::select(3), + RowSelector::skip(3), + RowSelector::select(4), + // Select to page boundary + RowSelector::skip(5), + RowSelector::select(5), + // Skip full page past page boundary + RowSelector::skip(12), + // Select to final page bounday + RowSelector::select(12), + RowSelector::skip(1), + // Skip across final page boundary + RowSelector::skip(8), + // Select from final page + RowSelector::select(4), + ]); + + let ranges = selection.scan_ranges(&index); + + // assert_eq!(mask, vec![false, true, true, false, true, true, true]); + assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]); } } diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 7bcc5503823..02b846db4b9 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -78,7 +78,7 @@ use std::collections::VecDeque; use std::fmt::Formatter; -use std::io::SeekFrom; +use std::io::{Cursor, SeekFrom}; use std::ops::Range; use std::pin::Pin; use std::sync::Arc; @@ -88,6 +88,8 @@ use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; use futures::stream::Stream; +use parquet_format::{OffsetIndex, PageLocation}; +use thrift::protocol::TCompactInputProtocol; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; @@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch; use crate::arrow::array_reader::{build_array_reader, RowGroupCollection}; use crate::arrow::arrow_reader::{ - evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader, - RowFilter, RowSelection, + evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions, + ParquetRecordBatchReader, RowFilter, RowSelection, }; use crate::arrow::ProjectionMask; @@ -108,6 +110,9 @@ use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; +use crate::file::page_index::index::Index; +use crate::file::page_index::index_reader; +use crate::file::serialized_reader::ReadOptions; use crate::file::FOOTER_SIZE; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; @@ -139,6 +144,80 @@ pub trait AsyncFileReader: Send { 
/// allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, etc... fn get_metadata(&mut self) -> BoxFuture<'_, Result>>; + + /// Provides asynchronous access to the the page index for each column chunk in a + /// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups` + fn get_column_indexes( + &mut self, + metadata: Arc, + row_group_idx: usize, + ) -> BoxFuture<'_, Result>> { + async move { + let chunks = metadata.row_group(row_group_idx).columns(); + + let (offset, lengths) = index_reader::get_index_offset_and_lengths(chunks)?; + let length = lengths.iter().sum::(); + + if length == 0 { + return Ok(vec![Index::NONE; chunks.len()]); + } + + //read all need data into buffer + let data = self + .get_bytes(offset as usize..offset as usize + length) + .await?; + + let mut start = 0; + let data = lengths.into_iter().map(|length| { + let r = data.slice(start..start + length); + start += length; + r + }); + + chunks + .iter() + .zip(data) + .map(|(chunk, data)| { + let column_type = chunk.column_type(); + index_reader::deserialize_column_index(&data, column_type) + }) + .collect() + } + .boxed() + } + + /// Provides asynchronous access to the the offset index for each column chunk in a + /// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups` + fn get_page_locations( + &mut self, + metadata: Arc, + row_group_idx: usize, + ) -> BoxFuture<'_, Result>>>> { + async move { + let chunks = metadata.row_group(row_group_idx).columns(); + + let (offset, total_length) = + index_reader::get_location_offset_and_total_length(chunks)?; + + if total_length == 0 { + return Ok(None); + } + + let data = self + .get_bytes(offset as usize..offset as usize + total_length) + .await?; + let mut d = Cursor::new(data); + let mut result = vec![]; + + for _ in 0..chunks.len() { + let mut prot = TCompactInputProtocol::new(&mut d); + let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; + result.push(offset.page_locations); + } + Ok(Some(result)) + } + .boxed() + } } impl AsyncFileReader for Box { @@ -218,6 +297,39 @@ impl ArrowReaderBuilder> { Self::new_builder(AsyncReader(input), metadata, Default::default()) } + pub async fn new_with_options(mut input: T, options: ReadOptions) -> Result { + let mut metadata = input.get_metadata().await?; + + let mut row_groups = metadata.row_groups().clone().to_vec(); + + if options.enable_page_index { + let mut columns_indexes = vec![]; + let mut offset_indexes = vec![]; + + for (idx, rg) in row_groups.iter_mut().enumerate() { + let column_index = + input.get_column_indexes(metadata.clone(), idx).await?; + + columns_indexes.push(column_index); + if let Some(offset_index) = + input.get_page_locations(metadata.clone(), idx).await? 
+ { + rg.set_page_offset(offset_index.clone()); + offset_indexes.push(offset_index); + } + } + + metadata = Arc::new(ParquetMetaData::new_with_page_index( + metadata.file_metadata().clone(), + row_groups, + Some(columns_indexes), + Some(offset_indexes), + )) + } + + Self::new_builder(AsyncReader(input), metadata, ArrowReaderOptions::new()) + } + /// Build a new [`ParquetRecordBatchStream`] pub fn build(self) -> Result> { let num_row_groups = self.metadata.row_groups().len(); @@ -637,9 +749,10 @@ impl ChunkReader for ColumnChunkData { .binary_search_by_key(&start, |(offset, _)| *offset as u64) .map(|idx| data[idx].1.slice(0..length)) .map_err(|_| { + let valid_offsets: Vec = data.iter().map(|(offset,_)| *offset).collect(); ParquetError::General(format!( - "Invalid offset in sparse column chunk data: {}", - start + "Invalid offset in sparse column chunk data: {}. Valid offset are {:?}. Total pages {}", + start, valid_offsets, valid_offsets.len() )) }), ColumnChunkData::Dense { offset, data } => { @@ -688,6 +801,7 @@ mod tests { use arrow::array::{Array, ArrayRef, Int32Array, StringArray}; use arrow::error::Result as ArrowResult; + use crate::file::serialized_reader::ReadOptionsBuilder; use futures::TryStreamExt; use std::sync::Mutex; @@ -763,6 +877,70 @@ mod tests { ); } + #[tokio::test] + async fn test_async_reader_with_index() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let metadata = parse_metadata(&data).unwrap(); + let metadata = Arc::new(metadata); + + assert_eq!(metadata.num_row_groups(), 1); + + let async_reader = TestReader { + data: data.clone(), + metadata: metadata.clone(), + requests: Default::default(), + }; + + let options = ReadOptionsBuilder::new().with_page_index().build(); + let builder = + ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap(); + + // The builder should have page and offset indexes loaded now + let metadata_with_index = builder.metadata(); + + // Check offset indexes are present for all columns + for rg in metadata_with_index.row_groups() { + let page_locations = rg + .page_offset_index() + .as_ref() + .expect("expected page offset index"); + assert_eq!(page_locations.len(), rg.columns().len()) + } + + // Check page indexes are present for all columns + let page_indexes = metadata_with_index + .page_indexes() + .expect("expected page indexes"); + for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() { + assert_eq!(page_indexes[idx].len(), rg.columns().len()) + } + + let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]); + let stream = builder + .with_projection(mask.clone()) + .with_batch_size(1024) + .build() + .unwrap(); + + let async_batches: Vec<_> = stream.try_collect().await.unwrap(); + + let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data) + .unwrap() + .with_projection(mask) + .with_batch_size(1024) + .build() + .unwrap() + .collect::>>() + .unwrap(); + + assert_eq!(async_batches, sync_batches); + } + #[tokio::test] async fn test_row_filter() { let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]); @@ -832,6 +1010,56 @@ mod tests { assert_eq!(requests.lock().unwrap().len(), 3); } + #[tokio::test] + async fn test_row_filter_with_index() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata); + let data = 
Bytes::from(std::fs::read(path).unwrap()); + + let metadata = parse_metadata(&data).unwrap(); + let parquet_schema = metadata.file_metadata().schema_descr_ptr(); + let metadata = Arc::new(metadata); + + assert_eq!(metadata.num_row_groups(), 1); + + let async_reader = TestReader { + data: data.clone(), + metadata: metadata.clone(), + requests: Default::default(), + }; + + let a_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![1]), + |batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true), + ); + + let b_filter = ArrowPredicateFn::new( + ProjectionMask::leaves(&parquet_schema, vec![2]), + |batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32), + ); + + let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]); + + let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); + let options = ReadOptionsBuilder::new().with_page_index().build(); + + let stream = + ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap() + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_filter(filter) + .build() + .unwrap(); + + let batches: Vec = stream.try_collect().await.unwrap(); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + + assert_eq!(total_rows, 730); + } + #[tokio::test] async fn test_in_memory_row_group_sparse() { let testdata = arrow::util::test_util::parquet_test_data(); @@ -882,7 +1110,7 @@ mod tests { let mut skip = true; let mut pages = offset_index[0].iter().peekable(); - // Setup `RowSelection` so that we can skip every other page + // Setup `RowSelection` so that we can skip every other page, selecting the last page let mut selectors = vec![]; let mut expected_page_requests: Vec> = vec![]; while let Some(page) = pages.next() { @@ -906,7 +1134,7 @@ mod tests { let selection = RowSelection::from(selectors); let (_factory, _reader) = reader_factory - .read_row_group(0, Some(selection), projection, 48) + .read_row_group(0, Some(selection), projection.clone(), 48) .await .expect("reading row group"); diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index e3f37fbc661..e6a4e598102 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -34,8 +34,12 @@ pub fn read_columns_indexes( let (offset, lengths) = get_index_offset_and_lengths(chunks)?; let length = lengths.iter().sum::(); + if length == 0 { + return Ok(vec![Index::NONE; chunks.len()]); + } + //read all need data into buffer - let mut reader = reader.get_read(offset, reader.len() as usize)?; + let mut reader = reader.get_read(offset, length)?; let mut data = vec![0; length]; reader.read_exact(&mut data)?; @@ -64,6 +68,10 @@ pub fn read_pages_locations( ) -> Result>, ParquetError> { let (offset, total_length) = get_location_offset_and_total_length(chunks)?; + if total_length == 0 { + return Ok(vec![]); + } + //read all need data into buffer let mut reader = reader.get_read(offset, total_length)?; let mut data = vec![0; total_length]; @@ -82,7 +90,7 @@ pub fn read_pages_locations( //Get File offsets of every ColumnChunk's page_index //If there are invalid offset return a zero offset with empty lengths. 
-fn get_index_offset_and_lengths( +pub(crate) fn get_index_offset_and_lengths( chunks: &[ColumnChunkMetaData], ) -> Result<(u64, Vec), ParquetError> { let first_col_metadata = if let Some(chunk) = chunks.first() { @@ -111,7 +119,7 @@ fn get_index_offset_and_lengths( //Get File offset of ColumnChunk's pages_locations //If there are invalid offset return a zero offset with zero length. -fn get_location_offset_and_total_length( +pub(crate) fn get_location_offset_and_total_length( chunks: &[ColumnChunkMetaData], ) -> Result<(u64, usize), ParquetError> { let metadata = if let Some(chunk) = chunks.first() { @@ -133,7 +141,7 @@ fn get_location_offset_and_total_length( Ok((offset, total_length)) } -fn deserialize_column_index( +pub(crate) fn deserialize_column_index( data: &[u8], column_type: Type, ) -> Result { diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index f3beb57c02e..f600d071306 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -201,7 +201,7 @@ impl ReadOptionsBuilder { /// All predicates will be chained using 'AND' to filter the row groups. pub struct ReadOptions { predicates: Vec, - enable_page_index: bool, + pub(crate) enable_page_index: bool, } impl SerializedFileReader { From 8729e1688ffad7dcfc92455d5423a408a464fb7e Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 19 Aug 2022 09:31:33 -0400 Subject: [PATCH 2/6] remove debugging cruft and fix clippy warning --- parquet/src/arrow/async_reader.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 02b846db4b9..d32cdc56217 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -300,7 +300,7 @@ impl ArrowReaderBuilder> { pub async fn new_with_options(mut input: T, options: ReadOptions) -> Result { let mut metadata = input.get_metadata().await?; - let mut row_groups = metadata.row_groups().clone().to_vec(); + let mut row_groups = metadata.row_groups().to_vec(); if options.enable_page_index { let mut columns_indexes = vec![]; @@ -749,10 +749,9 @@ impl ChunkReader for ColumnChunkData { .binary_search_by_key(&start, |(offset, _)| *offset as u64) .map(|idx| data[idx].1.slice(0..length)) .map_err(|_| { - let valid_offsets: Vec = data.iter().map(|(offset,_)| *offset).collect(); ParquetError::General(format!( - "Invalid offset in sparse column chunk data: {}. Valid offset are {:?}. 
Total pages {}", - start, valid_offsets, valid_offsets.len() + "Invalid offset in sparse column chunk data: {}", + start )) }), ColumnChunkData::Dense { offset, data } => { From bc3a2e00e228c75f2722a432641b47bfff4ecd9d Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 19 Aug 2022 10:13:53 -0400 Subject: [PATCH 3/6] Do not use ReadOptions --- object_store/src/local.rs | 1 + parquet/src/arrow/async_reader.rs | 72 ++++++++++++--------------- parquet/src/file/serialized_reader.rs | 2 +- 3 files changed, 33 insertions(+), 42 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index fd3c3592ab5..0f1a8821b86 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -1068,6 +1068,7 @@ mod tests { integration.head(&path).await.unwrap(); } + #[ignore] #[tokio::test] async fn test_list_root() { let integration = LocalFileSystem::new(); diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index d32cdc56217..b212b2e3741 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -112,7 +112,6 @@ use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; use crate::file::page_index::index::Index; use crate::file::page_index::index_reader; -use crate::file::serialized_reader::ReadOptions; use crate::file::FOOTER_SIZE; use crate::schema::types::{ColumnDescPtr, SchemaDescPtr}; @@ -297,36 +296,33 @@ impl ArrowReaderBuilder> { Self::new_builder(AsyncReader(input), metadata, Default::default()) } - pub async fn new_with_options(mut input: T, options: ReadOptions) -> Result { - let mut metadata = input.get_metadata().await?; + pub async fn new_with_index(mut input: T) -> Result { + let metadata = input.get_metadata().await?; let mut row_groups = metadata.row_groups().to_vec(); - if options.enable_page_index { - let mut columns_indexes = vec![]; - let mut offset_indexes = vec![]; + let mut columns_indexes = vec![]; + let mut offset_indexes = vec![]; - for (idx, rg) in row_groups.iter_mut().enumerate() { - let column_index = - input.get_column_indexes(metadata.clone(), idx).await?; + for (idx, rg) in row_groups.iter_mut().enumerate() { + let column_index = input.get_column_indexes(metadata.clone(), idx).await?; - columns_indexes.push(column_index); - if let Some(offset_index) = - input.get_page_locations(metadata.clone(), idx).await? - { - rg.set_page_offset(offset_index.clone()); - offset_indexes.push(offset_index); - } + columns_indexes.push(column_index); + if let Some(offset_index) = + input.get_page_locations(metadata.clone(), idx).await? 
+ { + rg.set_page_offset(offset_index.clone()); + offset_indexes.push(offset_index); } - - metadata = Arc::new(ParquetMetaData::new_with_page_index( - metadata.file_metadata().clone(), - row_groups, - Some(columns_indexes), - Some(offset_indexes), - )) } + let metadata = Arc::new(ParquetMetaData::new_with_page_index( + metadata.file_metadata().clone(), + row_groups, + Some(columns_indexes), + Some(offset_indexes), + )); + Self::new_builder(AsyncReader(input), metadata, ArrowReaderOptions::new()) } @@ -799,8 +795,6 @@ mod tests { use crate::file::page_index::index_reader; use arrow::array::{Array, ArrayRef, Int32Array, StringArray}; use arrow::error::Result as ArrowResult; - - use crate::file::serialized_reader::ReadOptionsBuilder; use futures::TryStreamExt; use std::sync::Mutex; @@ -893,11 +887,9 @@ mod tests { requests: Default::default(), }; - let options = ReadOptionsBuilder::new().with_page_index().build(); - let builder = - ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) - .await - .unwrap(); + let builder = ParquetRecordBatchStreamBuilder::new_with_index(async_reader) + .await + .unwrap(); // The builder should have page and offset indexes loaded now let metadata_with_index = builder.metadata(); @@ -1040,17 +1032,15 @@ mod tests { let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]); let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); - let options = ReadOptionsBuilder::new().with_page_index().build(); - - let stream = - ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) - .await - .unwrap() - .with_projection(mask.clone()) - .with_batch_size(1024) - .with_row_filter(filter) - .build() - .unwrap(); + + let stream = ParquetRecordBatchStreamBuilder::new_with_index(async_reader) + .await + .unwrap() + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_filter(filter) + .build() + .unwrap(); let batches: Vec = stream.try_collect().await.unwrap(); diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index f600d071306..f3beb57c02e 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -201,7 +201,7 @@ impl ReadOptionsBuilder { /// All predicates will be chained using 'AND' to filter the row groups. 
pub struct ReadOptions { predicates: Vec, - pub(crate) enable_page_index: bool, + enable_page_index: bool, } impl SerializedFileReader { From 54819026ba3bf318cad48e469ec20897ab3d367f Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 19 Aug 2022 12:16:39 -0400 Subject: [PATCH 4/6] Fix bug with dictionary pages --- object_store/src/local.rs | 1 - parquet/src/arrow/async_reader.rs | 17 +++++++++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/object_store/src/local.rs b/object_store/src/local.rs index 0f1a8821b86..fd3c3592ab5 100644 --- a/object_store/src/local.rs +++ b/object_store/src/local.rs @@ -1068,7 +1068,6 @@ mod tests { integration.head(&path).await.unwrap(); } - #[ignore] #[tokio::test] async fn test_list_root() { let integration = LocalFileSystem::new(); diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index b212b2e3741..33745417b49 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -601,13 +601,26 @@ impl<'a> InMemoryRowGroup<'a> { let fetch_ranges = self .column_chunks .iter() + .zip(self.metadata.columns()) .enumerate() .into_iter() - .filter_map(|(idx, chunk)| { + .filter_map(|(idx, (chunk, chunk_meta))| { (chunk.is_none() && projection.leaf_included(idx)).then(|| { - let ranges = selection.scan_ranges(&page_locations[idx]); + // If the first page does not start at the beginning of the column, + // then we need to also fetch a dictionary page. + let mut ranges = vec![]; + let (start, _len) = chunk_meta.byte_range(); + match page_locations[idx].first() { + Some(first) if first.offset as u64 != start => { + ranges.push(start as usize..first.offset as usize); + } + _ => (), + } + + ranges.extend(selection.scan_ranges(&page_locations[idx])); page_start_offsets .push(ranges.iter().map(|range| range.start).collect()); + ranges }) }) From 6304227b67962c281b52da63677d4f4aa4f34a2f Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 19 Aug 2022 13:08:51 -0400 Subject: [PATCH 5/6] Review comments --- parquet/src/arrow/arrow_reader/mod.rs | 2 +- parquet/src/arrow/async_reader.rs | 201 ++++++++++++-------------- 2 files changed, 96 insertions(+), 107 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs index 1247e4399e6..5c4b38d6a0c 100644 --- a/parquet/src/arrow/arrow_reader/mod.rs +++ b/parquet/src/arrow/arrow_reader/mod.rs @@ -207,7 +207,7 @@ pub trait ArrowReader { #[derive(Debug, Clone, Default)] pub struct ArrowReaderOptions { skip_arrow_metadata: bool, - page_index: bool, + pub(crate) page_index: bool, } impl ArrowReaderOptions { diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs index 33745417b49..b0d9143d64d 100644 --- a/parquet/src/arrow/async_reader.rs +++ b/parquet/src/arrow/async_reader.rs @@ -88,7 +88,7 @@ use bytes::{Buf, Bytes}; use futures::future::{BoxFuture, FutureExt}; use futures::ready; use futures::stream::Stream; -use parquet_format::{OffsetIndex, PageLocation}; +use parquet_format::OffsetIndex; use thrift::protocol::TCompactInputProtocol; use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; @@ -110,7 +110,6 @@ use crate::file::footer::{decode_footer, decode_metadata}; use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; use crate::file::reader::{ChunkReader, Length, SerializedPageReader}; -use crate::file::page_index::index::Index; use crate::file::page_index::index_reader; use crate::file::FOOTER_SIZE; @@ -143,80 +142,6 @@ pub trait AsyncFileReader: Send { /// 
allowing fine-grained control over how metadata is sourced, in particular allowing /// for caching, pre-fetching, catalog metadata, etc... fn get_metadata(&mut self) -> BoxFuture<'_, Result>>; - - /// Provides asynchronous access to the the page index for each column chunk in a - /// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups` - fn get_column_indexes( - &mut self, - metadata: Arc, - row_group_idx: usize, - ) -> BoxFuture<'_, Result>> { - async move { - let chunks = metadata.row_group(row_group_idx).columns(); - - let (offset, lengths) = index_reader::get_index_offset_and_lengths(chunks)?; - let length = lengths.iter().sum::(); - - if length == 0 { - return Ok(vec![Index::NONE; chunks.len()]); - } - - //read all need data into buffer - let data = self - .get_bytes(offset as usize..offset as usize + length) - .await?; - - let mut start = 0; - let data = lengths.into_iter().map(|length| { - let r = data.slice(start..start + length); - start += length; - r - }); - - chunks - .iter() - .zip(data) - .map(|(chunk, data)| { - let column_type = chunk.column_type(); - index_reader::deserialize_column_index(&data, column_type) - }) - .collect() - } - .boxed() - } - - /// Provides asynchronous access to the the offset index for each column chunk in a - /// row group. Will panic if `row_group_idx` is greater than or equal to `num_row_groups` - fn get_page_locations( - &mut self, - metadata: Arc, - row_group_idx: usize, - ) -> BoxFuture<'_, Result>>>> { - async move { - let chunks = metadata.row_group(row_group_idx).columns(); - - let (offset, total_length) = - index_reader::get_location_offset_and_total_length(chunks)?; - - if total_length == 0 { - return Ok(None); - } - - let data = self - .get_bytes(offset as usize..offset as usize + total_length) - .await?; - let mut d = Cursor::new(data); - let mut result = vec![]; - - for _ in 0..chunks.len() { - let mut prot = TCompactInputProtocol::new(&mut d); - let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; - result.push(offset.page_locations); - } - Ok(Some(result)) - } - .boxed() - } } impl AsyncFileReader for Box { @@ -296,34 +221,94 @@ impl ArrowReaderBuilder> { Self::new_builder(AsyncReader(input), metadata, Default::default()) } - pub async fn new_with_index(mut input: T) -> Result { - let metadata = input.get_metadata().await?; + pub async fn new_with_options( + mut input: T, + options: ArrowReaderOptions, + ) -> Result { + let mut metadata = input.get_metadata().await?; + + if options.page_index + && metadata + .page_indexes() + .zip(metadata.offset_indexes()) + .is_none() + { + let mut fetch_ranges = vec![]; + let mut index_lengths: Vec> = vec![]; + + for rg in metadata.row_groups() { + let (loc_offset, loc_length) = + index_reader::get_location_offset_and_total_length(rg.columns())?; - let mut row_groups = metadata.row_groups().to_vec(); + let (idx_offset, idx_lengths) = + index_reader::get_index_offset_and_lengths(rg.columns())?; + let idx_length = idx_lengths.iter().sum::(); - let mut columns_indexes = vec![]; - let mut offset_indexes = vec![]; + // If index data is missing, return without any indexes + if loc_length == 0 || idx_length == 0 { + return Self::new_builder(AsyncReader(input), metadata, options); + } - for (idx, rg) in row_groups.iter_mut().enumerate() { - let column_index = input.get_column_indexes(metadata.clone(), idx).await?; + fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length); + fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length); + 
index_lengths.push(idx_lengths); + } + + let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter(); + let mut index_lengths = index_lengths.into_iter(); + + let mut row_groups = metadata.row_groups().to_vec(); + + let mut columns_indexes = vec![]; + let mut offset_indexes = vec![]; + + for rg in row_groups.iter_mut() { + let columns = rg.columns(); + + let location_data = chunks.next().unwrap(); + let mut cursor = Cursor::new(location_data); + let mut offset_index = vec![]; + + for _ in 0..columns.len() { + let mut prot = TCompactInputProtocol::new(&mut cursor); + let offset = OffsetIndex::read_from_in_protocol(&mut prot)?; + offset_index.push(offset.page_locations); + } - columns_indexes.push(column_index); - if let Some(offset_index) = - input.get_page_locations(metadata.clone(), idx).await? - { rg.set_page_offset(offset_index.clone()); offset_indexes.push(offset_index); + + let index_data = chunks.next().unwrap(); + let index_lengths = index_lengths.next().unwrap(); + + let mut start = 0; + let data = index_lengths.into_iter().map(|length| { + let r = index_data.slice(start..start + length); + start += length; + r + }); + + let indexes = rg + .columns() + .iter() + .zip(data) + .map(|(column, data)| { + let column_type = column.column_type(); + index_reader::deserialize_column_index(&data, column_type) + }) + .collect::>>()?; + columns_indexes.push(indexes); } - } - let metadata = Arc::new(ParquetMetaData::new_with_page_index( - metadata.file_metadata().clone(), - row_groups, - Some(columns_indexes), - Some(offset_indexes), - )); + metadata = Arc::new(ParquetMetaData::new_with_page_index( + metadata.file_metadata().clone(), + row_groups, + Some(columns_indexes), + Some(offset_indexes), + )); + } - Self::new_builder(AsyncReader(input), metadata, ArrowReaderOptions::new()) + Self::new_builder(AsyncReader(input), metadata, options) } /// Build a new [`ParquetRecordBatchStream`] @@ -900,9 +885,11 @@ mod tests { requests: Default::default(), }; - let builder = ParquetRecordBatchStreamBuilder::new_with_index(async_reader) - .await - .unwrap(); + let options = ArrowReaderOptions::new().with_page_index(true); + let builder = + ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap(); // The builder should have page and offset indexes loaded now let metadata_with_index = builder.metadata(); @@ -1046,14 +1033,16 @@ mod tests { let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]); - let stream = ParquetRecordBatchStreamBuilder::new_with_index(async_reader) - .await - .unwrap() - .with_projection(mask.clone()) - .with_batch_size(1024) - .with_row_filter(filter) - .build() - .unwrap(); + let options = ArrowReaderOptions::new().with_page_index(true); + let stream = + ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options) + .await + .unwrap() + .with_projection(mask.clone()) + .with_batch_size(1024) + .with_row_filter(filter) + .build() + .unwrap(); let batches: Vec = stream.try_collect().await.unwrap(); From b82a26e8c2f95f70a733eb1c530f070b3a954d4c Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Fri, 19 Aug 2022 18:04:43 -0400 Subject: [PATCH 6/6] Fix bug in page skipping logic --- parquet/src/column/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 1432c72b53f..f96ccc3ea3e 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -312,7 +312,7 @@ where // If page has less rows than the remaining records to // 
be skipped, skip entire page - if metadata.num_rows < remaining { + if metadata.num_rows <= remaining { self.page_reader.skip_next_page()?; remaining -= metadata.num_rows; continue;
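
For reviewers, the end state of this series can be exercised roughly as follows. This is an illustrative sketch rather than part of the patch: it assumes the `async` feature is enabled, that the import paths match this crate version's re-exports, and that the input file carries a page index (the tests above use `alltypes_tiny_pages_plain.parquet` from parquet-testing); error handling is collapsed into a boxed error.

use futures::TryStreamExt;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use parquet::arrow::ProjectionMask;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // tokio::fs::File implements AsyncFileReader through the AsyncRead + AsyncSeek
    // impl that already exists in async_reader.rs.
    let file = tokio::fs::File::open("alltypes_tiny_pages_plain.parquet").await?;

    // Opting in to the page index makes new_with_options fetch the column and offset
    // indexes up front, so RowSelection::scan_ranges can prune reads to individual pages.
    let options = ArrowReaderOptions::new().with_page_index(true);
    let builder = ParquetRecordBatchStreamBuilder::new_with_options(file, options).await?;

    // Project two leaf columns, mirroring the async reader tests above.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
    let stream = builder
        .with_projection(mask)
        .with_batch_size(1024)
        .build()?;

    let batches: Vec<_> = stream.try_collect().await?;
    println!("read {} record batches", batches.len());
    Ok(())
}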