Support skip_page missing OffsetIndex Fallback in SerializedPageReader (#2460)

* Support skip_page missing OffsetIndex Fallback in SerializedPageReader

* fix clippy

* fix comment

Co-authored-by: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Ted-Jiang and tustvold committed Aug 18, 2022
1 parent 9f77e4e commit 12cc067
Showing 2 changed files with 154 additions and 14 deletions.
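
In short: SerializedPageReader could previously only peek_next_page / skip_next_page when constructed with page locations from the OffsetIndex; this commit adds a fallback that decodes each page header on the fly and caches it. A minimal sketch of the usage this enables, modeled on the new tests below (the file name and column index are taken from those tests):

```rust
use std::fs::File;

use parquet::column::page::PageReader;
use parquet::file::reader::{FileReader, RowGroupReader};
use parquet::file::serialized_reader::SerializedFileReader;

fn main() {
    // Open the file the default way, i.e. without loading the offset index.
    let file = File::open("alltypes_tiny_pages_plain.parquet").unwrap();
    let reader = SerializedFileReader::new(file).unwrap();
    let row_group = reader.get_row_group(0).unwrap();

    // Column 4 is 'int_col' in this test file (see the new test below).
    let mut pages = row_group.get_column_page_reader(4).unwrap();

    // Alternate reading and skipping; before this commit both calls
    // errored on a reader built without the offset index.
    let mut i = 0;
    while pages.peek_next_page().unwrap().is_some() {
        if i % 2 == 0 {
            let _page = pages.get_next_page().unwrap().unwrap();
        } else {
            pages.skip_next_page().unwrap();
        }
        i += 1;
    }
}
```

Without the fallback, the Values branch returned a "Must set page_offset_index" error, as the deleted lines in the diff below show.
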
1 change: 1 addition & 0 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -119,6 +119,7 @@ impl RowSelection {
}

/// Given an offset index, return the offset ranges for all data pages selected by `self`
#[cfg(any(test, feature = "async"))]
pub(crate) fn scan_ranges(
&self,
page_locations: &[PageLocation],
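
The only selection.rs change is the cfg gate above: scan_ranges is evidently now referenced only by tests and the async reader, so it is compiled out of sync-only builds. A tiny, generic illustration of the attribute's semantics (the function name is made up):

```rust
// Present when compiling tests or when the "async" feature is enabled;
// compiled out otherwise, avoiding dead-code warnings in sync-only builds.
#[cfg(any(test, feature = "async"))]
fn only_in_tests_or_async() {}
```
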
167 changes: 153 additions & 14 deletions parquet/src/file/serialized_reader.rs
@@ -484,13 +484,17 @@ pub(crate) fn decode_page(
Ok(result)
}

#[allow(clippy::large_enum_variant)]
enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader
offset: usize,

/// The length of the chunk in bytes
remaining_bytes: usize,

// If the next page header has already been "peeked", we will cache it and its length here
next_page_header: Option<Box<PageHeader>>,
},
Pages {
/// Remaining page locations
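
Note the shape of the new field: Option<Box<PageHeader>> keeps the cached (Thrift-generated, fairly large) header behind a pointer rather than inline in every state value, and the #[allow(clippy::large_enum_variant)] silences the lint for the size difference that remains between the variants. A self-contained sketch of why boxing a large payload helps (the array size is an arbitrary stand-in for a large struct):

```rust
// Boxing a large field keeps the enum itself small: the variant holds a
// pointer instead of inlining the whole payload.
#[allow(dead_code)]
enum Unboxed { Big([u8; 256]), Small }
#[allow(dead_code)]
enum Boxed { Big(Box<[u8; 256]>), Small }

fn main() {
    assert!(std::mem::size_of::<Boxed>() < std::mem::size_of::<Unboxed>());
    println!(
        "unboxed: {} bytes, boxed: {} bytes",
        std::mem::size_of::<Unboxed>(),
        std::mem::size_of::<Boxed>()
    );
}
```
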
@@ -550,6 +554,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
None => SerializedPageReaderState::Values {
offset: start as usize,
remaining_bytes: len as usize,
next_page_header: None,
},
};

@@ -577,18 +582,24 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
SerializedPageReaderState::Values {
offset,
remaining_bytes: remaining,
..
next_page_header,
} => {
if *remaining == 0 {
return Ok(None);
}

let mut read = self.reader.get_read(*offset as u64, *remaining)?;

let (header_len, header) = read_page_header_len(&mut read)?;
let header = if let Some(header) = next_page_header.take() {
*header
} else {
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining -= header_len;
header
};
let data_len = header.compressed_page_size as usize;
*offset += header_len + data_len;
*remaining -= header_len + data_len;
*offset += data_len;
*remaining -= data_len;

if header.type_ == PageType::IndexPage {
continue;
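
The heart of the new get_next_page path is the take-or-decode step: consume the header cached by an earlier peek_next_page via Option::take, or decode a fresh one and advance past it. The same step in isolation (a sketch; Header and decode_next are stand-ins, not the crate's types):

```rust
struct Header {
    compressed_page_size: usize,
}

// Stand-in for read_page_header_len: decode one header from the stream
// and report how many bytes it occupied.
fn decode_next() -> (usize, Header) {
    (8, Header { compressed_page_size: 100 }) // dummy values
}

// Consume the header cached by peek (offset already advanced past it),
// or decode a fresh one and advance.
fn next_header(cache: &mut Option<Box<Header>>, offset: &mut usize) -> Header {
    match cache.take() {
        Some(boxed) => *boxed,
        None => {
            let (header_len, header) = decode_next();
            *offset += header_len;
            header
        }
    }
}

fn main() {
    let mut cache = None;
    let mut offset = 0;
    let header = next_header(&mut cache, &mut offset);
    assert_eq!(offset, 8);
    assert_eq!(header.compressed_page_size, 100);
}
```
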
@@ -648,20 +659,60 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match &self.state {
SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")),
SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}
return if let Some(header) = next_page_header.as_ref() {
if let Ok(page_meta) = (&**header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(Box::new(header));
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
total_rows,
} => {
if dictionary_page.is_some() {
Ok(Some(PageMetadata{
Ok(Some(PageMetadata {
num_rows: 0,
is_dict: true
is_dict: true,
}))
} else if let Some(page) = page_locations.front() {
let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows);
let next_rows = page_locations
.get(1)
.map(|x| x.first_row_index as usize)
.unwrap_or(*total_rows);

Ok(Some(PageMetadata{
Ok(Some(PageMetadata {
num_rows: next_rows - page.first_row_index as usize,
is_dict: false
is_dict: false,
}))
} else {
Ok(None)
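
In the Pages branch, a page's row count is the gap between its first_row_index and the next page's (or total_rows for the last page). The same arithmetic in isolation, with hypothetical numbers:

```rust
// `first_row_indexes` lists the first_row_index of each remaining page.
fn num_rows(first_row_indexes: &[usize], total_rows: usize) -> usize {
    let next_rows = first_row_indexes
        .get(1)
        .copied()
        .unwrap_or(total_rows);
    next_rows - first_row_indexes[0]
}

fn main() {
    // Three pages starting at rows 0, 100 and 250 of a 300-row chunk.
    assert_eq!(num_rows(&[0, 100, 250], 300), 100); // first page
    assert_eq!(num_rows(&[250], 300), 50); // last page falls back to total_rows
}
```
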
@@ -672,7 +723,24 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

fn skip_next_page(&mut self) -> Result<()> {
match &mut self.state {
SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
if let Some(buffered_header) = next_page_header.take() {
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let (header_len, header) = read_page_header_len(&mut read)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
}
Ok(())
}
SerializedPageReaderState::Pages { page_locations, .. } => {
page_locations.pop_front();

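skip_next_page now has two paths: if peek_next_page already consumed the header, only the page data remains to be skipped; otherwise it decodes the header and jumps over header and data together. A sketch of the offset bookkeeping showing both paths land on the same boundary (sizes are hypothetical):

```rust
fn main() {
    // A hypothetical page: an 8-byte header followed by 100 bytes of data.
    let (header_len, data_len) = (8usize, 100usize);

    // Path 1: peek_next_page already decoded the header, advancing the
    // offset past it and caching the parsed header...
    let mut offset = 0;
    offset += header_len; // done inside peek_next_page
    let cached = Some(data_len);
    // ...so skip_next_page only jumps over the page data.
    offset += cached.unwrap();

    // Path 2: nothing was peeked, so skip_next_page decodes the header
    // itself and jumps over header and data in one step.
    let mut offset2 = 0;
    offset2 += header_len + data_len;

    // Both paths land on the same page boundary.
    assert_eq!(offset, offset2);
}
```
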
@@ -1470,6 +1538,35 @@ mod tests {
assert_eq!(vec.len(), 163);
}

#[test]
fn test_skip_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

// use the default SerializedFileReader, which does not read the offset index
let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

// use 'int_col': boundary order ASCENDING, 325 pages in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
// check that all pages have been consumed.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163); // the 163 even-indexed pages out of 325
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
@@ -1513,4 +1610,46 @@

assert_eq!(vec.len(), 352);
}

#[test]
fn test_peek_page_with_dictionary_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");

let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();

// use 'string_col': boundary order UNORDERED, 352 data pages and 1 dictionary page in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

let mut vec = vec![];

let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// every page has num_rows of either 21 or 20, except the last page, which has 10 rows.
if i != 351 {
assert!((meta.num_rows == 21) || (meta.num_rows == 20));
} else {
// the last page's first row index is 7290 and the total row count is 7300,
// so, with row indices starting at zero, the last page holds 10 rows.
assert_eq!(meta.num_rows, 10);
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}

// check that all pages have been consumed.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 352);
}
}
