Enable serialized_reader to read specific pages by passing row ranges #1977

Closed · wants to merge 1 commit
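For context, a minimal usage sketch of the new peek/skip API, modeled on the tests added in this PR. The file, the column index, the `read_every_other_page` helper, and the alternating read/skip policy are illustrative only:

```rust
use std::fs::File;

use parquet::column::page::PageReader;
use parquet::errors::Result;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn read_every_other_page(file: File) -> Result<()> {
    // Reading the page index is what populates the per-row-group page offset index.
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options)?;

    let row_group = reader.get_row_group(0)?;
    let mut pages = row_group.get_column_page_reader(0)?;

    let mut read = true;
    while let Some(meta) = pages.peek_next_page()? {
        if meta.is_dict {
            // Dictionary pages are always read, never skipped.
            let _dict = pages.get_next_page()?;
            continue;
        }
        if read {
            let _page = pages.get_next_page()?;
        } else {
            pages.skip_next_page()?;
        }
        read = !read;
    }
    Ok(())
}
```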
2 changes: 1 addition & 1 deletion parquet/src/arrow/async_reader.rs
@@ -552,7 +552,7 @@ impl PageReader for InMemoryColumnChunkReader {
Ok(None)
}

fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
}

2 changes: 1 addition & 1 deletion parquet/src/column/page.rs
@@ -205,7 +205,7 @@ pub trait PageReader: Iterator<Item = Result<Page>> + Send {

/// Gets metadata about the next page, returns an error if no
/// column index information
fn peek_next_page(&self) -> Result<Option<PageMetadata>>;
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>>;

/// Skips reading the next page, returns an error if no
/// column index information
3 changes: 3 additions & 0 deletions parquet/src/column/writer.rs
@@ -1792,6 +1792,7 @@ mod tests {
7,
Compression::UNCOMPRESSED,
Type::INT32,
i64::MIN,
)
.unwrap();

@@ -1975,6 +1976,7 @@
data.len() as i64,
Compression::UNCOMPRESSED,
Int32Type::get_physical_type(),
i64::MIN,
)
.unwrap(),
);
@@ -2358,6 +2360,7 @@
column_metadata.num_values(),
column_metadata.compression(),
T::get_physical_type(),
i64::MIN,
)
.unwrap(),
);
22 changes: 21 additions & 1 deletion parquet/src/file/metadata.rs
@@ -228,7 +228,7 @@ pub struct RowGroupMetaData {
num_rows: i64,
total_byte_size: i64,
schema_descr: SchemaDescPtr,
// Todo add filter result -> row range
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}

impl RowGroupMetaData {
@@ -267,6 +267,11 @@ impl RowGroupMetaData {
self.columns.iter().map(|c| c.total_compressed_size).sum()
}

/// Returns a reference to the page offset index.
pub fn page_offset_index(&self) -> &Option<Vec<Vec<PageLocation>>> {
&self.page_offset_index
}

/// Returns reference to a schema descriptor.
pub fn schema_descr(&self) -> &SchemaDescriptor {
self.schema_descr.as_ref()
@@ -277,6 +282,11 @@
self.schema_descr.clone()
}

/// Sets page offset index for this row group.
pub fn set_page_offset(&mut self, page_offset: Vec<Vec<PageLocation>>) {
self.page_offset_index = Some(page_offset);
}

/// Method to convert from Thrift.
pub fn from_thrift(
schema_descr: SchemaDescPtr,
@@ -295,6 +305,7 @@
num_rows,
total_byte_size,
schema_descr,
page_offset_index: None,
})
}

@@ -318,6 +329,7 @@ pub struct RowGroupMetaDataBuilder {
schema_descr: SchemaDescPtr,
num_rows: i64,
total_byte_size: i64,
page_offset_index: Option<Vec<Vec<PageLocation>>>,
}

impl RowGroupMetaDataBuilder {
@@ -328,6 +340,7 @@ impl RowGroupMetaDataBuilder {
schema_descr,
num_rows: 0,
total_byte_size: 0,
page_offset_index: None,
}
}

@@ -349,6 +362,12 @@
self
}

/// Sets page offset index for this row group.
pub fn set_page_offset(mut self, page_offset: Vec<Vec<PageLocation>>) -> Self {
self.page_offset_index = Some(page_offset);
self
}

/// Builds row group metadata.
pub fn build(self) -> Result<RowGroupMetaData> {
if self.schema_descr.num_columns() != self.columns.len() {
@@ -364,6 +383,7 @@
num_rows: self.num_rows,
total_byte_size: self.total_byte_size,
schema_descr: self.schema_descr,
page_offset_index: self.page_offset_index,
})
}
}
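The page offset index stored here is indexed by column and then by page within the column chunk; each `PageLocation` (from the `parquet_format` crate) records the page's byte offset, compressed size, and the row index of its first row. A small illustrative sketch (the `dump_page_locations` helper is hypothetical):

```rust
use parquet_format::PageLocation;

// Sketch only: `page_offset_index` is what `RowGroupMetaData::page_offset_index()`
// returns once the page index has been read.
fn dump_page_locations(page_offset_index: &[Vec<PageLocation>]) {
    for (col, pages) in page_offset_index.iter().enumerate() {
        for (page, loc) in pages.iter().enumerate() {
            println!(
                "column {}, page {}: offset={}, compressed_page_size={}, first_row_index={}",
                col, page, loc.offset, loc.compressed_page_size, loc.first_row_index
            );
        }
    }
}
```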
175 changes: 167 additions & 8 deletions parquet/src/file/serialized_reader.rs
@@ -19,9 +19,9 @@
//! Also contains implementations of the ChunkReader for files (with buffering) and byte arrays (RAM)

use bytes::{Buf, Bytes};
use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
use std::{convert::TryFrom, fs::File, io, io::Read, path::Path, sync::Arc};

use parquet_format::{PageHeader, PageType};
use parquet_format::{PageHeader, PageLocation, PageType};
use thrift::protocol::TCompactInputProtocol;

use crate::basic::{Compression, Encoding, Type};
@@ -37,6 +37,7 @@ use crate::util::{io::TryClone, memory::ByteBufferPtr};

// export `SliceableCursor` and `FileSource` publically so clients can
// re-use the logic in their own ParquetFileWriter wrappers
use crate::util::page_util::calculate_row_count;
#[allow(deprecated)]
pub use crate::util::{cursor::SliceableCursor, io::FileSource};

@@ -251,11 +252,12 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in &filtered_row_groups {
for rg in &mut filtered_row_groups {
let column_index =
index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
let offset_index =
index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
rg.set_page_offset(offset_index.clone());
columns_indexes.push(column_index);
offset_indexes.push(offset_index);
}
@@ -346,14 +348,22 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
let col = self.metadata.column(i);
let (col_start, col_length) = col.byte_range();
//Todo filter with multi row range
let file_chunk = self.chunk_reader.get_read(col_start, col_length as usize)?;
let page_reader = SerializedPageReader::new(
let mut page_reader = SerializedPageReader::new(
file_chunk,
col.num_values(),
col.compression(),
col.column_descr().physical_type(),
self.metadata.num_rows(),
)?;
if let Some(offset_index) = self.metadata.page_offset_index() {
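// If the first data page does not start at the column chunk's file offset,
// a dictionary page must precede it in the chunk.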
let first_data_page_offset = offset_index[i][0].offset;
let has_dictionary_page = first_data_page_offset != col.file_offset();
page_reader.with_page_offset_index_and_has_dictionary_to_read(
offset_index[i].clone(),
has_dictionary_page,
);
}
Ok(Box::new(page_reader))
}

@@ -481,6 +491,19 @@ pub struct SerializedPageReader<T: Read> {

// Column chunk type.
physical_type: Type,

// Total row count of the row group.
num_rows: i64,

// Page offset index.
page_offset_index: Option<Vec<PageLocation>>,

// The number of data pages seen so far, including skipped pages.
seen_num_data_pages: usize,

// Whether a dictionary page still has to be read before the first data page.
has_dictionary_page_to_read: bool,
}

impl<T: Read> SerializedPageReader<T> {
@@ -490,6 +513,7 @@ impl<T: Read> SerializedPageReader<T> {
total_num_values: i64,
compression: Compression,
physical_type: Type,
num_rows: i64,
) -> Result<Self> {
let decompressor = create_codec(compression)?;
let result = Self {
@@ -498,9 +522,22 @@
seen_num_values: 0,
decompressor,
physical_type,
num_rows,
page_offset_index: None,
seen_num_data_pages: 0,
has_dictionary_page_to_read: false,
};
Ok(result)
}

pub fn with_page_offset_index_and_has_dictionary_to_read(
&mut self,
offset_index: Vec<PageLocation>,
has_dictionary_to_read: bool,
) {
self.page_offset_index = Some(offset_index);
self.has_dictionary_page_to_read = has_dictionary_to_read;
}
}
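A hedged sketch of constructing a `SerializedPageReader` directly with the new `num_rows` parameter and the offset-index setter. The `page_reader_for_chunk` helper and its inputs are placeholders for values that would normally come from `ColumnChunkMetaData` and `RowGroupMetaData`:

```rust
use std::io::Cursor;

use parquet::basic::{Compression, Type};
use parquet::errors::Result;
use parquet::file::serialized_reader::SerializedPageReader;
use parquet_format::PageLocation;

fn page_reader_for_chunk(
    chunk_bytes: Vec<u8>,         // raw bytes of one column chunk
    num_values: i64,              // ColumnChunkMetaData::num_values()
    num_rows: i64,                // RowGroupMetaData::num_rows(), new in this change
    locations: Vec<PageLocation>, // offset index entries for this column
    has_dictionary_page: bool,
) -> Result<SerializedPageReader<Cursor<Vec<u8>>>> {
    let mut reader = SerializedPageReader::new(
        Cursor::new(chunk_bytes),
        num_values,
        Compression::UNCOMPRESSED,
        Type::INT32,
        num_rows,
    )?;
    // Without this call, peek_next_page/skip_next_page return an error.
    reader.with_page_offset_index_and_has_dictionary_to_read(locations, has_dictionary_page);
    Ok(reader)
}
```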

impl<T: Read + Send> Iterator for SerializedPageReader<T> {
@@ -514,6 +551,14 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
impl<T: Read + Send> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.total_num_values {
// For now `seen_num_values` is not updated by `skip_next_page`, so we need
// this extra check against the offset index to know when all pages are done.
if let Some(indexes) = &self.page_offset_index {
if indexes.len() == self.seen_num_data_pages {
return Ok(None);
}
}

let page_header = read_page_header(&mut self.buf)?;

let to_read = page_header.compressed_page_size as usize;
@@ -540,6 +585,7 @@
self.decompressor.as_mut(),
)?;
self.seen_num_values += decoded.num_values() as i64;
self.seen_num_data_pages += 1;
decoded
}
PageType::DictionaryPage => decode_page(
@@ -560,12 +606,53 @@
Ok(None)
}

fn peek_next_page(&self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
if let Some(page_offset_index) = &self.page_offset_index {
if self.seen_num_data_pages == page_offset_index.len() {
Ok(None)
} else if self.seen_num_data_pages == 0 && self.has_dictionary_page_to_read {
self.has_dictionary_page_to_read = false;
Ok(Some(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
}))
} else {
let row_count = calculate_row_count(
page_offset_index,
self.seen_num_data_pages,
self.total_num_values,
)?;
Ok(Some(PageMetadata {
num_rows: row_count,
is_dict: false,
}))
}
} else {
Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader."))
}
}

fn skip_next_page(&mut self) -> Result<()> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
if let Some(page_offset_index) = &self.page_offset_index {
let location = &page_offset_index[self.seen_num_data_pages];
let compressed_page_size = location.compressed_page_size;
// Skip the page's compressed bytes without decoding them.
let skip_size = io::copy(
self.buf.by_ref().take(compressed_page_size as u64).by_ref(),
&mut io::sink(),
)?;
if skip_size == compressed_page_size as u64 {
self.seen_num_data_pages += 1;
// Note: `seen_num_values` is not updated here because `skip_next_page` does not know how many values it skipped.
Ok(())
} else {
Err(general_err!(
"skip_next_page size is not equal compressed_page_size"
))
}
} else {
Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader."))
}
}
}
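The `calculate_row_count` helper in `crate::util::page_util` is not shown in this diff; presumably it derives a data page's row count from consecutive `first_row_index` entries, with the last page taking whatever remains of the total. A sketch of that assumption only:

```rust
use parquet_format::PageLocation;

// Assumed behavior only; the real helper lives in parquet's util::page_util module.
fn calculate_row_count_sketch(indexes: &[PageLocation], i: usize, total: i64) -> usize {
    let start = indexes[i].first_row_index;
    let end = indexes
        .get(i + 1)
        .map(|next| next.first_row_index)
        .unwrap_or(total);
    (end - start) as usize
}
```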

@@ -1323,4 +1410,76 @@ mod tests {
let statistics = r.column(col_num).statistics().unwrap();
(statistics.min_bytes(), statistics.max_bytes())
}

#[test]
fn test_skip_page_with_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");
let builder = ReadOptionsBuilder::new();
// Enable reading the page index.
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

// Use 'int_col': boundary order ASCENDING, 325 pages in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
// Check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163);
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
let builder = ReadOptionsBuilder::new();
// Enable reading the page index.
let options = builder.with_page_index().build();
let reader_result = SerializedFileReader::new_with_options(test_file, options);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();

// Use 'string_col': boundary order UNORDERED, 352 data pages plus 1 dictionary page.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

let mut vec = vec![];

let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// Verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// every page has either 21 or 20 rows, except the last page, which has 11 rows.
if i != 351 {
assert!((meta.num_rows == 21) || (meta.num_rows == 20));
} else {
assert_eq!(meta.num_rows, 11);
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}

// Check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 352);
}
}
1 change: 1 addition & 0 deletions parquet/src/file/writer.rs
@@ -1052,6 +1052,7 @@ mod tests {
total_num_values,
codec,
physical_type,
i64::MIN,
)
.unwrap();
