diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 1a6a9026d5d..bcba5b01878 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -480,6 +480,7 @@ pub(crate) fn decode_page( Ok(result) } +#[allow(clippy::large_enum_variant)] enum SerializedPageReaderState { Values { /// The current byte offset in the reader @@ -487,6 +488,9 @@ enum SerializedPageReaderState { /// The length of the chunk in bytes remaining_bytes: usize, + + // If the next page header has already been "peeked", we will cache it and it`s length here + next_page_header: Option, }, Pages { /// Remaining page locations @@ -546,6 +550,7 @@ impl SerializedPageReader { None => SerializedPageReaderState::Values { offset: start as usize, remaining_bytes: len as usize, + next_page_header: None, }, }; @@ -573,18 +578,24 @@ impl PageReader for SerializedPageReader { SerializedPageReaderState::Values { offset, remaining_bytes: remaining, - .. + next_page_header, } => { if *remaining == 0 { return Ok(None); } let mut read = self.reader.get_read(*offset as u64, *remaining)?; - - let (header_len, header) = read_page_header_len(&mut read)?; + let header = if let Some(header) = next_page_header.take() { + header + } else { + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining -= header_len; + header + }; let data_len = header.compressed_page_size as usize; - *offset += header_len + data_len; - *remaining -= header_len + data_len; + *offset += data_len; + *remaining -= data_len; if header.type_ == PageType::IndexPage { continue; @@ -657,20 +668,60 @@ impl PageReader for SerializedPageReader { } fn peek_next_page(&mut self) -> Result> { - match &self.state { - SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")), - SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => { + match &mut self.state { + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + loop { + if *remaining_bytes == 0 { + return Ok(None); + } + return if let Some(header) = next_page_header.take() { + if let Ok(page_meta) = (&header).try_into() { + Ok(Some(page_meta)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *next_page_header = None; + continue; + } + } else { + let mut read = + self.reader.get_read(*offset as u64, *remaining_bytes)?; + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining_bytes -= header_len; + let page_meta = if let Ok(page_meta) = (&header).try_into() { + Ok(Some(page_meta)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + *next_page_header = Some(header); + page_meta + }; + } + } + SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + total_rows, + } => { if dictionary_page.is_some() { - Ok(Some(PageMetadata{ + Ok(Some(PageMetadata { num_rows: 0, - is_dict: true + is_dict: true, })) } else if let Some(page) = page_locations.front() { - let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows); + let next_rows = page_locations + .get(1) + .map(|x| x.first_row_index as usize) + .unwrap_or(*total_rows); - Ok(Some(PageMetadata{ + Ok(Some(PageMetadata { num_rows: next_rows - page.first_row_index as usize, - is_dict: false + is_dict: false, })) } else { Ok(None) @@ -681,7 +732,24 @@ impl PageReader for SerializedPageReader { fn skip_next_page(&mut self) -> Result<()> { match &mut self.state { - SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) }, + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + if let Some(buffered_header) = next_page_header.take() { + // The next page header has already been peeked, so just advance the offset + *offset += buffered_header.compressed_page_size as usize; + } else { + let mut read = + self.reader.get_read(*offset as u64, *remaining_bytes)?; + let (header_len, header) = read_page_header_len(&mut read)?; + let data_page_size = header.compressed_page_size as usize; + *offset += header_len + data_page_size; + *remaining_bytes -= header_len + data_page_size; + } + Ok(()) + } SerializedPageReaderState::Pages { page_locations, .. } => { page_locations.pop_front(); @@ -1479,6 +1547,35 @@ mod tests { assert_eq!(vec.len(), 163); } + #[test] + fn test_skip_page_without_offset_index() { + let test_file = get_test_file("alltypes_tiny_pages_plain.parquet"); + + // use default SerializedFileReader without read offsetIndex + let reader_result = SerializedFileReader::new(test_file); + let reader = reader_result.unwrap(); + + let row_group_reader = reader.get_row_group(0).unwrap(); + + //use 'int_col', Boundary order: ASCENDING, total 325 pages. + let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap(); + + let mut vec = vec![]; + + for i in 0..325 { + if i % 2 == 0 { + vec.push(column_page_reader.get_next_page().unwrap().unwrap()); + } else { + column_page_reader.skip_next_page().unwrap(); + } + } + //check read all pages. + assert!(column_page_reader.peek_next_page().unwrap().is_none()); + assert!(column_page_reader.get_next_page().unwrap().is_none()); + + assert_eq!(vec.len(), 163); + } + #[test] fn test_peek_page_with_dictionary_page() { let test_file = get_test_file("alltypes_tiny_pages.parquet");