diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b251c2a827e..923f329eff2 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -552,7 +552,7 @@ impl PageReader for InMemoryColumnChunkReader {
         Ok(None)
     }
 
-    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
         Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
     }
 
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index d667af7120a..78890f36a47 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -205,7 +205,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
 
     /// Gets metadata about the next page, returns an error if no
     /// column index information
-    fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;
 
     /// Skips reading the next page, returns an error if no
     /// column index information
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index ad8fe16ad8b..bffe538cc72 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -228,7 +228,7 @@ pub struct RowGroupMetaData {
     num_rows: i64,
     total_byte_size: i64,
     schema_descr: SchemaDescPtr,
-    // Todo add filter result -> row range
+    page_offset_index: Option<Vec<Vec<PageLocation>>>,
 }
 
 impl RowGroupMetaData {
@@ -267,6 +267,11 @@ impl RowGroupMetaData {
         self.columns.iter().map(|c| c.total_compressed_size).sum()
     }
 
+    /// Returns a reference to the page offset index of every column in this row group.
+    pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {
+        &self.page_offset_index
+    }
+
     /// Returns reference to a schema descriptor.
     pub fn schema_descr(&self) -> &SchemaDescriptor {
         self.schema_descr.as_ref()
@@ -277,6 +282,11 @@ impl RowGroupMetaData {
         self.schema_descr.clone()
     }
 
+    /// Sets the page offset index for this row group.
+    pub fn set_page_offset(&mut self, page_offset: Vec<Vec<PageLocation>>) {
+        self.page_offset_index = Some(page_offset);
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(
         schema_descr: SchemaDescPtr,
@@ -295,6 +305,7 @@ impl RowGroupMetaData {
             num_rows,
             total_byte_size,
             schema_descr,
+            page_offset_index: None,
         })
     }
 
@@ -318,6 +329,7 @@ pub struct RowGroupMetaDataBuilder {
     schema_descr: SchemaDescPtr,
     num_rows: i64,
     total_byte_size: i64,
+    page_offset_index: Option<Vec<Vec<PageLocation>>>,
 }
 
 impl RowGroupMetaDataBuilder {
@@ -328,6 +340,7 @@ impl RowGroupMetaDataBuilder {
             schema_descr,
             num_rows: 0,
             total_byte_size: 0,
+            page_offset_index: None,
         }
     }
 
@@ -349,6 +362,12 @@ impl RowGroupMetaDataBuilder {
         self
     }
 
+    /// Sets the page offset index for this row group.
+    pub fn set_page_offset(mut self, page_offset: Vec<Vec<PageLocation>>) -> Self {
+        self.page_offset_index = Some(page_offset);
+        self
+    }
+
     /// Builds row group metadata.
     pub fn build(self) -> Result<RowGroupMetaData> {
         if self.schema_descr.num_columns() != self.columns.len() {
@@ -364,6 +383,7 @@ impl RowGroupMetaDataBuilder {
             num_rows: self.num_rows,
             total_byte_size: self.total_byte_size,
             schema_descr: self.schema_descr,
+            page_offset_index: self.page_offset_index,
         })
    }
 }
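Note (not part of the patch): `page_offset_index()` / `set_page_offset()` are the hand-off point between the index reader and the page reader changed below. A minimal sketch of how a caller might inspect the attached index; `describe_pages` is a hypothetical helper, and the index is only present when the file was opened with `ReadOptionsBuilder::new().with_page_index()`:

```rust
use parquet::file::metadata::RowGroupMetaData;

// Hypothetical helper (not in this PR): report how many data pages the
// offset index records for each column of a row group.
fn describe_pages(rg: &RowGroupMetaData) {
    match rg.page_offset_index() {
        Some(per_column) => {
            for (i, locations) in per_column.iter().enumerate() {
                // One `PageLocation` (offset, compressed size, first row index)
                // per data page in the column chunk.
                println!("column {}: {} data pages", i, locations.len());
            }
        }
        None => println!("page offset index not loaded for this row group"),
    }
}
```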
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index c0f7c3926a5..766813f11ae 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -19,9 +19,10 @@
 //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)
 
 use bytes::{Buf, Bytes};
+use std::collections::VecDeque;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
-use parquet_format::{PageHeader, PageType};
+use parquet_format::{PageHeader, PageLocation, PageType};
 use thrift::protocol::TCompactInputProtocol;
 
 use crate::basic::{Compression, Encoding, Type};
@@ -33,6 +34,7 @@ use crate::file::{footer, metadata::*, reader::*, statistics};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
+use crate::util::page_util::{calculate_row_count, get_pages_readable_slices};
 use crate::util::{io::TryClone, memory::ByteBufferPtr};
 
 // export `SliceableCursor` and `FileSource` publically so clients can
@@ -251,11 +253,12 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
             let mut columns_indexes = vec![];
             let mut offset_indexes = vec![];
 
-            for rg in &filtered_row_groups {
+            for rg in &mut filtered_row_groups {
                 let column_index =
                     index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
                 let offset_index =
                     index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+                rg.set_page_offset(offset_index.clone());
                 columns_indexes.push(column_index);
                 offset_indexes.push(offset_index);
             }
@@ -346,14 +349,31 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
     fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
         let col = self.metadata.column(i);
         let (col_start, col_length) = col.byte_range();
-        //Todo filter with multi row range
-        let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?;
-        let page_reader = SerializedPageReader::new(
-            file_chunk,
-            col.num_values(),
-            col.compression(),
-            col.column_descr().physical_type(),
-        )?;
+        let page_reader = if let Some(offset_index) = self.metadata.page_offset_index() {
+            let col_chunk_offset_index = &offset_index[i];
+            let (page_bufs, has_dict) = get_pages_readable_slices(
+                col_chunk_offset_index,
+                col_start,
+                self.chunk_reader.clone(),
+            )?;
+            SerializedPageReader::new_with_page_offsets(
+                col.num_values(),
+                col.compression(),
+                col.column_descr().physical_type(),
+                col_chunk_offset_index.clone(),
+                has_dict,
+                page_bufs,
+            )?
+        } else {
+            let file_chunk =
+                self.chunk_reader.get_read(col_start, col_length as usize)?;
+            SerializedPageReader::new(
+                file_chunk,
+                col.num_values(),
+                col.compression(),
+                col.column_descr().physical_type(),
+            )?
+        };
         Ok(Box::new(page_reader))
     }
 
@@ -464,11 +484,23 @@ pub(crate) fn decode_page(
     Ok(result)
 }
 
+enum SerializedPages<T: Read> {
+    /// Read the entire column chunk through one buffer.
+    Chunk { buf: T },
+    /// Read the pages one by one, so that individual pages can be skipped.
+    Pages {
+        offset_index: Vec<PageLocation>,
+        seen_num_data_pages: usize,
+        has_dictionary_page_to_read: bool,
+        page_bufs: VecDeque<T>,
+    },
+}
+
 /// A serialized implementation for Parquet [`PageReader`].
 pub struct SerializedPageReader<T: Read> {
     // The file source buffer which references exactly the bytes for the column trunk
     // to be read by this page reader.
-    buf: T,
+    buf: SerializedPages<T>,
 
     // The compression codec for this column chunk. Only set for non-PLAIN codec.
     decompressor: Option<Box<dyn Codec>>,
@@ -493,7 +525,32 @@ impl<T: Read> SerializedPageReader<T> {
     ) -> Result<Self> {
         let decompressor = create_codec(compression)?;
         let result = Self {
-            buf,
+            buf: SerializedPages::Chunk { buf },
             total_num_values,
             seen_num_values: 0,
             decompressor,
             physical_type,
         };
         Ok(result)
     }
 
+    /// Creates a new serialized page reader that reads each page from a
+    /// pre-sliced buffer, driven by the column chunk's page offset index.
+    pub fn new_with_page_offsets(
+        total_num_values: i64,
+        compression: Compression,
+        physical_type: Type,
+        offset_index: Vec<PageLocation>,
+        has_dictionary_page_to_read: bool,
+        page_bufs: VecDeque<T>,
+    ) -> Result<Self> {
+        let decompressor = create_codec(compression)?;
+        let result = Self {
+            buf: SerializedPages::Pages {
+                offset_index,
+                seen_num_data_pages: 0,
+                has_dictionary_page_to_read,
+                page_bufs,
+            },
+            total_num_values,
+            seen_num_values: 0,
+            decompressor,
+            physical_type,
+        };
+        Ok(result)
+    }
@@ -513,14 +570,35 @@ impl<T: Read + Send> Iterator for SerializedPageReader<T> {
 
 impl<T: Read + Send> PageReader for SerializedPageReader<T> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
+        let mut cursor;
+        let mut dictionary_cursor;
         while self.seen_num_values < self.total_num_values {
-            let page_header = read_page_header(&mut self.buf)?;
+            match &mut self.buf {
+                SerializedPages::Chunk { buf } => {
+                    cursor = buf;
+                }
+                SerializedPages::Pages {
+                    offset_index,
+                    seen_num_data_pages,
+                    has_dictionary_page_to_read,
+                    page_bufs,
+                } => {
+                    if offset_index.len() <= *seen_num_data_pages {
+                        return Ok(None);
+                    } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
+                        dictionary_cursor = page_bufs.pop_front().unwrap();
+                        cursor = &mut dictionary_cursor;
+                    } else {
+                        cursor = page_bufs.get_mut(*seen_num_data_pages).unwrap();
+                    }
+                }
+            }
+
+            let page_header = read_page_header(cursor)?;
 
             let to_read = page_header.compressed_page_size as usize;
             let mut buffer = Vec::with_capacity(to_read);
-            let read = (&mut self.buf)
-                .take(to_read as u64)
-                .read_to_end(&mut buffer)?;
+            let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;
 
             if read != to_read {
                 return Err(eof_err!(
@@ -540,14 +618,30 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
                         self.decompressor.as_mut(),
                     )?;
                     self.seen_num_values += decoded.num_values() as i64;
+                    if let SerializedPages::Pages {
+                        seen_num_data_pages,
+                        ..
+                    } = &mut self.buf
+                    {
+                        *seen_num_data_pages += 1;
+                    }
                     decoded
                 }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer,
-                    self.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
+                PageType::DictionaryPage => {
+                    if let SerializedPages::Pages {
+                        has_dictionary_page_to_read,
+                        ..
+                    } = &mut self.buf
+                    {
+                        *has_dictionary_page_to_read = false;
+                    }
+                    decode_page(
+                        page_header,
+                        buffer,
+                        self.physical_type,
+                        self.decompressor.as_mut(),
+                    )?
+                }
                 _ => {
                     // For unknown page type (e.g., INDEX_PAGE), skip and read next.
                     continue;
                 }
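Note (not part of the patch): the cursor selection above is the subtle part of `get_next_page`. When present, the dictionary page buffer sits at the front of `page_bufs` and is popped exactly once, while data pages are addressed by `seen_num_data_pages`, which only advances after a data page has been decoded. A toy model of that bookkeeping using plain in-memory cursors (all names and bytes are made up):

```rust
use std::collections::VecDeque;
use std::io::{Cursor, Read};

fn main() {
    // Slot 0 holds the dictionary page; data pages follow in order.
    let mut page_bufs: VecDeque<Cursor<Vec<u8>>> = VecDeque::from(vec![
        Cursor::new(b"dict".to_vec()),
        Cursor::new(b"page0".to_vec()),
        Cursor::new(b"page1".to_vec()),
    ]);
    let mut has_dictionary_page_to_read = true;
    let mut seen_num_data_pages = 0usize;

    for _ in 0..3 {
        let mut contents = String::new();
        if seen_num_data_pages == 0 && has_dictionary_page_to_read {
            // The dictionary buffer is consumed exactly once, then dropped,
            // so data page N stays at index N afterwards.
            let mut dict = page_bufs.pop_front().unwrap();
            dict.read_to_string(&mut contents).unwrap();
            has_dictionary_page_to_read = false;
        } else {
            let cursor = page_bufs.get_mut(seen_num_data_pages).unwrap();
            cursor.read_to_string(&mut contents).unwrap();
            seen_num_data_pages += 1; // advances only after a data page
        }
        println!("read: {contents}"); // dict, page0, page1
    }
}
```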
@@ -560,12 +654,49 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
         Ok(None)
     }
 
-    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
+        match &mut self.buf {
+            SerializedPages::Chunk { .. } => Err(general_err!(
+                "Must set page_offset_index when using peek_next_page in SerializedPageReader."
+            )),
+            SerializedPages::Pages {
+                offset_index,
+                seen_num_data_pages,
+                has_dictionary_page_to_read,
+                ..
+            } => {
+                if *seen_num_data_pages >= offset_index.len() {
+                    Ok(None)
+                } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
+                    // `get_next_page` will set `has_dictionary_page_to_read` to false;
+                    // the dictionary page must be read and cannot be skipped.
+                    Ok(Some(PageMetadata {
+                        num_rows: usize::MIN,
+                        is_dict: true,
+                    }))
+                } else {
+                    let row_count = calculate_row_count(
+                        offset_index,
+                        *seen_num_data_pages,
+                        self.total_num_values,
+                    )?;
+                    Ok(Some(PageMetadata {
+                        num_rows: row_count,
+                        is_dict: false,
+                    }))
+                }
+            }
+        }
     }
 
     fn skip_next_page(&mut self) -> Result<()> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        match &mut self.buf {
+            SerializedPages::Chunk { .. } => Err(general_err!(
+                "Must set page_offset_index when using skip_next_page in SerializedPageReader."
+            )),
+            SerializedPages::Pages {
+                offset_index,
+                seen_num_data_pages,
+                ..
+            } => {
+                if offset_index.len() <= *seen_num_data_pages {
+                    Err(general_err!(
+                        "seen_num_data_pages is out of bounds in SerializedPageReader."
+                    ))
+                } else {
+                    *seen_num_data_pages += 1;
+                    // Note: this should arguably also advance `self.seen_num_values`,
+                    // but the number of values in the skipped page is not known here yet.
+                    Ok(())
+                }
+            }
+        }
     }
 }
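Note (not part of the patch): together, `peek_next_page` and `skip_next_page` enable a filter-then-read loop over a column chunk. A sketch of the intended consumer pattern; `read_selected_pages` and `want_rows` are illustrative names, not APIs introduced by this change:

```rust
use parquet::column::page::PageReader;
use parquet::errors::Result;

// Decode only the pages whose row counts the caller is interested in,
// skipping the rest without decompressing them.
fn read_selected_pages(
    reader: &mut dyn PageReader,
    mut want_rows: impl FnMut(usize) -> bool,
) -> Result<usize> {
    let mut pages_read = 0;
    while let Some(meta) = reader.peek_next_page()? {
        // Dictionary pages cannot be skipped; they must be decoded first.
        if meta.is_dict || want_rows(meta.num_rows) {
            reader.get_next_page()?;
            pages_read += 1;
        } else {
            reader.skip_next_page()?;
        }
    }
    Ok(pages_read)
}
```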
@@ -1323,4 +1454,76 @@ mod tests {
         let statistics = r.column(col_num).statistics().unwrap();
         (statistics.min_bytes(), statistics.max_bytes())
     }
+
+    #[test]
+    fn test_skip_page_with_offset_index() {
+        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
+        let builder = ReadOptionsBuilder::new();
+        // enable reading the page index
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, options);
+        let reader = reader_result.unwrap();
+
+        let row_group_reader = reader.get_row_group(0).unwrap();
+
+        // use 'int_col'; boundary order: ASCENDING; 325 pages in total.
+        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
+
+        let mut vec = vec![];
+
+        for i in 0..325 {
+            if i % 2 == 0 {
+                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
+            } else {
+                column_page_reader.skip_next_page().unwrap();
+            }
+        }
+        // check that all pages have been read.
+        assert!(column_page_reader.peek_next_page().unwrap().is_none());
+        assert!(column_page_reader.get_next_page().unwrap().is_none());
+
+        assert_eq!(vec.len(), 163);
+    }
+
+    #[test]
+    fn test_peek_page_with_dictionary_page() {
+        let test_file = get_test_file("alltypes_tiny_pages.parquet");
+        let builder = ReadOptionsBuilder::new();
+        // enable reading the page index
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, options);
+        let reader = reader_result.unwrap();
+        let row_group_reader = reader.get_row_group(0).unwrap();
+
+        // use 'string_col'; boundary order: UNORDERED; 352 data pages and 1 dictionary page in total.
+        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
+
+        let mut vec = vec![];
+
+        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
+        assert!(meta.is_dict);
+        let page = column_page_reader.get_next_page().unwrap().unwrap();
+        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
+
+        for i in 0..352 {
+            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
+            // verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
+            // num_rows is either 21 or 20, except for the last page, which has 11 rows.
+            if i != 351 {
+                assert!((meta.num_rows == 21) || (meta.num_rows == 20));
+            } else {
+                assert_eq!(meta.num_rows, 11);
+            }
+            assert!(!meta.is_dict);
+            vec.push(meta);
+            let page = column_page_reader.get_next_page().unwrap().unwrap();
+            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
+        }
+
+        // check that all pages have been read.
+        assert!(column_page_reader.peek_next_page().unwrap().is_none());
+        assert!(column_page_reader.get_next_page().unwrap().is_none());
+
+        assert_eq!(vec.len(), 352);
+    }
 }
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index 3a69df4360b..b49e3251692 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -24,6 +24,8 @@ pub mod cursor;
 pub mod hash_util;
 #[cfg(any(test, feature = "test_common"))]
 pub(crate) mod test_common;
+pub(crate) mod page_util;
+
 #[cfg(any(test, feature = "test_common"))]
 pub use self::test_common::page_util::{
     DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
diff --git a/parquet/src/util/page_util.rs b/parquet/src/util/page_util.rs
new file mode 100644
index 00000000000..5cdcf7535c6
--- /dev/null
+++ b/parquet/src/util/page_util.rs
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::VecDeque;
+use std::io::Read;
+use std::sync::Arc;
+
+use parquet_format::PageLocation;
+
+use crate::errors::Result;
+use crate::file::reader::ChunkReader;
+
+/// Use the column chunk's offset index to compute the row count of page `page_num`.
+pub(crate) fn calculate_row_count(
+    indexes: &[PageLocation],
+    page_num: usize,
+    total_row_count: i64,
+) -> Result<usize> {
+    if page_num == indexes.len() - 1 {
+        Ok((total_row_count - indexes[page_num].first_row_index + 1) as usize)
+    } else {
+        Ok((indexes[page_num + 1].first_row_index - indexes[page_num].first_row_index)
+            as usize)
+    }
+}
+
+/// Use the column chunk's offset index to get a serially readable slice for each page,
+/// plus a flag indicating whether the column chunk contains a dictionary page.
+pub(crate) fn get_pages_readable_slices<T: Read + Send, R: ChunkReader<T = T>>(
+    col_chunk_offset_index: &[PageLocation],
+    col_start: u64,
+    chunk_reader: Arc<R>,
+) -> Result<(VecDeque<T>, bool)> {
+    let first_data_page_offset = col_chunk_offset_index[0].offset as u64;
+    let has_dictionary_page = first_data_page_offset != col_start;
+    let mut page_readers = VecDeque::with_capacity(col_chunk_offset_index.len() + 1);
+
+    if has_dictionary_page {
+        let length = (first_data_page_offset - col_start) as usize;
+        let reader: T = chunk_reader.get_read(col_start, length)?;
+        page_readers.push_back(reader);
+    }
+
+    for index in col_chunk_offset_index {
+        let start = index.offset as u64;
+        let length = index.compressed_page_size as usize;
+        let reader: T = chunk_reader.get_read(start, length)?;
+        page_readers.push_back(reader)
+    }
+    Ok((page_readers, has_dictionary_page))
+}
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 0b70c38ad0e..f56eaf85e63 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -173,7 +173,7 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         Ok(self.page_iter.next())
     }

-    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
         unimplemented!()
     }
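Note (not part of the patch): the arithmetic in `calculate_row_count` deserves a worked example. An interior page's row count is the gap between consecutive `first_row_index` values; the last page runs to the end of the chunk, and its branch is reproduced verbatim below (including the `+ 1`). The `PageLocation` values are invented for illustration:

```rust
use parquet_format::PageLocation;

// Mirrors the two branches of `calculate_row_count` from the diff.
fn row_count(indexes: &[PageLocation], page_num: usize, total_row_count: i64) -> i64 {
    if page_num == indexes.len() - 1 {
        // Last-page branch, exactly as written in the new page_util.rs.
        total_row_count - indexes[page_num].first_row_index + 1
    } else {
        indexes[page_num + 1].first_row_index - indexes[page_num].first_row_index
    }
}

fn main() {
    let index = vec![
        PageLocation { offset: 4, compressed_page_size: 100, first_row_index: 0 },
        PageLocation { offset: 104, compressed_page_size: 100, first_row_index: 21 },
        PageLocation { offset: 204, compressed_page_size: 64, first_row_index: 42 },
    ];
    // Interior pages: the next page's first row index bounds this page.
    assert_eq!(row_count(&index, 0, 53), 21);
    assert_eq!(row_count(&index, 1, 53), 21);
    println!("last page rows (per the diff's formula): {}", row_count(&index, 2, 53));
}
```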