Fallback for skip_page/peek_page without OffsetIndex in SerializedPageReader #2490

Closed
1 change: 1 addition & 0 deletions parquet/src/arrow/arrow_reader/selection.rs
@@ -119,6 +119,7 @@ impl RowSelection {
}

/// Given an offset index, return the offset ranges for all data pages selected by `self`
#[allow(unused)]
Contributor Author
Clippy was unhappy because this method is only used when the `async` feature is enabled
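A tighter alternative (just a sketch, not part of this diff; it relies on the parquet crate's existing `async` Cargo feature) would scope the allow to builds without that feature, so the lint still fires if the method ever becomes unused everywhere:

// Hypothetical variant: only silence `unused` when the `async`
// feature, the sole consumer of scan_ranges, is disabled.
#[cfg_attr(not(feature = "async"), allow(unused))]
pub(crate) fn scan_ranges(
    &self,
    page_locations: &[PageLocation],
    // ... signature and body unchanged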

pub(crate) fn scan_ranges(
&self,
page_locations: &[PageLocation],
29 changes: 29 additions & 0 deletions parquet/src/file/page_index/index_reader.rs
@@ -64,6 +64,11 @@ pub fn read_pages_locations<R: ChunkReader>(
) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
let (offset, total_length) = get_location_offset_and_total_length(chunks)?;

// If there is no OffsetIndex, return an empty vector
if total_length == 0 {
return Ok(vec![]);
}

// read all needed data into the buffer
let mut reader = reader.get_read(offset, total_length)?;
let mut data = vec![0; total_length];
@@ -162,3 +167,27 @@ fn deserialize_column_index(

Ok(index)
}

#[cfg(test)]
mod tests {
use crate::file::footer::parse_metadata;
use crate::file::page_index::index_reader::read_pages_locations;
use bytes::Bytes;

#[test]
fn read_page_locations_no_index_in_file() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = parse_metadata(&data).expect("parsing metadata");

let columns = metadata.row_group(0).columns();

let locations = read_pages_locations(&data, columns).expect("reading locations");

// If a parquet file does not have an OffsetIndex, read_pages_locations should just return
// an empty vec rather than an error
assert!(locations.is_empty());
}
}
194 changes: 177 additions & 17 deletions parquet/src/file/serialized_reader.rs
@@ -484,13 +484,17 @@ pub(crate) fn decode_page(
Ok(result)
}

#[allow(clippy::large_enum_variant)]
Contributor Author
Boxing buffered_header seemed counterproductive here, so I just disabled the lint warning
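For context, a minimal sketch of what clippy's `large_enum_variant` suggestion would look like (assuming the same field layout as below; `PageHeader` is the thrift-generated header type): the Box shrinks the `Values` variant to pointer size for that field, at the cost of a heap allocation each time a header is buffered:

// Sketch only: the boxed variant clippy proposes.
enum SerializedPageReaderState {
    Values {
        offset: usize,
        remaining_bytes: usize,
        // Boxed to keep the Values variant small; peeking now allocates.
        buffered_header: Option<Box<PageHeader>>,
    },
    // Pages { ... } as below
}

Since the fallback path buffers a header on every peek, keeping the header inline and avoiding that per-page allocation is a defensible trade.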

enum SerializedPageReaderState {
Values {
/// The current byte offset in the reader
offset: usize,

/// The length of the chunk in bytes
remaining_bytes: usize,

/// If the next page has been "peeked", we cache the decoded header here
buffered_header: Option<PageHeader>,
},
Pages {
/// Remaining page locations
@@ -550,6 +554,7 @@ impl<R: ChunkReader> SerializedPageReader<R> {
None => SerializedPageReaderState::Values {
offset: start as usize,
remaining_bytes: len as usize,
buffered_header: None,
},
};

@@ -577,22 +582,38 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
SerializedPageReaderState::Values {
offset,
remaining_bytes: remaining,
buffered_header,
} => {
if *remaining == 0 {
return Ok(None);
}

let mut read = self.reader.get_read(*offset as u64, *remaining)?;

let header = if let Some(header) = buffered_header.take() {
if header.type_ == PageType::IndexPage {
continue;
}
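// peek_next_page already consumed the header bytes; only the page body remains to account for.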
let data_len = header.compressed_page_size as usize;
*offset += data_len;
*remaining -= data_len;

header
} else {
let (header_len, header) = read_page_header_len(&mut read)?;
let data_len = header.compressed_page_size as usize;
*offset += header_len + data_len;
*remaining -= header_len + data_len;

if header.type_ == PageType::IndexPage {
continue;
}

header
};

let data_len = header.compressed_page_size as usize;

let mut buffer = Vec::with_capacity(data_len);
let read = read.take(data_len as u64).read_to_end(&mut buffer)?;
@@ -648,20 +669,63 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
buffered_header,
} => {
loop {
if *remaining_bytes == 0 {
return Ok(None);
}

return if let Some(header) = buffered_header.as_ref() {
if let Ok(page_metadata) = header.try_into() {
Ok(Some(page_metadata))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*buffered_header = None;
continue;
}
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;

let (header_len, header) = read_page_header_len(&mut read)?;

*offset += header_len as usize;
*remaining_bytes -= header_len as usize;
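// The header bytes are consumed here, so a subsequent skip or read only needs to advance past the page body.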

if let Ok(page_metadata) = (&header).try_into() {
*buffered_header = Some(header);
return Ok(Some(page_metadata));
} else {
// For unknown page types (e.g., INDEX_PAGE), also skip the page body,
// then read the next header.
*offset += header.compressed_page_size as usize;
*remaining_bytes -= header.compressed_page_size as usize;
continue;
};
};
}
}
SerializedPageReaderState::Pages {
page_locations,
dictionary_page,
total_rows,
} => {
if dictionary_page.is_some() {
Ok(Some(PageMetadata {
num_rows: 0,
is_dict: true,
}))
} else if let Some(page) = page_locations.front() {
let next_rows = page_locations
.get(1)
.map(|x| x.first_row_index as usize)
.unwrap_or(*total_rows);

Ok(Some(PageMetadata {
num_rows: next_rows - page.first_row_index as usize,
is_dict: false,
}))
} else {
Ok(None)
@@ -672,7 +736,33 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {

fn skip_next_page(&mut self) -> Result<()> {
match &mut self.state {
SerializedPageReaderState::Values {
offset,
remaining_bytes,
buffered_header,
} => {
if let Some(header) = buffered_header {
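// A header buffered by peek_next_page has already been read past; only the page body remains to skip.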
let remaining_in_page = header.compressed_page_size as usize;

*offset += remaining_in_page;
*remaining_bytes -= remaining_in_page;

*buffered_header = None;
Ok(())
} else {
let mut read =
self.reader.get_read(*offset as u64, *remaining_bytes)?;

let (header_len, header) = read_page_header_len(&mut read)?;

let data_len = header.compressed_page_size as usize;

*offset += header_len + data_len;
*remaining_bytes -= header_len + data_len;

Ok(())
}
}
SerializedPageReaderState::Pages { page_locations, .. } => {
page_locations.pop_front();

@@ -1470,6 +1560,34 @@ mod tests {
assert_eq!(vec.len(), 163);
}

#[test]
fn test_skip_page_without_offset_index() {
let test_file = get_test_file("alltypes_tiny_pages_plain.parquet");

let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();

let row_group_reader = reader.get_row_group(0).unwrap();

// use 'int_col': boundary order ASCENDING, 325 pages in total.
let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap();

let mut vec = vec![];

for i in 0..325 {
if i % 2 == 0 {
vec.push(column_page_reader.get_next_page().unwrap().unwrap());
} else {
column_page_reader.skip_next_page().unwrap();
}
}
// check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 163);
}

#[test]
fn test_peek_page_with_dictionary_page() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");
@@ -1513,4 +1631,46 @@

assert_eq!(vec.len(), 352);
}

#[test]
fn test_peek_page_with_dictionary_page_no_index() {
let test_file = get_test_file("alltypes_tiny_pages.parquet");

let reader_result = SerializedFileReader::new(test_file);
let reader = reader_result.unwrap();
let row_group_reader = reader.get_row_group(0).unwrap();

// use 'string_col': boundary order UNORDERED, 352 data pages plus 1 dictionary page.
let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap();

let mut vec = vec![];

let meta = column_page_reader.peek_next_page().unwrap().unwrap();
assert!(meta.is_dict);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE));

for i in 0..352 {
let meta = column_page_reader.peek_next_page().unwrap().unwrap();
// verified with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet`:
// every page has num_rows of either 21 or 20, except the last page.
if i != 351 {
assert!((meta.num_rows == 21) || (meta.num_rows == 20));
} else {
// the last page's first row index is 7290 and the total row count is 7300,
// so the last page holds the remaining 10 rows.
assert_eq!(meta.num_rows, 10);
}
assert!(!meta.is_dict);
vec.push(meta);
let page = column_page_reader.get_next_page().unwrap().unwrap();
assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE));
}

// check that all pages have been read.
assert!(column_page_reader.peek_next_page().unwrap().is_none());
assert!(column_page_reader.get_next_page().unwrap().is_none());

assert_eq!(vec.len(), 352);
}
}