Support peek_next_page() and skip_next_page() in serialized_reader #2044

Merged
merged 6 commits on Jul 14, 2022
Changes from 1 commit
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
@@ -552,7 +552,7 @@ impl PageReader for InMemoryColumnChunkReader {
Ok(None)
}

fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}

2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
@@ -205,7 +205,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {

/// Gets metadata about the next page, returns an error if no
/// column index information
fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;

/// Skips reading the next page, returns an error if no
/// column index information
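For orientation, a minimal sketch of how a caller could combine the two trait methods, assuming only what this diff shows (`PageMetadata { num_rows, is_dict }`, `get_next_page`, `skip_next_page`); the helper function and the `rows_needed` predicate are illustrative and not part of the PR:

use parquet::column::page::{Page, PageReader};
use parquet::errors::Result;

// Decode only the pages whose rows are needed; skip the rest.
// Dictionary pages are always decoded so later data pages can reference them.
fn read_selected_pages<R: PageReader>(
    reader: &mut R,
    mut rows_needed: impl FnMut(usize) -> bool,
) -> Result<Vec<Page>> {
    let mut pages = Vec::new();
    // Peek first: the metadata comes from the page offset index, so unwanted
    // pages can be skipped without decompressing or decoding them.
    while let Some(meta) = reader.peek_next_page()? {
        if meta.is_dict || rows_needed(meta.num_rows) {
            if let Some(page) = reader.get_next_page()? {
                pages.push(page);
            }
        } else {
            reader.skip_next_page()?;
        }
    }
    Ok(pages)
}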
3 changes: 3 additions & 0 deletions parquet/src/column/writer.rs
@@ -1792,6 +1792,7 @@ mod tests {
7,
Compression::UNCOMPRESSED,
Type::INT32,
i64::MIN,
)
.unwrap();

@@ -1975,6 +1976,7 @@
data.len() as i64,
Compression::UNCOMPRESSED,
Int32Type::get_physical_type(),
i64::MIN,
)
.unwrap(),
);
@@ -2358,6 +2360,7 @@
column_metadata.num_values(),
column_metadata.compression(),
T::get_physical_type(),
i64::MIN,
)
.unwrap(),
);
22 changes: 21 additions & 1 deletion parquet/src/file/metadata.rs
@@ -228,7 +228,7 @@ pub struct RowGroupMetaData {
num_rows: i64,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
// Todo add filter result -> row range
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}

impl RowGroupMetaData {
@@ -267,6 +267,11 @@ impl RowGroupMetaData {
self.columns.iter().map(|c| c.total_compressed_size).sum()
}

/// Returns a reference to the page offset index.
pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {
&self.page_offset_index
}

/// Returns reference to a schema descriptor.
pub fn schema_descr(&self) -> &SchemaDescriptor {
self.schema_descr.as_ref()
@@ -277,6 +282,11 @@
self.schema_descr.clone()
}

/// Sets page offset index for this row group.
pub fn set_page_offset(&mut self, page_offset: Vec<Vec<PageLocation>>) {
self.page_offset_index = Some(page_offset);
}

/// Method to convert from Thrift.
pub fn from_thrift(
schema_descr: SchemaDescPtr,
@@ -295,6 +305,7 @@
num_rows,
total_byte_size,
schema_descr,
page_offset_index: None,
})
}

@@ -318,6 +329,7 @@ pub struct RowGroupMetaDataBuilder {
schema_descr: SchemaDescPtr,
num_rows: i64,
total_byte_size: i64,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}

impl RowGroupMetaDataBuilder {
@@ -328,6 +340,7 @@ impl RowGroupMetaDataBuilder {
schema_descr,
num_rows: 0,
total_byte_size: 0,
page_offset_index: None,
}
}

@@ -349,6 +362,12 @@
self
}

/// Sets page offset index for this row group.
pub fn set_page_offset(mut self, page_offset: Vec<Vec<PageLocation>>) -> Self {
self.page_offset_index = Some(page_offset);
self
}

/// Builds row group metadata.
pub fn build(self) -> Result<RowGroupMetaData> {
if self.schema_descr.num_columns() != self.columns.len() {
@@ -364,6 +383,7 @@
num_rows: self.num_rows,
total_byte_size: self.total_byte_size,
schema_descr: self.schema_descr,
page_offset_index: self.page_offset_index,
})
}
}
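As a quick illustration of the new accessor (not part of the diff): once `set_page_offset` has been called, a consumer could walk the recorded locations. Here `row_group_metadata` stands for any populated `RowGroupMetaData`:

// Illustrative only: inspect the page locations recorded for each column.
if let Some(index) = row_group_metadata.page_offset_index() {
    for (col_idx, pages) in index.iter().enumerate() {
        for location in pages {
            println!(
                "column {}: page at offset {}, {} compressed bytes, first row {}",
                col_idx,
                location.offset,
                location.compressed_page_size,
                location.first_row_index
            );
        }
    }
}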
175 changes: 167 additions & 8 deletions parquet/src/file/serialized_reader.rs
@@ -19,9 +19,9 @@
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

use bytes::{Buf, Bytes};
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
use std::{convert::TryFrom, fs::File, io, io::Read, path::Path, sync::Arc};

use parquet_format::{PageHeader, PageType};
use parquet_format::{PageHeader, PageLocation, PageType};
use thrift::protocol::TCompactInputProtocol;

use crate::basic::{Compression, Encoding, Type};
@@ -37,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr};

// export `SliceableCursor` and `FileSource` publically so clients can
// re-use the logic in their own ParquetFileWriter wrappers
use crate::util::page_util::calculate_row_count;
#[allow(deprecated)]
pub use crate::util::{cursor::SliceableCursor, io::FileSource};

@@ -251,11 +252,12 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in &filtered_row_groups {
for rg in &mut filtered_row_groups {
let column_index =
index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index =
index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
rg.set_page_offset(offset_index.clone());
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}
@@ -346,14 +348,22 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
let (col_start, col_length) = col.byte_range();
// TODO: filter with multiple row ranges
let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?;
let page_reader = SerializedPageReader::new(
let mut page_reader = SerializedPageReader::new(
file_chunk,
col.num_values(),
col.compression(),
col.column_descr().physical_type(),
self.metadata.num_rows(),
)?;
if let Some(offset_index) = self.metadata.page_offset_index() {
let first_data_page_offset = offset_index[i][0].offset;
let has_dictionary_page = first_data_page_offset != col.file_offset();
page_reader.with_page_offset_index_and_has_dictionary_to_read(
offset_index[i].clone(),
has_dictionary_page,
);
}
Ok(Box::new(page_reader))
}

@@ -481,6 +491,19 @@ pub struct SerializedPageReader<T: Read> {

// Column chunk type.
physical_type: Type,

// total row count.
num_rows: i64,

// Page offset index.

Contributor:

Perhaps we could have something like

enum SerializedPages<T: Read> {
    /// Read entire chunk
    Chunk {
        buf: T,
        total_num_values: i64,
    },
    Pages {
        index: Vec<PageLocation>,
        seen_num_data_pages: usize,
        has_dictionary_page_to_read: bool,
        page_bufs: VecDeque<T>,
    },
}

To make it clear what is present in what mode.

Member Author:

Good advice! I will try this mode. 👍

Member Author:

Fixed in bd5335f. @tustvold PTAL, learned a lot from you 😊
page_offset_index: Option<Vec<PageLocation>>,

// The number of data pages we have seen so far,
// including any skipped pages.
seen_num_data_pages: usize,

// A flag to check whether a dictionary page should be read first
has_dictionary_page_to_read: bool,
}

impl<T: Read> SerializedPageReader<T> {
@@ -490,6 +513,7 @@ impl<T: Read> SerializedPageReader<T> {
total_num_values: i64,
compression: Compression,
physical_type: Type,
num_rows: i64,
) -> Result<Self> {
let decompressor = create_codec(compression)?;
let result = Self {
@@ -498,9 +522,22 @@
seen_num_values: 0,
decompressor,
physical_type,
num_rows,
page_offset_index: None,
seen_num_data_pages: 0,
has_dictionary_page_to_read: false,
};
Ok(result)
}

pub fn with_page_offset_index_and_has_dictionary_to_read(
&mut self,
offset_index: Vec<PageLocation>,
has_dictionary_to_read: bool,
) {
self.page_offset_index = Some(offset_index);
self.has_dictionary_page_to_read = has_dictionary_to_read;
}
}

impl<T: Read + Send> Iterator for SerializedPageReader<T> {
@@ -514,6 +551,14 @@
impl<T: Read + Send> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.total_num_values {
// For now we cannot update `seen_num_values` in `skip_next_page`,
// so we need to add this check.
if let Some(indexes) = &self.page_offset_index {
if indexes.len() == self.seen_num_data_pages {
return Ok(None);
}
}

let page_header = read_page_header(&mut self.buf)?;

let to_read = page_header.compressed_page_size as usize;
@@ -540,6 +585,7 @@
self.decompressor.as_mut(),
)?;
self.seen_num_values += decoded.num_values() as i64;
self.seen_num_data_pages += 1;
decoded
}
PageType::DictionaryPage => decode_page(
@@ -560,12 +606,53 @@
Ok(None)
}

fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
if let Some(page_offset_index) = &self.page_offset_index {
if self.seen_num_data_pages == page_offset_index.len() {
Ok(None)
} else if self.seen_num_data_pages == 0 && self.has_dictionary_page_to_read {
self.has_dictionary_page_to_read = false;
Ok(Some(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
}))
} else {
let row_count = calculate_row_count(
page_offset_index,
self.seen_num_data_pages,
self.total_num_values,
)?;
Ok(Some(PageMetadata {
num_rows: row_count,
is_dict: false,
}))
}
} else {
Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader."))
}
}

fn skip_next_page(&mut self) -> Result<()> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
if let Some(page_offset_index) = &self.page_offset_index {
let location = &page_offset_index[self.seen_num_data_pages];
let compressed_page_size = location.compressed_page_size;
//skip page bytes
let skip_size = io::copy(
self.buf.by_ref().take(compressed_page_size as u64).by_ref(),
&mut io::sink(),
)?;
if skip_size == compressed_page_size as u64 {
self.seen_num_data_pages += 1;
// Note: `self.seen_num_values` is not updated here, because the number of values in a skipped page is not known in `skip_next_page`.
Ok(())
} else {
Err(general_err!(
"skip_next_page size is not equal compressed_page_size"
))
}
} else {
Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader."))
}
}
}
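
`calculate_row_count` comes from `crate::util::page_util` and is not shown in this diff. A hedged sketch of what it plausibly computes, assuming it derives the next data page's row count from consecutive `PageLocation::first_row_index` values and falls back to the chunk's total value count for the last page (equal to the row count for the non-nested test columns below):

// Sketch only; the real helper lives in crate::util::page_util and may differ.
fn calculate_row_count(
    indexes: &[PageLocation],
    current_index: usize,
    total_num_values: i64,
) -> Result<usize> {
    if current_index + 1 < indexes.len() {
        // Rows in this page = first row of the next page minus first row of this page.
        Ok((indexes[current_index + 1].first_row_index
            - indexes[current_index].first_row_index) as usize)
    } else {
        // Last data page: all remaining rows in the chunk.
        Ok((total_num_values - indexes[current_index].first_row_index) as usize)
    }
}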

@@ -1323,4 +1410,76 @@ mod tests {
let statistics = r.column(col_num).statistics().unwrap();
(statistics.min_bytes(), statistics.max_bytes())
}

#[test]
fn test_skip_page_with_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let builder = ReadOptionsBuilder::new();
//enable read page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

//use 'int_col', Boundary order: ASCENDING, total 325 pages.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
//check read all pages.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163);
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let builder = ReadOptionsBuilder::new();
//enable read page index
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();

//use 'string_col', Boundary order: UNORDERED, total 352 data pages and 1 dictionary page.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

let mut vec = vec![];

let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// Checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// num_rows is either 21 or 20, except the last page, which has 11 rows.
if i != 351 {
assert!((meta.num_rows == 21) || (meta.num_rows == 20));
} else {
assert_eq!(meta.num_rows, 11);
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}

//check read all pages.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 352);
}
}
1 change: 1 addition & 0 deletions parquet/src/file/writer.rs
@@ -1052,6 +1052,7 @@ mod tests {
total_num_values,
codec,
physical_type,
i64::MIN,
)
.unwrap();
