From 5c217746b5d44269efab94d444fe6be80e0d73f7 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Wed, 17 Aug 2022 17:01:33 +0800 Subject: [PATCH 1/3] Support skip_page missing OffsetIndex Fallback in SerializedPageReader --- parquet/src/file/serialized_reader.rs | 125 +++++++++++++++++++++++--- 1 file changed, 111 insertions(+), 14 deletions(-) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index f7a568e9258..89ef744aca9 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -484,6 +484,7 @@ pub(crate) fn decode_page( Ok(result) } +#[allow(clippy::large_enum_variant)] enum SerializedPageReaderState { Values { /// The current byte offset in the reader @@ -491,6 +492,9 @@ enum SerializedPageReaderState { /// The length of the chunk in bytes remaining_bytes: usize, + + // If the next page header has already been "peeked", we will cache it and it`s length here + next_page_header: Option, }, Pages { /// Remaining page locations @@ -550,6 +554,7 @@ impl SerializedPageReader { None => SerializedPageReaderState::Values { offset: start as usize, remaining_bytes: len as usize, + next_page_header: None, }, }; @@ -577,18 +582,24 @@ impl PageReader for SerializedPageReader { SerializedPageReaderState::Values { offset, remaining_bytes: remaining, - .. + next_page_header, } => { if *remaining == 0 { return Ok(None); } let mut read = self.reader.get_read(*offset as u64, *remaining)?; - - let (header_len, header) = read_page_header_len(&mut read)?; + let header = if let Some(header) = next_page_header.take() { + header + } else { + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining -= header_len; + header + }; let data_len = header.compressed_page_size as usize; - *offset += header_len + data_len; - *remaining -= header_len + data_len; + *offset += data_len; + *remaining -= data_len; if header.type_ == PageType::IndexPage { continue; @@ -648,20 +659,60 @@ impl PageReader for SerializedPageReader { } fn peek_next_page(&mut self) -> Result> { - match &self.state { - SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")), - SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => { + match &mut self.state { + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + loop { + if *remaining_bytes == 0 { + return Ok(None); + } + return if let Some(header) = next_page_header.take() { + if let Ok(page_meta) = (&header).try_into() { + Ok(Some(page_meta)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *next_page_header = None; + continue; + } + } else { + let mut read = + self.reader.get_read(*offset as u64, *remaining_bytes)?; + let (header_len, header) = read_page_header_len(&mut read)?; + *offset += header_len; + *remaining_bytes -= header_len; + let page_meta = if let Ok(page_meta) = (&header).try_into() { + Ok(Some(page_meta)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + *next_page_header = Some(header); + page_meta + }; + } + } + SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + total_rows, + } => { if dictionary_page.is_some() { - Ok(Some(PageMetadata{ + Ok(Some(PageMetadata { num_rows: 0, - is_dict: true + is_dict: true, })) } else if let Some(page) = page_locations.front() { - let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows); + let next_rows = page_locations + .get(1) + .map(|x| x.first_row_index as usize) + .unwrap_or(*total_rows); - Ok(Some(PageMetadata{ + Ok(Some(PageMetadata { num_rows: next_rows - page.first_row_index as usize, - is_dict: false + is_dict: false, })) } else { Ok(None) @@ -672,7 +723,24 @@ impl PageReader for SerializedPageReader { fn skip_next_page(&mut self) -> Result<()> { match &mut self.state { - SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) }, + SerializedPageReaderState::Values { + offset, + remaining_bytes, + next_page_header, + } => { + if let Some(buffered_header) = next_page_header.take() { + // The next page header has already been peeked, so just advance the offset + *offset += buffered_header.compressed_page_size as usize; + } else { + let mut read = + self.reader.get_read(*offset as u64, *remaining_bytes)?; + let (header_len, header) = read_page_header_len(&mut read)?; + let data_page_size = header.compressed_page_size as usize; + *offset += header_len + data_page_size; + *remaining_bytes -= header_len + data_page_size; + } + Ok(()) + } SerializedPageReaderState::Pages { page_locations, .. } => { page_locations.pop_front(); @@ -1470,6 +1538,35 @@ mod tests { assert_eq!(vec.len(), 163); } + #[test] + fn test_skip_page_without_offset_index() { + let test_file = get_test_file("alltypes_tiny_pages_plain.parquet"); + + // use default SerializedFileReader without read offsetIndex + let reader_result = SerializedFileReader::new(test_file); + let reader = reader_result.unwrap(); + + let row_group_reader = reader.get_row_group(0).unwrap(); + + //use 'int_col', Boundary order: ASCENDING, total 325 pages. + let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap(); + + let mut vec = vec![]; + + for i in 0..325 { + if i % 2 == 0 { + vec.push(column_page_reader.get_next_page().unwrap().unwrap()); + } else { + column_page_reader.skip_next_page().unwrap(); + } + } + //check read all pages. + assert!(column_page_reader.peek_next_page().unwrap().is_none()); + assert!(column_page_reader.get_next_page().unwrap().is_none()); + + assert_eq!(vec.len(), 163); + } + #[test] fn test_peek_page_with_dictionary_page() { let test_file = get_test_file("alltypes_tiny_pages.parquet"); From 3efc8c881b374c9f90be6688a7ec5e2a97f0fa42 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 18 Aug 2022 12:15:43 +0800 Subject: [PATCH 2/3] fix clippy --- parquet/src/arrow/arrow_reader/selection.rs | 1 + parquet/src/file/serialized_reader.rs | 42 +++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2cce63c3d1a..0fd27808fbb 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -119,6 +119,7 @@ impl RowSelection { } /// Given an offset index, return the offset ranges for all data pages selected by `self` + #[allow(unused)] pub(crate) fn scan_ranges( &self, page_locations: &[PageLocation], diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 89ef744aca9..e72c2a148f5 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -1610,4 +1610,46 @@ mod tests { assert_eq!(vec.len(), 352); } + + #[test] + fn test_peek_page_with_dictionary_page_without_offset_index() { + let test_file = get_test_file("alltypes_tiny_pages.parquet"); + + let reader_result = SerializedFileReader::new(test_file); + let reader = reader_result.unwrap(); + let row_group_reader = reader.get_row_group(0).unwrap(); + + //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page. + let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap(); + + let mut vec = vec![]; + + let meta = column_page_reader.peek_next_page().unwrap().unwrap(); + assert!(meta.is_dict); + let page = column_page_reader.get_next_page().unwrap().unwrap(); + assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE)); + + for i in 0..352 { + let meta = column_page_reader.peek_next_page().unwrap().unwrap(); + // have checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet` + // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows. + if i != 351 { + assert!((meta.num_rows == 21) || (meta.num_rows == 20)); + } else { + // last page first row index is 7290, total row count is 7300 + // because first row start with zero, last page row count should be 10. + assert_eq!(meta.num_rows, 10); + } + assert!(!meta.is_dict); + vec.push(meta); + let page = column_page_reader.get_next_page().unwrap().unwrap(); + assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE)); + } + + //check read all pages. + assert!(column_page_reader.peek_next_page().unwrap().is_none()); + assert!(column_page_reader.get_next_page().unwrap().is_none()); + + assert_eq!(vec.len(), 352); + } } From 3b73331bbad749afc4984fbfeb8fcdc96eab3943 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Thu, 18 Aug 2022 17:40:58 +0800 Subject: [PATCH 3/3] fix comment --- parquet/src/arrow/arrow_reader/selection.rs | 2 +- parquet/src/file/serialized_reader.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 0fd27808fbb..d94ad106086 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -119,7 +119,7 @@ impl RowSelection { } /// Given an offset index, return the offset ranges for all data pages selected by `self` - #[allow(unused)] + #[cfg(any(test, feature = "async"))] pub(crate) fn scan_ranges( &self, page_locations: &[PageLocation], diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index e72c2a148f5..a9433bc63b4 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -494,7 +494,7 @@ enum SerializedPageReaderState { remaining_bytes: usize, // If the next page header has already been "peeked", we will cache it and it`s length here - next_page_header: Option, + next_page_header: Option>, }, Pages { /// Remaining page locations @@ -590,7 +590,7 @@ impl PageReader for SerializedPageReader { let mut read = self.reader.get_read(*offset as u64, *remaining)?; let header = if let Some(header) = next_page_header.take() { - header + *header } else { let (header_len, header) = read_page_header_len(&mut read)?; *offset += header_len; @@ -669,8 +669,8 @@ impl PageReader for SerializedPageReader { if *remaining_bytes == 0 { return Ok(None); } - return if let Some(header) = next_page_header.take() { - if let Ok(page_meta) = (&header).try_into() { + return if let Some(header) = next_page_header.as_ref() { + if let Ok(page_meta) = (&**header).try_into() { Ok(Some(page_meta)) } else { // For unknown page type (e.g., INDEX_PAGE), skip and read next. @@ -689,7 +689,7 @@ impl PageReader for SerializedPageReader { // For unknown page type (e.g., INDEX_PAGE), skip and read next. continue; }; - *next_page_header = Some(header); + *next_page_header = Some(Box::new(header)); page_meta }; }