Support peek_next_page() and skip_next_page() in serialized_reader.
Ted-Jiang committed Jul 11, 2022
1 parent ca5fe7d commit df0b733
Showing 9 changed files with 224 additions and 12 deletions.
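In short: once the page offset index is loaded via `ReadOptionsBuilder::with_page_index()`, `SerializedPageReader` can answer `peek_next_page()` from the index without decoding anything, and `skip_next_page()` can discard a page without decompressing it. A minimal caller-side sketch, modeled on the new tests at the bottom of this diff (the file name, column index, and row-count threshold are illustrative):

use std::fs::File;

use parquet::column::page::PageReader;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() {
    // Reading the page (offset) index is opt-in.
    let file = File::open("alltypes_tiny_pages_plain.parquet").unwrap();
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options).unwrap();

    let row_group = reader.get_row_group(0).unwrap();
    let mut pages = row_group.get_column_page_reader(4).unwrap();

    // Peek cheaply at each page's metadata, then decode it or skip it.
    while let Some(meta) = pages.peek_next_page().unwrap() {
        if meta.is_dict || meta.num_rows >= 20 {
            let _page = pages.get_next_page().unwrap().unwrap();
        } else {
            pages.skip_next_page().unwrap();
        }
    }
}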
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
@@ -552,7 +552,7 @@ impl PageReader for InMemoryColumnChunkReader {
Ok(None)
}

- fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
+ fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}

2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
@@ -205,7 +205,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {

/// Gets metadata about the next page; returns an error if no
/// column index information is available
- fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
+ fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;

/// Skips reading the next page; returns an error if no
/// column index information is available
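The switch to `&mut self` matters for implementors: answering a peek may require updating internal bookkeeping (the serialized reader below clears a pending dictionary-page flag inside `peek_next_page`). A hedged skeleton of an implementation against the updated trait (the type and method bodies are illustrative stubs):

use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;

struct MyPageReader;

impl Iterator for MyPageReader {
    type Item = Result<Page>;
    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

impl PageReader for MyPageReader {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        Ok(None) // decode and return the next page here
    }

    // `&mut self`, so internal state may change while peeking.
    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        Ok(None)
    }

    fn skip_next_page(&mut self) -> Result<()> {
        Ok(())
    }
}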
3 changes: 3 additions & 0 deletions parquet/src/column/writer.rs
@@ -1792,6 +1792,7 @@ mod tests {
7,
Compression::UNCOMPRESSED,
Type::INT32,
i64::MIN,
)
.unwrap();

@@ -1975,6 +1976,7 @@ mod tests {
data.len() as i64,
Compression::UNCOMPRESSED,
Int32Type::get_physical_type(),
i64::MIN,
)
.unwrap(),
);
@@ -2358,6 +2360,7 @@
column_metadata.num_values(),
column_metadata.compression(),
T::get_physical_type(),
i64::MIN,
)
.unwrap(),
);
22 changes: 21 additions & 1 deletion parquet/src/file/metadata.rs
@@ -228,7 +228,7 @@ pub struct RowGroupMetaData {
num_rows: i64,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
- // Todo add filter result -> row range
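+ // Page locations for each column chunk, ordered as [column][page];
+ // `None` unless the page index was read.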
+ page_offset_index: Option<Vec<Vec<PageLocation>>>,
}

impl RowGroupMetaData {
@@ -267,6 +267,11 @@ impl RowGroupMetaData {
self.columns.iter().map(|c| c.total_compressed_size).sum()
}

/// Returns a reference to the page offset index, if any.
pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {
&self.page_offset_index
}

/// Returns reference to a schema descriptor.
pub fn schema_descr(&self) -> &SchemaDescriptor {
self.schema_descr.as_ref()
@@ -277,6 +282,11 @@
self.schema_descr.clone()
}

/// Sets the page offset index for this row group.
pub fn set_page_offset(&mut self, page_offset: Vec<Vec<PageLocation>>) {
self.page_offset_index = Some(page_offset);
}

/// Method to convert from Thrift.
pub fn from_thrift(
schema_descr: SchemaDescPtr,
@@ -295,6 +305,7 @@
num_rows,
total_byte_size,
schema_descr,
page_offset_index: None,
})
}

@@ -318,6 +329,7 @@ pub struct RowGroupMetaDataBuilder {
schema_descr: SchemaDescPtr,
num_rows: i64,
total_byte_size: i64,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}

impl RowGroupMetaDataBuilder {
@@ -328,6 +340,7 @@ impl RowGroupMetaDataBuilder {
schema_descr,
num_rows: 0,
total_byte_size: 0,
page_offset_index: None,
}
}

@@ -349,6 +362,12 @@
self
}

/// Sets the page offset index for this row group.
pub fn set_page_offset(mut self, page_offset: Vec<Vec<PageLocation>>) -> Self {
self.page_offset_index = Some(page_offset);
self
}

/// Builds row group metadata.
pub fn build(self) -> Result<RowGroupMetaData> {
if self.schema_descr.num_columns() != self.columns.len() {
@@ -364,6 +383,7 @@
num_rows: self.num_rows,
total_byte_size: self.total_byte_size,
schema_descr: self.schema_descr,
page_offset_index: self.page_offset_index,
})
}
}
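For illustration, a small helper built on the new accessor; it assumes `rg` comes from a reader built with `with_page_index()` (see serialized_reader.rs below), since the index is `None` otherwise:

use parquet::file::metadata::RowGroupMetaData;

// Illustrative only: report how many data pages each column chunk has.
fn describe_pages(rg: &RowGroupMetaData) {
    match rg.page_offset_index() {
        Some(index) => {
            for (col, pages) in index.iter().enumerate() {
                println!("column {}: {} data pages", col, pages.len());
            }
        }
        None => println!("page offset index not loaded"),
    }
}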
175 changes: 167 additions & 8 deletions parquet/src/file/serialized_reader.rs
@@ -19,9 +19,9 @@
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

use bytes::{Buf, Bytes};
- use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
+ use std::{convert::TryFrom, fs::File, io, io::Read, path::Path, sync::Arc};

- use parquet_format::{PageHeader, PageType};
+ use parquet_format::{PageHeader, PageLocation, PageType};
use thrift::protocol::TCompactInputProtocol;

use crate::basic::{Compression, Encoding, Type};
@@ -37,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr};

// export `SliceableCursor` and `FileSource` publicly so clients can
// re-use the logic in their own ParquetFileWriter wrappers
use crate::util::page_util::calculate_row_count;
#[allow(deprecated)]
pub use crate::util::{cursor::SliceableCursor, io::FileSource};

@@ -251,11 +252,12 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

- for rg in &filtered_row_groups {
+ for rg in &mut filtered_row_groups {
let column_index =
index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index =
index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
rg.set_page_offset(offset_index.clone());
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}
@@ -346,14 +348,22 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'a, R> {
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
let (col_start, col_length) = col.byte_range();
// TODO: filter with multiple row ranges
let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?;
- let page_reader = SerializedPageReader::new(
+ let mut page_reader = SerializedPageReader::new(
file_chunk,
col.num_values(),
col.compression(),
col.column_descr().physical_type(),
self.metadata.num_rows(),
)?;
if let Some(offset_index) = self.metadata.page_offset_index() {
let first_data_page_offset = offset_index[i][0].offset;
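// Without a dictionary page, the first data page starts at the column
// chunk's file offset, so a differing offset means a dictionary page
// precedes it.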
let has_dictionary_page = first_data_page_offset != col.file_offset();
page_reader.with_page_offset_index_and_has_dictionary_to_read(
offset_index[i].clone(),
has_dictionary_page,
);
}
Ok(Box::new(page_reader))
}

@@ -481,6 +491,19 @@ pub struct SerializedPageReader<T: Read> {

// Column chunk type.
physical_type: Type,

// Total row count of the row group.
num_rows: i64,

// Page offset index for this column chunk.
page_offset_index: Option<Vec<PageLocation>>,

// The number of data pages seen so far, including skipped pages.
seen_num_data_pages: usize,

// Whether a dictionary page still needs to be read before the first data page.
has_dictionary_page_to_read: bool,
}

impl<T: Read> SerializedPageReader<T> {
@@ -490,6 +513,7 @@ impl<T: Read> SerializedPageReader<T> {
total_num_values: i64,
compression: Compression,
physical_type: Type,
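// Total row count of the row group; callers that do not use the
// page index pass a placeholder (`i64::MIN` in the tests).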
num_rows: i64,
) -> Result<Self> {
let decompressor = create_codec(compression)?;
let result = Self {
@@ -498,9 +522,22 @@
seen_num_values: 0,
decompressor,
physical_type,
num_rows,
page_offset_index: None,
seen_num_data_pages: 0,
has_dictionary_page_to_read: false,
};
Ok(result)
}

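/// Supplies the page offset index for this column chunk and records
/// whether a dictionary page must be read before the first data page.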
pub fn with_page_offset_index_and_has_dictionary_to_read(
&mut self,
offset_index: Vec<PageLocation>,
has_dictionary_to_read: bool,
) {
self.page_offset_index = Some(offset_index);
self.has_dictionary_page_to_read = has_dictionary_to_read;
}
}

impl<T: Read + Send> Iterator for SerializedPageReader<T> {
@@ -514,6 +551,14 @@
impl<T: Read + Send> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.total_num_values {
// For now we cannot update `seen_num_values` in `skip_next_page`,
// so this extra check is needed to notice when every page has been
// read or skipped.
if let Some(indexes) = &self.page_offset_index {
if indexes.len() == self.seen_num_data_pages {
return Ok(None);
}
}

let page_header = read_page_header(&mut self.buf)?;

let to_read = page_header.compressed_page_size as usize;
Expand All @@ -540,6 +585,7 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
self.decompressor.as_mut(),
)?;
self.seen_num_values += decoded.num_values() as i64;
self.seen_num_data_pages += 1;
decoded
}
PageType::DictionaryPage => decode_page(
@@ -560,12 +606,53 @@
Ok(None)
}

- fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+ fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
if let Some(page_offset_index) = &self.page_offset_index {
if self.seen_num_data_pages == page_offset_index.len() {
Ok(None)
} else if self.seen_num_data_pages == 0 && self.has_dictionary_page_to_read {
self.has_dictionary_page_to_read = false;
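// The offset index carries no row count for a dictionary page,
// so report 0 (`usize::MIN`) and flag it as a dictionary page.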
Ok(Some(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
}))
} else {
let row_count = calculate_row_count(
page_offset_index,
self.seen_num_data_pages,
self.total_num_values,
)?;
Ok(Some(PageMetadata {
num_rows: row_count,
is_dict: false,
}))
}
} else {
Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader."))
}
}

fn skip_next_page(&mut self) -> Result<()> {
- Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
if let Some(page_offset_index) = &self.page_offset_index {
let location = &page_offset_index[self.seen_num_data_pages];
let compressed_page_size = location.compressed_page_size;
// Skip the page payload by draining it into a sink.
let skip_size = io::copy(
self.buf.by_ref().take(compressed_page_size as u64).by_ref(),
&mut io::sink(),
)?;
if skip_size == compressed_page_size as u64 {
self.seen_num_data_pages += 1;
// Note: `seen_num_values` is not updated here; the number of values skipped is not known in `skip_next_page`.
Ok(())
} else {
Err(general_err!(
"skip_next_page size is not equal compressed_page_size"
))
}
} else {
Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader."))
}
}
}
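`calculate_row_count` lives in `util::page_util` and is not part of this diff. A hedged sketch of the arithmetic such a helper performs, assuming the usual `PageLocation::first_row_index` semantics (for the flat test columns used here, value count and row count coincide, which is why `total_num_values` can serve as the upper bound):

use parquet_format::PageLocation;

// Illustrative stand-in for `util::page_util::calculate_row_count`:
// page `i` spans from its `first_row_index` up to the next page's
// `first_row_index`, or to the end of the chunk for the last page.
fn row_count(locations: &[PageLocation], i: usize, total_num_values: i64) -> i64 {
    match locations.get(i + 1) {
        Some(next) => next.first_row_index - locations[i].first_row_index,
        None => total_num_values - locations[i].first_row_index,
    }
}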

@@ -1323,4 +1410,76 @@ mod tests {
let statistics = r.column(col_num).statistics().unwrap();
(statistics.min_bytes(), statistics.max_bytes())
}

#[test]
fn test_skip_page_with_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let builder = ReadOptionsBuilder::new();
// Enable reading the page index.
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

// Use 'int_col': boundary order ASCENDING, 325 pages in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
// Check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163);
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let builder = ReadOptionsBuilder::new();
// Enable reading the page index.
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();

// Use 'string_col': boundary order UNORDERED, 352 data pages and 1 dictionary page in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

let mut vec = vec![];

let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// Verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// each page has either 21 or 20 rows, except the last page, which has 11 rows.
if i != 351 {
assert!((meta.num_rows == 21) || (meta.num_rows == 20));
} else {
assert_eq!(meta.num_rows, 11);
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}

// Check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 352);
}
}
1 change: 1 addition & 0 deletions parquet/src/file/writer.rs
@@ -1052,6 +1052,7 @@ mod tests {
total_num_values,
codec,
physical_type,
i64::MIN,
)
.unwrap();

