Skip to content

Commit

Permalink
Support skip_page missing OffsetIndex Fallback in SerializedPageReader
Browse files Browse the repository at this point in the history
  • Loading branch information
Ted-Jiang committed Aug 16, 2022
1 parent 3f0e12d commit fa7c62a
Showing 1 changed file with 106 additions and 11 deletions.
117 changes: 106 additions & 11 deletions parquet/src/file/serialized_reader.rs
Expand Up @@ -467,9 +467,32 @@ pub(crate) fn decode_page(
Ok(result)
}

pub(crate) fn get_num_values_from_data_header(page_header: PageHeader) -> Result<i32> {
match page_header.type_ {
PageType::DataPage => {
let header = page_header.data_page_header.unwrap();
Ok(header.num_values)
}
PageType::DataPageV2 => {
let header = page_header.data_page_header_v2.unwrap();
Ok(header.num_values)
}
_ => {
unimplemented!(
"Page type {:?} is not supported in get_num_values_from_header",
page_header.type_
)
}
}
}

enum SerializedPages<T: Read> {
/// Read entire chunk
Chunk { buf: T },
Chunk {
buf: T,
// If the next page header has already been "peeked", we will cache it here
next_page_header: Option<PageHeader>,
},
/// Read operate pages which can skip.
Pages {
offset_index: Vec<PageLocation>,
Expand Down Expand Up @@ -508,7 +531,10 @@ impl<T: Read> SerializedPageReader<T> {
) -> Result<Self> {
let decompressor = create_codec(compression)?;
let result = Self {
buf: SerializedPages::Chunk { buf },
buf: SerializedPages::Chunk {
buf,
next_page_header: None,
},
total_num_values,
seen_num_values: 0,
decompressor,
Expand Down Expand Up @@ -556,9 +582,19 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
let mut cursor;
let mut dictionary_cursor;
while self.seen_num_values < self.total_num_values {
let page_header;
match &mut self.buf {
SerializedPages::Chunk { buf } => {
SerializedPages::Chunk {
buf,
next_page_header,
} => {
cursor = buf;
page_header = if let Some(page_header) = next_page_header.take() {
// The next page header has already been peeked, so use the cached value
page_header
} else {
read_page_header(cursor)?
}
}
SerializedPages::Pages {
offset_index,
Expand All @@ -574,11 +610,11 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {
} else {
cursor = page_bufs.get_mut(*seen_num_data_pages).unwrap();
}

page_header = read_page_header(cursor)?;
}
}

let page_header = read_page_header(cursor)?;

let to_read = page_header.compressed_page_size as usize;
let mut buffer = Vec::with_capacity(to_read);
let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;
Expand Down Expand Up @@ -639,8 +675,47 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
match &mut self.buf {
SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using peek_next_page in SerializedPageReader.")) }
SerializedPages::Pages { offset_index, seen_num_data_pages, has_dictionary_page_to_read, .. } => {
SerializedPages::Chunk {
next_page_header,
buf,
} => {
loop {
if self.seen_num_values < self.total_num_values {
break;
}
return if let Some(buffered_header) = next_page_header.as_ref() {
if let Ok(page_metadata) = buffered_header.try_into() {
Ok(Some(page_metadata))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
*next_page_header = None;
continue;
}
} else {
let mut cursor = &mut *buf;
let page_header = read_page_header(&mut cursor)?;

let page_metadata =
if let Ok(page_metadata) = (&page_header).try_into() {
Ok(Some(page_metadata))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};

*next_page_header = Some(page_header);
page_metadata
};
}

Ok(None)
}
SerializedPages::Pages {
offset_index,
seen_num_data_pages,
has_dictionary_page_to_read,
..
} => {
if *seen_num_data_pages >= offset_index.len() {
Ok(None)
} else if *seen_num_data_pages == 0 && *has_dictionary_page_to_read {
Expand All @@ -667,12 +742,32 @@ impl<T: Read + Send> PageReader for SerializedPageReader<T> {

fn skip_next_page(&mut self) -> Result<()> {
match &mut self.buf {
SerializedPages::Chunk { .. } => { Err(general_err!("Must set page_offset_index when using skip_next_page in SerializedPageReader.")) }
SerializedPages::Pages { offset_index, seen_num_data_pages, .. } => {
SerializedPages::Chunk {
next_page_header,
buf,
} => {
if let Some(header) = next_page_header.take() {
let to_skip = header.compressed_page_size;
let mut buffer = Vec::with_capacity(to_skip as usize);
buf.read_exact(&mut buffer)?;
self.seen_num_values +=
get_num_values_from_data_header(header)? as i64;
} else {
return Err(general_err!(
"SerializedPages::Chunk before skip_next_page must peek the page metadata."
));
}
Ok(())
}
SerializedPages::Pages {
offset_index,
seen_num_data_pages,
..
} => {
if offset_index.len() <= *seen_num_data_pages {
Err(general_err!(
"seen_num_data_pages is out of bound in SerializedPageReader."
))
"seen_num_data_pages is out of bound in SerializedPageReader."
))
} else {
*seen_num_data_pages += 1;
// Notice: maybe need 'self.seen_num_values += xxx', for now we can not get skip values in skip_next_page.
Expand Down

0 comments on commit fa7c62a

Please sign in to comment.