diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index b251c2a827e..923f329eff2 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -552,7 +552,7 @@ impl PageReader for InMemoryColumnChunkReader {
         Ok(None)
     }

-    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
         Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
     }

diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index d667af7120a..78890f36a47 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -205,7 +205,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {
     /// Gets metadata about the next page, returns an error if no
     /// column index information
-    fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;

     /// Skips reading the next page, returns an error if no
     /// column index information
diff --git a/parquet/src/column/writer.rs b/parquet/src/column/writer.rs
index 5def721353a..0c56f75f374 100644
--- a/parquet/src/column/writer.rs
+++ b/parquet/src/column/writer.rs
@@ -1792,6 +1792,7 @@ mod tests {
             7,
             Compression::UNCOMPRESSED,
             Type::INT32,
+            i64::MIN,
         )
         .unwrap();

@@ -1975,6 +1976,7 @@ mod tests {
                 data.len() as i64,
                 Compression::UNCOMPRESSED,
                 Int32Type::get_physical_type(),
+                i64::MIN,
             )
             .unwrap(),
         );
@@ -2358,6 +2360,7 @@ mod tests {
                 column_metadata.num_values(),
                 column_metadata.compression(),
                 T::get_physical_type(),
+                i64::MIN,
             )
             .unwrap(),
         );
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index ad8fe16ad8b..05e46b289f8 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -228,7 +228,7 @@ pub struct RowGroupMetaData {
     num_rows: i64,
     total_byte_size: i64,
     schema_descr: SchemaDescPtr,
-    // Todo add filter result -> row range
+    page_offset_index: Option<Vec<Vec<PageLocation>>>,
 }

 impl RowGroupMetaData {
@@ -267,6 +267,11 @@ impl RowGroupMetaData {
         self.columns.iter().map(|c| c.total_compressed_size).sum()
     }

+    /// Returns a reference to the page offset index.
+    pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {
+        &self.page_offset_index
+    }
+
     /// Returns reference to a schema descriptor.
     pub fn schema_descr(&self) -> &SchemaDescriptor {
         self.schema_descr.as_ref()
@@ -277,6 +282,11 @@ impl RowGroupMetaData {
         self.schema_descr.clone()
     }

+    /// Sets the page offset index for this row group.
+    pub fn set_page_offset(&mut self, page_offset: Vec<Vec<PageLocation>>) {
+        self.page_offset_index = Some(page_offset);
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(
         schema_descr: SchemaDescPtr,
@@ -295,6 +305,7 @@ impl RowGroupMetaData {
             num_rows,
             total_byte_size,
             schema_descr,
+            page_offset_index: None,
         })
     }

@@ -318,6 +329,7 @@ pub struct RowGroupMetaDataBuilder {
     schema_descr: SchemaDescPtr,
     num_rows: i64,
     total_byte_size: i64,
+    page_offset_index: Option<Vec<Vec<PageLocation>>>,
 }

 impl RowGroupMetaDataBuilder {
@@ -328,6 +340,7 @@ impl RowGroupMetaDataBuilder {
             schema_descr,
             num_rows: 0,
             total_byte_size: 0,
+            page_offset_index: None,
         }
     }

@@ -349,6 +362,12 @@ impl RowGroupMetaDataBuilder {
         self
     }

+    /// Sets the page offset index for this row group.
+    pub fn set_page_offset(mut self, page_offset: Vec<Vec<PageLocation>>) -> Self {
+        self.page_offset_index = Some(page_offset);
+        self
+    }
+
     /// Builds row group metadata.
     pub fn build(self) -> Result<RowGroupMetaData> {
         if self.schema_descr.num_columns() != self.columns.len() {
@@ -364,6 +383,7 @@ impl RowGroupMetaDataBuilder {
             num_rows: self.num_rows,
             total_byte_size: self.total_byte_size,
             schema_descr: self.schema_descr,
+            page_offset_index: self.page_offset_index,
         })
     }
 }
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index c0f7c3926a5..c4a41f40c79 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -19,9 +19,9 @@
 //! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

 use bytes::{Buf, Bytes};
-use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
+use std::{convert::TryFrom, fs::File, io, io::Read, path::Path, sync::Arc};

-use parquet_format::{PageHeader, PageType};
+use parquet_format::{PageHeader, PageLocation, PageType};
 use thrift::protocol::TCompactInputProtocol;

 use crate::basic::{Compression, Encoding, Type};
@@ -37,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr};

 // export `SliceableCursor` and `FileSource` publically so clients can
 // re-use the logic in their own ParquetFileWriter wrappers
+use crate::util::page_util::calculate_row_count;
 #[allow(deprecated)]
 pub use crate::util::{cursor::SliceableCursor, io::FileSource};

@@ -251,11 +252,12 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         let mut columns_indexes = vec![];
         let mut offset_indexes = vec![];

-        for rg in &filtered_row_groups {
+        for rg in &mut filtered_row_groups {
             let column_index =
                 index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
             let offset_index =
                 index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+            rg.set_page_offset(offset_index.clone());
             columns_indexes.push(column_index);
             offset_indexes.push(offset_index);
         }
@@ -346,14 +348,22 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
     fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
         let col = self.metadata.column(i);
         let (col_start, col_length) = col.byte_range();
-        //Todo filter with multi row range
         let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?;
-        let page_reader = SerializedPageReader::new(
+        let mut page_reader = SerializedPageReader::new(
             file_chunk,
             col.num_values(),
             col.compression(),
             col.column_descr().physical_type(),
+            self.metadata.num_rows(),
         )?;
+        if let Some(offset_index) = self.metadata.page_offset_index() {
+            let first_data_page_offset = offset_index[i][0].offset;
+            let has_dictionary_page = first_data_page_offset != col.file_offset();
+            page_reader.with_page_offset_index_and_has_dictionary_to_read(
+                offset_index[i].clone(),
+                has_dictionary_page,
+            );
+        }
         Ok(Box::new(page_reader))
     }

@@ -481,6 +491,19 @@ pub struct SerializedPageReader<T: Read> {

     // Column chunk type.
     physical_type: Type,
+
+    // Total row count.
+    num_rows: i64,
+
+    // Page offset index.
+    page_offset_index: Option<Vec<PageLocation>>,
+
+    // The number of data pages we have seen so far,
+    // including skipped pages.
+    seen_num_data_pages: usize,
+
+    // A flag indicating whether a dictionary page should be read first.
+    has_dictionary_page_to_read: bool,
 }

 impl<T: Read> SerializedPageReader<T> {
@@ -490,6 +513,7 @@ impl<T: Read> SerializedPageReader<T> {
         total_num_values: i64,
         compression: Compression,
         physical_type: Type,
+        num_rows: i64,
     ) -> Result<Self> {
         let decompressor = create_codec(compression)?;
         let result = Self {
@@ -498,9 +522,22 @@ impl<T: Read> SerializedPageReader<T> {
             seen_num_values: 0,
             decompressor,
             physical_type,
+            num_rows,
+            page_offset_index: None,
+            seen_num_data_pages: 0,
+            has_dictionary_page_to_read: false,
         };
         Ok(result)
     }
+
+    pub fn with_page_offset_index_and_has_dictionary_to_read(
+        &mut self,
+        offset_index: Vec<PageLocation>,
+        has_dictionary_to_read: bool,
+    ) {
+        self.page_offset_index = Some(offset_index);
+        self.has_dictionary_page_to_read = has_dictionary_to_read;
+    }
 }

 impl<T: Read> Iterator for SerializedPageReader<T> {
@@ -514,6 +551,14 @@ impl<T: Read> PageReader for SerializedPageReader<T> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         while self.seen_num_values < self.total_num_values {
+            // For now we cannot update `seen_num_values` in `skip_next_page`,
+            // so we need this extra check here.
+            if let Some(indexes) = &self.page_offset_index {
+                if indexes.len() == self.seen_num_data_pages {
+                    return Ok(None);
+                }
+            }
+
             let page_header = read_page_header(&mut self.buf)?;

             let to_read = page_header.compressed_page_size as usize;
@@ -540,6 +585,7 @@ impl<T: Read> PageReader for SerializedPageReader<T> {
                     self.decompressor.as_mut(),
                 )?;
                 self.seen_num_values += decoded.num_values() as i64;
+                self.seen_num_data_pages += 1;
                 decoded
             }
             PageType::DictionaryPage => decode_page(
@@ -560,12 +606,53 @@ impl<T: Read> PageReader for SerializedPageReader<T> {
         Ok(None)
     }

-    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
+        if let Some(page_offset_index) = &self.page_offset_index {
+            if self.seen_num_data_pages == page_offset_index.len() {
+                Ok(None)
+            } else if self.seen_num_data_pages == 0 && self.has_dictionary_page_to_read {
+                self.has_dictionary_page_to_read = false;
+                Ok(Some(PageMetadata {
+                    num_rows: usize::MIN,
+                    is_dict: true,
+                }))
+            } else {
+                let row_count = calculate_row_count(
+                    page_offset_index,
+                    self.seen_num_data_pages,
+                    self.total_num_values,
+                )?;
+                Ok(Some(PageMetadata {
+                    num_rows: row_count,
+                    is_dict: false,
+                }))
+            }
+        } else {
+            Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader."))
+        }
     }

     fn skip_next_page(&mut self) -> Result<()> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        if let Some(page_offset_index) = &self.page_offset_index {
+            let location = &page_offset_index[self.seen_num_data_pages];
+            let compressed_page_size = location.compressed_page_size;
+            // Skip the page bytes without decoding them.
+            let skip_size = io::copy(
+                self.buf.by_ref().take(compressed_page_size as u64).by_ref(),
+                &mut io::sink(),
+            )?;
+            if skip_size == compressed_page_size as u64 {
+                self.seen_num_data_pages += 1;
+                // Note: `seen_num_values` is not updated here, because the number
+                // of values in a skipped page is not available in `skip_next_page`.
+                Ok(())
+            } else {
+                Err(general_err!(
+                    "skip_next_page size is not equal to compressed_page_size"
+                ))
+            }
+        } else {
+            Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader."))
+        }
     }
 }

@@ -1323,4 +1410,76 @@ mod tests {
         let statistics = r.column(col_num).statistics().unwrap();
         (statistics.min_bytes(), statistics.max_bytes())
     }
+
+    #[test]
+    fn test_skip_page_with_offset_index() {
+        let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
+        let builder = ReadOptionsBuilder::new();
+        // Enable reading the page index.
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, options);
+        let reader = reader_result.unwrap();
+
+        let row_group_reader = reader.get_row_group(0).unwrap();
+
+        // Use 'int_col' (boundary order: ASCENDING, 325 pages in total).
+        let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();
+
+        let mut vec = vec![];
+
+        for i in 0..325 {
+            if i % 2 == 0 {
+                vec.push(column_page_reader.get_next_page().unwrap().unwrap());
+            } else {
+                column_page_reader.skip_next_page().unwrap();
+            }
+        }
+        // Check that all pages have been read.
+        assert!(column_page_reader.peek_next_page().unwrap().is_none());
+        assert!(column_page_reader.get_next_page().unwrap().is_none());
+
+        assert_eq!(vec.len(), 163);
+    }
+
+    #[test]
+    fn test_peek_page_with_dictionary_page() {
+        let test_file = get_test_file("alltypes_tiny_pages.parquet");
+        let builder = ReadOptionsBuilder::new();
+        // Enable reading the page index.
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, options);
+        let reader = reader_result.unwrap();
+        let row_group_reader = reader.get_row_group(0).unwrap();

+        // Use 'string_col' (boundary order: UNORDERED, 352 data pages and 1 dictionary page in total).
+        let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();
+
+        let mut vec = vec![];
+
+        let meta = column_page_reader.peek_next_page().unwrap().unwrap();
+        assert!(meta.is_dict);
+        let page = column_page_reader.get_next_page().unwrap().unwrap();
+        assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));
+
+        for i in 0..352 {
+            let meta = column_page_reader.peek_next_page().unwrap().unwrap();
+            // Verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
+            // every page has either 21 or 20 rows, except the last page, which has 11 rows.
+            if i != 351 {
+                assert!((meta.num_rows == 21) || (meta.num_rows == 20));
+            } else {
+                assert_eq!(meta.num_rows, 11);
+            }
+            assert!(!meta.is_dict);
+            vec.push(meta);
+            let page = column_page_reader.get_next_page().unwrap().unwrap();
+            assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
+        }
+
+        // Check that all pages have been read.
+        assert!(column_page_reader.peek_next_page().unwrap().is_none());
+        assert!(column_page_reader.get_next_page().unwrap().is_none());
+
+        assert_eq!(vec.len(), 352);
+    }
 }
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 10983c74135..d975f7e775d 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1052,6 +1052,7 @@ mod tests {
             total_num_values,
             codec,
             physical_type,
+            i64::MIN,
         )
         .unwrap();

diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index 3a69df4360b..b49e3251692 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -24,6 +24,8 @@ pub mod cursor;
 pub mod hash_util;
 #[cfg(any(test, feature = "test_common"))]
 pub(crate) mod test_common;
+pub(crate) mod page_util;
+
 #[cfg(any(test, feature = "test_common"))]
 pub use self::test_common::page_util::{
     DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
diff --git a/parquet/src/util/page_util.rs b/parquet/src/util/page_util.rs
new file mode 100644
index 00000000000..c96d2b036b5
--- /dev/null
+++ b/parquet/src/util/page_util.rs
@@ -0,0 +1,27 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::Result;
+use parquet_format::PageLocation;
+
+pub(crate) fn calculate_row_count(indexes: &[PageLocation], page_num: usize, total_row_count: i64) -> Result<usize> {
+    if page_num == indexes.len() - 1 {
+        Ok((total_row_count - indexes[page_num].first_row_index + 1) as usize)
+    } else {
+        Ok((indexes[page_num + 1].first_row_index - indexes[page_num].first_row_index) as usize)
+    }
+}
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 0b70c38ad0e..f56eaf85e63 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -173,7 +173,7 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P>
 {
         Ok(self.page_iter.next())
     }

-    fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
         unimplemented!()
     }
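For context, a minimal usage sketch of the new peek/skip API as seen from outside the crate. It mirrors the tests above; the file name, the column ordinal, and the keep-every-other-page policy are illustrative assumptions, not part of this diff:

use std::fs::File;

use parquet::column::page::PageReader;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Reading the page index is opt-in; without with_page_index(),
    // peek_next_page and skip_next_page return an error.
    let file = File::open("alltypes_tiny_pages.parquet")?; // illustrative path
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options)?;

    let row_group = reader.get_row_group(0)?;
    // Column ordinal 9 ('string_col') is taken from the test above.
    let mut pages = row_group.get_column_page_reader(9)?;

    let mut keep = true;
    while let Some(meta) = pages.peek_next_page()? {
        if meta.is_dict {
            // Always decode the dictionary page; data pages may reference it.
            pages.get_next_page()?;
            continue;
        }
        if keep {
            // Decode this data page.
            let page = pages.get_next_page()?.expect("data page");
            println!("decoded {} values ({} rows)", page.num_values(), meta.num_rows);
        } else {
            // Skip the page's raw bytes without decompressing or decoding them.
            pages.skip_next_page()?;
        }
        keep = !keep;
    }
    Ok(())
}

Note that skip_next_page only advances the underlying byte stream; as the comments in the diff call out, seen_num_values is not updated on a skip, which is why get_next_page re-checks seen_num_data_pages against the offset index before reading another page header.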