Support skip_page missing OffsetIndex Fallback in SerializedPageReader #2460

Merged: 3 commits merged on Aug 18, 2022
Changes from 2 commits
1 change: 1 addition & 0 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -119,6 +119,7 @@ impl RowSelection {
}

/// Given an offset index, return the offset ranges for all data pages selected by `self`
#[allow(unused)]
pub(crate) fn scan_ranges(
&self,
page_locations: &[PageLocation],
167 changes: 153 additions & 14 deletions parquet/src/file/serialized_reader.rs
@@ -484,13 +484,17 @@ pub(crate) fn decode_page(
Ok(result)
}

#[allow(clippy::large_enum_variant)]
Ted-Jiang (Member, Author) commented:

Without this, clippy reports:

error: large size difference between variants
   --> parquet/src/file/serialized_reader.rs:484:5
    |
484 | /     Values {
485 | |         /// The current byte offset in the reader
486 | |         offset: usize,
487 | |
...   |
492 | |         next_page_header: Option<PageHeader>,
493 | |     },
    | |_____^ this variant is 336 bytes
    |
    = note: `-D clippy::large-enum-variant` implied by `-D warnings`
note: and the second-largest variant is 72 bytes:
   --> parquet/src/file/serialized_reader.rs:494:5

But I think the 336 bytes is misleading: in PageHeader most of the large fields are Options, and clippy cannot know that at runtime only one of them is Some. So I think removing this check here is reasonable (or wrapping in a `Box`, though I think that would be a downgrade)🤔

  pub data_page_header: Option<DataPageHeader>,
  pub index_page_header: Option<IndexPageHeader>,
  pub dictionary_page_header: Option<DictionaryPageHeader>,
  pub data_page_header_v2: Option<DataPageHeaderV2>,

Ted-Jiang (Member, Author) commented:
@tustvold PTAL😊

Contributor commented:
An Option takes up the same space regardless of whether it is set; they're like C++ union types in that sense. I think you should probably box the page header...

Ted-Jiang (Member, Author) replied on Aug 18, 2022:
You are right! Tested in a unit test:

#[test]
fn test() {
    enum Test {
        Large {
            page: Option<PageHeader>,
        },
    }
    let test1 = Test::Large { page: None };

    println!("{}", std::mem::size_of_val(&test1));
}

Got 320 😂 thanks for the info!
I should try it before asking😔
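The effect of boxing discussed above can be demonstrated with a standalone sketch. `BigHeader` is a hypothetical stand-in for the thrift-generated `PageHeader` (roughly 320 bytes); the point is that `Option<T>` always reserves space for `T`, while `Option<Box<T>>` is only pointer-sized thanks to the null-pointer niche optimization:

```rust
// `BigHeader` is a hypothetical stand-in for the thrift-generated PageHeader.
struct BigHeader {
    _data: [u64; 40], // 320 bytes
}

// Holding the header inline: the whole enum pays for its largest variant.
enum Inline {
    Large { _page: Option<BigHeader> },
    Small { _offset: usize },
}

// Boxing the header shrinks the large variant to pointer size, and
// Option<Box<_>> is itself pointer-sized (null-pointer niche).
enum Boxed {
    Large { _page: Option<Box<BigHeader>> },
    Small { _offset: usize },
}

fn main() {
    let inline = std::mem::size_of::<Inline>();
    let boxed = std::mem::size_of::<Boxed>();
    println!("inline: {inline} bytes, boxed: {boxed} bytes");
    assert!(boxed < inline);
    assert_eq!(
        std::mem::size_of::<Option<Box<BigHeader>>>(),
        std::mem::size_of::<usize>()
    );
}
```

This is why clippy's `large_enum_variant` lint fires on the inline version but not the boxed one; the trade-off is an extra heap allocation per cached header.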

enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader
offset: usize,

/// The length of the chunk in bytes
remaining_bytes: usize,

// If the next page header has already been "peeked", we will cache it and its length here
next_page_header: Option<PageHeader>,
},
Pages {
/// Remaining page locations
@@ -550,6 +554,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
None => SerializedPageReaderState::Values {
offset: start as usize,
remaining_bytes: len as usize,
next_page_header: None,
},
};

@@ -577,18 +582,24 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
SerializedPageReaderState::Values {
offset,
remaining_bytes: remaining,
..
next_page_header,
} => {
if *remaining == 0 {
return Ok(None);
}

let mut read = self.reader.get_read(*offset as u64, *remaining)?;

let (header_len, header) = read_page_header_len(&mut read)?;
let header = if let Some(header) = next_page_header.take() {
header
} else {
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining -= header_len;
header
};
let data_len = header.compressed_page_size as usize;
*offset += header_len + data_len;
*remaining -= header_len + data_len;
*offset += data_len;
*remaining -= data_len;

if header.type_ == PageType::IndexPage {
continue;
@@ -648,20 +659,60 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match &self.state {
SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")),
SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}
return if let Some(header) = next_page_header.take() {
Contributor commented:
I think this probably shouldn't take but just be as_ref?

Ted-Jiang (Member, Author) replied:
Yes, the only place it is set back to None is after skip_next_page.
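The peek-then-consume caching discussed in this thread can be sketched as a standalone toy (all names here are hypothetical, not the parquet API): `peek` parses the next header once and caches it, so repeated peeks are free, while `skip` consumes the cache if present and otherwise reads the header directly.

```rust
#[derive(Clone)]
struct Header {
    size: usize,
}

struct PageStream {
    headers: Vec<Header>,
    pos: usize,
    cached: Option<Header>, // plays the role of next_page_header
}

impl PageStream {
    fn new(sizes: &[usize]) -> Self {
        PageStream {
            headers: sizes.iter().map(|&size| Header { size }).collect(),
            pos: 0,
            cached: None,
        }
    }

    // Peek caches the header rather than consuming it: repeated peeks
    // return the same header without re-reading the stream.
    fn peek(&mut self) -> Option<&Header> {
        if self.cached.is_none() && self.pos < self.headers.len() {
            self.cached = Some(self.headers[self.pos].clone());
            self.pos += 1;
        }
        self.cached.as_ref()
    }

    // Skip takes the cached header if one was peeked, otherwise reads it fresh.
    fn skip(&mut self) -> Option<usize> {
        if let Some(h) = self.cached.take() {
            return Some(h.size);
        }
        if self.pos < self.headers.len() {
            let size = self.headers[self.pos].size;
            self.pos += 1;
            Some(size)
        } else {
            None
        }
    }
}

fn main() {
    let mut s = PageStream::new(&[10, 20, 30]);
    assert_eq!(s.peek().map(|h| h.size), Some(10));
    assert_eq!(s.peek().map(|h| h.size), Some(10)); // cached, not re-read
    assert_eq!(s.skip(), Some(10)); // consumes the cache
    assert_eq!(s.skip(), Some(20)); // no cache: reads directly
    assert_eq!(s.peek().map(|h| h.size), Some(30));
    assert_eq!(s.skip(), Some(30));
    assert_eq!(s.skip(), None);
}
```

Returning a borrowed `&Header` from `peek` (the `as_ref` suggestion) keeps the cache intact; only `skip` (or a successful read) moves the cursor past the page.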

if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let (header_len, header) = read_page_header_len(&mut read)?;
*offset += header_len;
*remaining_bytes -= header_len;
let page_meta = if let Ok(page_meta) = (&header).try_into() {
Ok(Some(page_meta))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};
*next_page_header = Some(header);
page_meta
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
total_rows,
} => {
if dictionary_page.is_some() {
Ok(Some(PageMetadata{
Ok(Some(PageMetadata {
num_rows: 0,
is_dict: true
is_dict: true,
}))
} else if let Some(page) = page_locations.front() {
let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows);
let next_rows = page_locations
.get(1)
.map(|x| x.first_row_index as usize)
.unwrap_or(*total_rows);

Ok(Some(PageMetadata{
Ok(Some(PageMetadata {
num_rows: next_rows - page.first_row_index as usize,
is_dict: false
is_dict: false,
}))
} else {
Ok(None)
@@ -672,7 +723,24 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

fn skip_next_page(&mut self) -> Result<()> {
match &mut self.state {
SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) },
SerializedPageReaderState::Values {
offset,
remaining_bytes,
next_page_header,
} => {
if let Some(buffered_header) = next_page_header.take() {
// The next page header has already been peeked, so just advance the offset
*offset += buffered_header.compressed_page_size as usize;
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;
let (header_len, header) = read_page_header_len(&mut read)?;
let data_page_size = header.compressed_page_size as usize;
*offset += header_len + data_page_size;
*remaining_bytes -= header_len + data_page_size;
}
Ok(())
}
SerializedPageReaderState::Pages { page_locations, .. } => {
page_locations.pop_front();

@@ -1470,6 +1538,35 @@ mod tests {
assert_eq!(vec.len(), 163);
}

#[test]
fn test_skip_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

// use the default SerializedFileReader, without reading the OffsetIndex
let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

// use 'int_col'; Boundary order: ASCENDING; 325 pages in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
// check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163);
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
@@ -1513,4 +1610,46 @@ mod tests {

assert_eq!(vec.len(), 352);
}

#[test]
fn test_peek_page_with_dictionary_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");

let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();

// use 'string_col'; Boundary order: UNORDERED; 352 data pages and 1 dictionary page in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

let mut vec = vec![];

let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// num_rows is either 21 or 20, except the last page, which has 10 rows.
if i != 351 {
assert!((meta.num_rows == 21) || (meta.num_rows == 20));
} else {
// the last page's first row index is 7290 and the total row count is 7300;
// since row indices start at zero, the last page holds 10 rows.
assert_eq!(meta.num_rows, 10);
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}

// check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 352);
}
}