From 841bd2db27cda2369fa4e0d6b7f3728ebaed11ae Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Wed, 17 Aug 2022 18:04:20 -0400 Subject: [PATCH 1/3] Fallback for skip_page/peek_page without OffsetIndex in SerializedPageReader --- parquet/src/file/serialized_reader.rs | 193 +++++++++++++++++++++++--- 1 file changed, 176 insertions(+), 17 deletions(-) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index f7a568e9258..48c5ef30dfe 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -491,6 +491,9 @@ enum SerializedPageReaderState { /// The length of the chunk in bytes remaining_bytes: usize, + + /// If the next page has been "peeked", we cache the decoded header here + buffered_header: Option, }, Pages { /// Remaining page locations @@ -550,6 +553,7 @@ impl SerializedPageReader { None => SerializedPageReaderState::Values { offset: start as usize, remaining_bytes: len as usize, + buffered_header: None, }, }; @@ -577,7 +581,7 @@ impl PageReader for SerializedPageReader { SerializedPageReaderState::Values { offset, remaining_bytes: remaining, - .. + buffered_header, } => { if *remaining == 0 { return Ok(None); @@ -585,14 +589,30 @@ impl PageReader for SerializedPageReader { let mut read = self.reader.get_read(*offset as u64, *remaining)?; - let (header_len, header) = read_page_header_len(&mut read)?; - let data_len = header.compressed_page_size as usize; - *offset += header_len + data_len; - *remaining -= header_len + data_len; + let header = if let Some(header) = buffered_header.take() { + if header.type_ == PageType::IndexPage { + continue; + } - if header.type_ == PageType::IndexPage { - continue; - } + let data_len = header.compressed_page_size as usize; + *offset += data_len; + *remaining -= data_len; + + header + } else { + let (header_len, header) = read_page_header_len(&mut read)?; + let data_len = header.compressed_page_size as usize; + *offset += header_len + data_len; + *remaining -= header_len + data_len; + + if header.type_ == PageType::IndexPage { + continue; + } + + header + }; + + let data_len = header.compressed_page_size as usize; let mut buffer = Vec::with_capacity(data_len); let read = read.take(data_len as u64).read_to_end(&mut buffer)?; @@ -648,20 +668,63 @@ impl PageReader for SerializedPageReader { } fn peek_next_page(&mut self) -> Result> { - match &self.state { - SerializedPageReaderState::Values {..} => Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")), - SerializedPageReaderState::Pages { page_locations, dictionary_page, total_rows } => { + match &mut self.state { + SerializedPageReaderState::Values { + offset, + remaining_bytes, + buffered_header, + } => { + loop { + if *remaining_bytes == 0 { + return Ok(None); + } + + return if let Some(header) = buffered_header.as_ref() { + if let Ok(page_metadata) = header.try_into() { + Ok(Some(page_metadata)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *buffered_header = None; + continue; + } + } else { + let mut read = + self.reader.get_read(*offset as u64, *remaining_bytes)?; + + let (header_len, header) = read_page_header_len(&mut read)?; + + *offset += header_len as usize; + *remaining_bytes -= header_len as usize; + + if let Ok(page_metadata) = (&header).try_into() { + *buffered_header = Some(header); + return Ok(Some(page_metadata)); + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + }; + } + } + SerializedPageReaderState::Pages { + page_locations, + dictionary_page, + total_rows, + } => { if dictionary_page.is_some() { - Ok(Some(PageMetadata{ + Ok(Some(PageMetadata { num_rows: 0, - is_dict: true + is_dict: true, })) } else if let Some(page) = page_locations.front() { - let next_rows = page_locations.get(1).map(|x| x.first_row_index as usize).unwrap_or(*total_rows); + let next_rows = page_locations + .get(1) + .map(|x| x.first_row_index as usize) + .unwrap_or(*total_rows); - Ok(Some(PageMetadata{ + Ok(Some(PageMetadata { num_rows: next_rows - page.first_row_index as usize, - is_dict: false + is_dict: false, })) } else { Ok(None) @@ -672,7 +735,33 @@ impl PageReader for SerializedPageReader { fn skip_next_page(&mut self) -> Result<()> { match &mut self.state { - SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) }, + SerializedPageReaderState::Values { + offset, + remaining_bytes, + buffered_header, + } => { + if let Some(header) = buffered_header { + let remaining_in_page = header.compressed_page_size as usize; + + *offset += remaining_in_page; + *remaining_bytes -= remaining_in_page; + + *buffered_header = None; + Ok(()) + } else { + let mut read = + self.reader.get_read(*offset as u64, *remaining_bytes)?; + + let (header_len, header) = read_page_header_len(&mut read)?; + + let data_len = header.compressed_page_size as usize; + + *offset += header_len + data_len; + *remaining_bytes -= header_len + data_len; + + Ok(()) + } + } SerializedPageReaderState::Pages { page_locations, .. } => { page_locations.pop_front(); @@ -1470,6 +1559,34 @@ mod tests { assert_eq!(vec.len(), 163); } + #[test] + fn test_skip_page_without_offset_index() { + let test_file = get_test_file("alltypes_tiny_pages_plain.parquet"); + + let reader_result = SerializedFileReader::new(test_file); + let reader = reader_result.unwrap(); + + let row_group_reader = reader.get_row_group(0).unwrap(); + + //use 'int_col', Boundary order: ASCENDING, total 325 pages. + let mut column_page_reader = row_group_reader.get_column_page_reader(4).unwrap(); + + let mut vec = vec![]; + + for i in 0..325 { + if i % 2 == 0 { + vec.push(column_page_reader.get_next_page().unwrap().unwrap()); + } else { + column_page_reader.skip_next_page().unwrap(); + } + } + //check read all pages. + assert!(column_page_reader.peek_next_page().unwrap().is_none()); + assert!(column_page_reader.get_next_page().unwrap().is_none()); + + assert_eq!(vec.len(), 163); + } + #[test] fn test_peek_page_with_dictionary_page() { let test_file = get_test_file("alltypes_tiny_pages.parquet"); @@ -1513,4 +1630,46 @@ mod tests { assert_eq!(vec.len(), 352); } + + #[test] + fn test_peek_page_with_dictionary_page_no_index() { + let test_file = get_test_file("alltypes_tiny_pages.parquet"); + + let reader_result = SerializedFileReader::new(test_file); + let reader = reader_result.unwrap(); + let row_group_reader = reader.get_row_group(0).unwrap(); + + //use 'string_col', Boundary order: UNORDERED, total 352 data ages and 1 dictionary page. + let mut column_page_reader = row_group_reader.get_column_page_reader(9).unwrap(); + + let mut vec = vec![]; + + let meta = column_page_reader.peek_next_page().unwrap().unwrap(); + assert!(meta.is_dict); + let page = column_page_reader.get_next_page().unwrap().unwrap(); + assert!(matches!(page.page_type(), basic::PageType::DICTIONARY_PAGE)); + + for i in 0..352 { + let meta = column_page_reader.peek_next_page().unwrap().unwrap(); + // have checked with `parquet-tools column-index -c string_col ./alltypes_tiny_pages.parquet` + // page meta has two scenarios(21, 20) of num_rows expect last page has 11 rows. + if i != 351 { + assert!((meta.num_rows == 21) || (meta.num_rows == 20)); + } else { + // last page first row index is 7290, total row count is 7300 + // because first row start with zero, last page row count should be 10. + assert_eq!(meta.num_rows, 10); + } + assert!(!meta.is_dict); + vec.push(meta); + let page = column_page_reader.get_next_page().unwrap().unwrap(); + assert!(matches!(page.page_type(), basic::PageType::DATA_PAGE)); + } + + //check read all pages. + assert!(column_page_reader.peek_next_page().unwrap().is_none()); + assert!(column_page_reader.get_next_page().unwrap().is_none()); + + assert_eq!(vec.len(), 352); + } } From a507a7ffd014137f33df998b9f3dd4a787c14f53 Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Wed, 17 Aug 2022 18:26:57 -0400 Subject: [PATCH 2/3] Fix #2434 and clippy warning --- parquet/src/file/page_index/index_reader.rs | 29 +++++++++++++++++++++ parquet/src/file/serialized_reader.rs | 1 + 2 files changed, 30 insertions(+) diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs index e3f37fbc661..b7cd9a71220 100644 --- a/parquet/src/file/page_index/index_reader.rs +++ b/parquet/src/file/page_index/index_reader.rs @@ -64,6 +64,11 @@ pub fn read_pages_locations( ) -> Result>, ParquetError> { let (offset, total_length) = get_location_offset_and_total_length(chunks)?; + // If there is no OffsetIndex, return an empty vector + if total_length == 0 { + return Ok(vec![]); + } + //read all need data into buffer let mut reader = reader.get_read(offset, total_length)?; let mut data = vec![0; total_length]; @@ -162,3 +167,27 @@ fn deserialize_column_index( Ok(index) } + +#[cfg(test)] +mod tests { + use crate::file::footer::parse_metadata; + use crate::file::page_index::index_reader::read_pages_locations; + use bytes::Bytes; + + #[test] + fn read_page_locations_no_index_in_file() { + let testdata = arrow::util::test_util::parquet_test_data(); + let path = format!("{}/alltypes_plain.parquet", testdata); + let data = Bytes::from(std::fs::read(path).unwrap()); + + let metadata = parse_metadata(&data).expect("parsing metadata"); + + let columns = metadata.row_group(0).columns(); + + let locations = read_pages_locations(&data, columns).expect("reading locations"); + + // If a parquet file does not have an OffsetIndex, read_page_locations should just return + // an empty vec and not an error + assert!(locations.is_empty()); + } +} diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 48c5ef30dfe..602b6994257 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -484,6 +484,7 @@ pub(crate) fn decode_page( Ok(result) } +#[allow(clippy::large_enum_variant)] enum SerializedPageReaderState { Values { /// The current byte offset in the reader From 2401335dcd3ab9224d6f5f5d4907bc832d79315a Mon Sep 17 00:00:00 2001 From: Dan Harris Date: Wed, 17 Aug 2022 18:39:06 -0400 Subject: [PATCH 3/3] Appease clippy --- parquet/src/arrow/arrow_reader/selection.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs index 2cce63c3d1a..0fd27808fbb 100644 --- a/parquet/src/arrow/arrow_reader/selection.rs +++ b/parquet/src/arrow/arrow_reader/selection.rs @@ -119,6 +119,7 @@ impl RowSelection { } /// Given an offset index, return the offset ranges for all data pages selected by `self` + #[allow(unused)] pub(crate) fn scan_ranges( &self, page_locations: &[PageLocation],