diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index 1247e4399e6..5c4b38d6a0c 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -207,7 +207,7 @@ pub trait ArrowReader {
 #[derive(Debug, Clone, Default)]
 pub struct ArrowReaderOptions {
     skip_arrow_metadata: bool,
-    page_index: bool,
+    pub(crate) page_index: bool,
 }
 
 impl ArrowReaderOptions {
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index f3d11d92595..b6ee273ab56 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -162,7 +162,12 @@ impl RowSelection {
                     current_selector = selectors.next();
                 }
             } else {
-                break;
+                if !(selector.skip || current_page_included) {
+                    let start = page.offset as usize;
+                    let end = start + page.compressed_page_size as usize;
+                    ranges.push(start..end);
+                }
+                current_selector = selectors.next()
             }
         }
 
@@ -564,5 +569,31 @@ mod tests {
 
         // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
         assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select to final page boundary
+            RowSelector::select(12),
+            RowSelector::skip(1),
+            // Skip across final page boundary
+            RowSelector::skip(8),
+            // Select from final page
+            RowSelector::select(4),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
     }
 }
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 7bcc5503823..b0d9143d64d 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -88,6 +88,8 @@
 use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
+use parquet_format::OffsetIndex;
+use thrift::protocol::TCompactInputProtocol;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -96,8 +98,8 @@
 use arrow::record_batch::RecordBatch;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
-    RowFilter, RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
@@ -108,6 +110,7 @@
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use crate::file::page_index::index_reader;
 use crate::file::FOOTER_SIZE;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -218,6 +221,96 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
         Self::new_builder(AsyncReader(input), metadata, Default::default())
     }
 
+    pub async fn new_with_options(
+        mut input: T,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let mut metadata = input.get_metadata().await?;
+
+        if options.page_index
+            && metadata
+                .page_indexes()
+                .zip(metadata.offset_indexes())
+                .is_none()
+        {
+            let mut fetch_ranges = vec![];
+            let mut index_lengths: Vec<Vec<usize>> = vec![];
+
+            for rg in metadata.row_groups() {
+                let (loc_offset, loc_length) =
+                    index_reader::get_location_offset_and_total_length(rg.columns())?;
+
+                let (idx_offset, idx_lengths) =
+                    index_reader::get_index_offset_and_lengths(rg.columns())?;
+                let idx_length = idx_lengths.iter().sum::<usize>();
+
+                // If index data is missing, return without any indexes
+                if loc_length == 0 || idx_length == 0 {
+                    return Self::new_builder(AsyncReader(input), metadata, options);
+                }
+
+                fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
+                fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
+                index_lengths.push(idx_lengths);
+            }
+
+            let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut index_lengths = index_lengths.into_iter();
+
+            let mut row_groups = metadata.row_groups().to_vec();
+
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+
+            for rg in row_groups.iter_mut() {
+                let columns = rg.columns();
+
+                let location_data = chunks.next().unwrap();
+                let mut cursor = Cursor::new(location_data);
+                let mut offset_index = vec![];
+
+                for _ in 0..columns.len() {
+                    let mut prot = TCompactInputProtocol::new(&mut cursor);
+                    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+                    offset_index.push(offset.page_locations);
+                }
+
+                rg.set_page_offset(offset_index.clone());
+                offset_indexes.push(offset_index);
+
+                let index_data = chunks.next().unwrap();
+                let index_lengths = index_lengths.next().unwrap();
+
+                let mut start = 0;
+                let data = index_lengths.into_iter().map(|length| {
+                    let r = index_data.slice(start..start + length);
+                    start += length;
+                    r
+                });
+
+                let indexes = rg
+                    .columns()
+                    .iter()
+                    .zip(data)
+                    .map(|(column, data)| {
+                        let column_type = column.column_type();
+                        index_reader::deserialize_column_index(&data, column_type)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                columns_indexes.push(indexes);
+            }
+
+            metadata = Arc::new(ParquetMetaData::new_with_page_index(
+                metadata.file_metadata().clone(),
+                row_groups,
+                Some(columns_indexes),
+                Some(offset_indexes),
+            ));
+        }
+
+        Self::new_builder(AsyncReader(input), metadata, options)
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
@@ -493,13 +586,26 @@ impl<'a> InMemoryRowGroup<'a> {
             let fetch_ranges = self
                 .column_chunks
                 .iter()
+                .zip(self.metadata.columns())
                 .enumerate()
                 .into_iter()
-                .filter_map(|(idx, chunk)| {
+                .filter_map(|(idx, (chunk, chunk_meta))| {
                     (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                        let ranges = selection.scan_ranges(&page_locations[idx]);
+                        // If the first page does not start at the beginning of the column,
+                        // then we need to also fetch a dictionary page.
+                        let mut ranges = vec![];
+                        let (start, _len) = chunk_meta.byte_range();
+                        match page_locations[idx].first() {
+                            Some(first) if first.offset as u64 != start => {
+                                ranges.push(start as usize..first.offset as usize);
+                            }
+                            _ => (),
+                        }
+
+                        ranges.extend(selection.scan_ranges(&page_locations[idx]));
                         page_start_offsets
                             .push(ranges.iter().map(|range| range.start).collect());
+
                         ranges
                     })
                 })
@@ -687,7 +793,6 @@ mod tests {
     use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
-    use futures::TryStreamExt;
     use std::sync::Mutex;
@@ -763,6 +868,70 @@
         );
     }
 
+    #[tokio::test]
+    async fn test_async_reader_with_index() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+                .await
+                .unwrap();
+
+        // The builder should have page and offset indexes loaded now
+        let metadata_with_index = builder.metadata();
+
+        // Check offset indexes are present for all columns
+        for rg in metadata_with_index.row_groups() {
+            let page_locations = rg
+                .page_offset_index()
+                .as_ref()
+                .expect("expected page offset index");
+            assert_eq!(page_locations.len(), rg.columns().len())
+        }
+
+        // Check page indexes are present for all columns
+        let page_indexes = metadata_with_index
+            .page_indexes()
+            .expect("expected page indexes");
+        for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
+            assert_eq!(page_indexes[idx].len(), rg.columns().len())
+        }
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+        let stream = builder
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+
+        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
+            .unwrap()
+            .with_projection(mask)
+            .with_batch_size(1024)
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(async_batches, sync_batches);
+    }
+
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -832,6 +1001,56 @@
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
 
+    #[tokio::test]
+    async fn test_row_filter_with_index() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![1]),
+            |batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
+        );
+
+        let b_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![2]),
+            |batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let stream =
+            ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+                .await
+                .unwrap()
+                .with_projection(mask.clone())
+                .with_batch_size(1024)
+                .with_row_filter(filter)
+                .build()
+                .unwrap();
+
+        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+        assert_eq!(total_rows, 730);
+    }
+
     #[tokio::test]
     async fn test_in_memory_row_group_sparse() {
         let testdata = arrow::util::test_util::parquet_test_data();
@@ -882,7 +1101,7 @@
         let mut skip = true;
         let mut pages = offset_index[0].iter().peekable();
 
-        // Setup `RowSelection` so that we can skip every other page
+        // Setup `RowSelection` so that we can skip every other page, selecting the last page
        let mut selectors = vec![];
        let mut expected_page_requests: Vec<Range<usize>> = vec![];
        while let Some(page) = pages.next() {
@@ -906,7 +1125,7 @@
         let selection = RowSelection::from(selectors);
 
         let (_factory, _reader) = reader_factory
-            .read_row_group(0, Some(selection), projection, 48)
+            .read_row_group(0, Some(selection), projection.clone(), 48)
             .await
             .expect("reading row group");
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1432c72b53f..f96ccc3ea3e 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -312,7 +312,7 @@ where
                 // If page has less rows than the remaining records to
                 // be skipped, skip entire page
-                if metadata.num_rows < remaining {
+                if metadata.num_rows <= remaining {
                     self.page_reader.skip_next_page()?;
                     remaining -= metadata.num_rows;
                     continue;
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index e3f37fbc661..e6a4e598102 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -34,8 +34,12 @@ pub fn read_columns_indexes(
     let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
     let length = lengths.iter().sum::<usize>();
 
+    if length == 0 {
+        return Ok(vec![Index::NONE; chunks.len()]);
+    }
+
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, length)?;
     let mut data = vec![0; length];
     reader.read_exact(&mut data)?;
@@ -64,6 +68,10 @@
 ) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
+    if total_length == 0 {
+        return Ok(vec![]);
+    }
+
     //read all need data into buffer
     let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;
@@ -82,7 +90,7 @@
 //Get File offsets of every ColumnChunk's page_index
 //If there are invalid offset return a zero offset with empty lengths.
-fn get_index_offset_and_lengths(
+pub(crate) fn get_index_offset_and_lengths(
     chunks: &[ColumnChunkMetaData],
 ) -> Result<(u64, Vec<usize>), ParquetError> {
     let first_col_metadata = if let Some(chunk) = chunks.first() {
@@ -111,7 +119,7 @@
 //Get File offset of ColumnChunk's pages_locations
 //If there are invalid offset return a zero offset with zero length.
-fn get_location_offset_and_total_length(
+pub(crate) fn get_location_offset_and_total_length(
     chunks: &[ColumnChunkMetaData],
 ) -> Result<(u64, usize), ParquetError> {
     let metadata = if let Some(chunk) = chunks.first() {
@@ -133,7 +141,7 @@
     Ok((offset, total_length))
 }
 
-fn deserialize_column_index(
+pub(crate) fn deserialize_column_index(
     data: &[u8],
     column_type: Type,
 ) -> Result<Index, ParquetError> {