From fa7c62af5fb85fe16a309e15b91acc15348198b5 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 16 Aug 2022 11:40:23 +0800 Subject: [PATCH] Support skip_page missing OffsetIndex Fallback in SerializedPageReader --- parquet/src/file/serialized_reader.rs | 117 +++++++++++++++++++++++--- 1 file changed, 106 insertions(+), 11 deletions(-) diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 0b7451f4bea..6ae7841b0d8 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -467,9 +467,32 @@ pub(crate) fn decode_page( Ok(result) } +pub(crate) fn get_num_values_from_data_header(page_header: PageHeader) -> Result { + match page_header.type_ { + PageType::DataPage => { + let header = page_header.data_page_header.unwrap(); + Ok(header.num_values) + } + PageType::DataPageV2 => { + let header = page_header.data_page_header_v2.unwrap(); + Ok(header.num_values) + } + _ => { + unimplemented!( + "Page type {:?} is not supported in get_num_values_from_header", + page_header.type_ + ) + } + } +} + enum SerializedPages { /// Read entire chunk - Chunk { buf: T }, + Chunk { + buf: T, + // If the next page header has already been "peeked", we will cache it here + next_page_header: Option, + }, /// Read operate pages which can skip. Pages { offset_index: Vec, @@ -508,7 +531,10 @@ impl SerializedPageReader { ) -> Result { let decompressor = create_codec(compression)?; let result = Self { - buf: SerializedPages::Chunk { buf }, + buf: SerializedPages::Chunk { + buf, + next_page_header: None, + }, total_num_values, seen_num_values: 0, decompressor, @@ -556,9 +582,19 @@ impl PageReader for SerializedPageReader { let mut cursor; let mut dictionary_cursor; while self.seen_num_values < self.total_num_values { + let page_header; match &mut self.buf { - SerializedPages::Chunk { buf } => { + SerializedPages::Chunk { + buf, + next_page_header, + } => { cursor = buf; + page_header = if let Some(page_header) = next_page_header.take() { + // The next page header has already been peeked, so use the cached value + page_header + } else { + read_page_header(cursor)? + } } SerializedPages::Pages { offset_index, @@ -574,11 +610,11 @@ impl PageReader for SerializedPageReader { } else { cursor = page_bufs.get_mut(*seen_num_data_pages).unwrap(); } + + page_header = read_page_header(cursor)?; } } - let page_header = read_page_header(cursor)?; - let to_read = page_header.compressed_page_size as usize; let mut buffer = Vec::with_capacity(to_read); let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?; @@ -639,8 +675,47 @@ impl PageReader for SerializedPageReader { fn peek_next_page(&mut self) -> Result> { match &mut self.buf { - SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")) } - SerializedPages::Pages { offset_index, seen_num_data_pages, has_dictionary_page_to_read, .. } => { + SerializedPages::Chunk { + next_page_header, + buf, + } => { + loop { + if self.seen_num_values < self.total_num_values { + break; + } + return if let Some(buffered_header) = next_page_header.as_ref() { + if let Ok(page_metadata) = buffered_header.try_into() { + Ok(Some(page_metadata)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + *next_page_header = None; + continue; + } + } else { + let mut cursor = &mut *buf; + let page_header = read_page_header(&mut cursor)?; + + let page_metadata = + if let Ok(page_metadata) = (&page_header).try_into() { + Ok(Some(page_metadata)) + } else { + // For unknown page type (e.g., INDEX_PAGE), skip and read next. + continue; + }; + + *next_page_header = Some(page_header); + page_metadata + }; + } + + Ok(None) + } + SerializedPages::Pages { + offset_index, + seen_num_data_pages, + has_dictionary_page_to_read, + .. + } => { if *seen_num_data_pages >= offset_index.len() { Ok(None) } else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read { @@ -667,12 +742,32 @@ impl PageReader for SerializedPageReader { fn skip_next_page(&mut self) -> Result<()> { match &mut self.buf { - SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) } - SerializedPages::Pages { offset_index, seen_num_data_pages, .. } => { + SerializedPages::Chunk { + next_page_header, + buf, + } => { + if let Some(header) = next_page_header.take() { + let to_skip = header.compressed_page_size; + let mut buffer = Vec::with_capacity(to_skip as usize); + buf.read_exact(&mut buffer)?; + self.seen_num_values += + get_num_values_from_data_header(header)? as i64; + } else { + return Err(general_err!( + "SerializedPages::Chunk before skip_next_page must peek the page metadata." + )); + } + Ok(()) + } + SerializedPages::Pages { + offset_index, + seen_num_data_pages, + .. + } => { if offset_index.len() <= *seen_num_data_pages { Err(general_err!( - "seen_num_data_pages is out of bound in SerializedPageReader." - )) + "seen_num_data_pages is out of bound in SerializedPageReader." + )) } else { *seen_num_data_pages += 1; // Notice: maybe need 'self.seen_num_values += xxx', for now we can not get skip values in skip_next_page.