Skip to content

Commit

Permalink
Support skip_page missing OffsetIndex Fallback in SerializedPageReader
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold authored and Ted-Jiang committed Aug 18, 2022
1 parent 42e9531 commit 6c928eb
Showing 1 changed file with 111 additions and 14 deletions.
125 changes: 111 additions & 14 deletions parquet/src/file/serialized_reader.rs
Expand Up @@ -480,13 +480,17 @@ pub(crate) fn decode_page(
Ok(result)
}

#[allow(clippy::large_enum_variant)]
enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader
offset: usize,

/// The length of the chunk in bytes
remaining_bytes: usize,

// If the next page header has already been "peeked", we will cache it and it`s length here
next_page_header: Option<PageHeader>,
},
Pages {
/// Remaining page locations
Expand Down Expand Up @@ -546,6 +550,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
None => SerializedPageReaderState::Values {
offset: start as usize,
remaining_bytes: len as usize,
next_page_header: None,
},
};

Expand Down Expand Up @@ -573,18 +578,24 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
SerializedPageReaderState::Values {
offset,
remaining_bytes: remaining,
..
next_page_header,
} => {
if *remaining == 0 {
return Ok(None);
}

let mut read = self.reader.get_read(*offset as u64, *remaining)?;

let (header_len, header) = read_page_header_len(&mut read)?;
let header = if let Some(header) = next_page_header.take() {
header
} else {
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining -= header_len;
header
};
let data_len = header.compressed_page_size as usize;
*offset += header_len + data_len;
*remaining -= header_len + data_len;
*offset += data_len;
*remaining -= data_len;

if header.type_ == PageType::IndexPage {
continue;
Expand Down Expand Up @@ -657,20 +668,60 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match &self.state {
SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")),
SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}
return if let Some(header) = next_page_header.take() {
if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(header);
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
total_rows,
} => {
if dictionary_page.is_some() {
Ok(Some(PageMetadata{
Ok(Some(PageMetadata {
num_rows: 0,
is_dict: true
is_dict: true,
}))
} else if let Some(page) = page_locations.front() {
let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows);
let next_rows = page_locations
.get(1)
.map(|x| x.first_row_index as usize)
.unwrap_or(*total_rows);

Ok(Some(PageMetadata{
Ok(Some(PageMetadata {
num_rows: next_rows - page.first_row_index as usize,
is_dict: false
is_dict: false,
}))
} else {
Ok(None)
Expand All @@ -681,7 +732,24 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

fn skip_next_page(&mut self) -> Result<()> {
match &mut self.state {
SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
if let Some(buffered_header) = next_page_header.take() {
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let (header_len, header) = read_page_header_len(&mut read)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
}
Ok(())
}
SerializedPageReaderState::Pages { page_locations, .. } => {
page_locations.pop_front();

Expand Down Expand Up @@ -1479,6 +1547,35 @@ mod tests {
assert_eq!(vec.len(), 163);
}

#[test]
fn test_skip_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

// use default SerializedFileReader without read offsetIndex
let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

//use 'int_col', Boundary order: ASCENDING, total 325 pages.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
//check read all pages.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163);
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
Expand Down

0 comments on commit 6c928eb

Please sign in to comment.