Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement peek_next_page and skip_next_page for `InMemoryColumnChunkReader` #2155

Merged
merged 2 commits into from Jul 26, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
195 changes: 190 additions & 5 deletions parquet/src/arrow/async_reader.rs
Expand Up @@ -87,7 +87,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
use parquet_format::PageType;
use parquet_format::{PageHeader, PageType};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -520,6 +520,8 @@ struct InMemoryColumnChunkReader {
decompressor: Option<Box<dyn Codec>>,
offset: usize,
seen_num_values: i64,
// If the next page header has already been "peeked", we will cache it here
next_page_header: Option<PageHeader>,
}

impl InMemoryColumnChunkReader {
Expand All @@ -531,6 +533,7 @@ impl InMemoryColumnChunkReader {
decompressor,
offset: 0,
seen_num_values: 0,
next_page_header: None,
};
Ok(result)
}
Expand All @@ -548,10 +551,17 @@ impl PageReader for InMemoryColumnChunkReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.chunk.num_values {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
let page_header = read_page_header(&mut cursor)?;
let page_header = if let Some(page_header) = self.next_page_header.take() {
// The next page header has already been peeked, so use the cached value
page_header
} else {
let page_header = read_page_header(&mut cursor)?;
self.offset += cursor.position() as usize;
page_header
};

let compressed_size = page_header.compressed_page_size as usize;

self.offset += cursor.position() as usize;
let start_offset = self.offset;
let end_offset = self.offset + compressed_size;
self.offset = end_offset;
Expand Down Expand Up @@ -589,11 +599,87 @@ impl PageReader for InMemoryColumnChunkReader {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
while self.seen_num_values < self.chunk.num_values {
return if let Some(buffered_header) = self.next_page_header.as_ref() {
match buffered_header.type_ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could extract this conversion into a From implementation? This would reduce code duplication in this function and potentially elsewhere

PageType::DataPage => Ok(Some(PageMetadata {
num_rows: buffered_header
.data_page_header
.as_ref()
.unwrap()
.num_values as usize,
is_dict: false,
})),
PageType::DictionaryPage => Ok(Some(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
})),
PageType::DataPageV2 => Ok(Some(PageMetadata {
num_rows: buffered_header
.data_page_header_v2
.as_ref()
.unwrap()
.num_rows as usize,
is_dict: false,
})),
PageType::IndexPage => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
self.next_page_header = None;
continue;
}
}
} else {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
let page_header = read_page_header(&mut cursor)?;
self.offset += cursor.position() as usize;

let page_metadata = match &page_header.type_ {
PageType::DataPage => Ok(Some(PageMetadata {
num_rows: page_header
.data_page_header
.as_ref()
.unwrap()
.num_values as usize,
is_dict: false,
})),
PageType::DictionaryPage => Ok(Some(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
})),
PageType::DataPageV2 => Ok(Some(PageMetadata {
num_rows: page_header
.data_page_header_v2
.as_ref()
.unwrap()
.num_rows as usize,
is_dict: false,
})),
PageType::IndexPage => {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
}
};

self.next_page_header = Some(page_header);
page_metadata
};
}

Ok(None)
}

fn skip_next_page(&mut self) -> Result<()> {
Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
if let Some(buffered_header) = self.next_page_header.take() {
// The next page header has already been peeked, so just advance the offset
self.offset += buffered_header.compressed_page_size as usize;
} else {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
let page_header = read_page_header(&mut cursor)?;
self.offset += cursor.position() as usize;
self.offset += page_header.compressed_page_size as usize;
}

Ok(())
}
}

Expand Down Expand Up @@ -699,4 +785,103 @@ mod tests {
]
);
}

/// Walks the first column chunk of `alltypes_plain.parquet` with alternating
/// `peek_next_page` / `get_next_page` calls and checks that both agree: a dictionary page,
/// then a data page holding 8 values, then no further pages.
#[tokio::test]
async fn test_in_memory_column_chunk_reader() {
    let testdata = arrow::util::test_util::parquet_test_data();
    let path = format!("{}/alltypes_plain.parquet", testdata);
    let data = Bytes::from(std::fs::read(path).unwrap());

    let metadata = crate::file::footer::parse_metadata(&data).unwrap();

    // Slice the bytes of the first column chunk of the first row group out of the file.
    let column_metadata = metadata.row_group(0).column(0);
    let (start, length) = column_metadata.byte_range();
    let column_data = data.slice(start as usize..(start + length) as usize);

    let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
        num_values: column_metadata.num_values(),
        compression: column_metadata.compression(),
        physical_type: column_metadata.column_type(),
        data: column_data,
    })
    .expect("building reader");

    // Page 1: dictionary page. Peeking reports it as a dictionary page with zero rows.
    let first_page = reader
        .peek_next_page()
        .expect("peeking first page")
        .expect("first page is empty");

    assert!(first_page.is_dict);
    assert_eq!(first_page.num_rows, 0);

    // Reading after a peek must return that same page.
    let first_page = reader
        .get_next_page()
        .expect("getting first page")
        .expect("first page is empty");

    assert_eq!(
        first_page.page_type(),
        crate::basic::PageType::DICTIONARY_PAGE
    );
    assert_eq!(first_page.num_values(), 8);

    // Page 2: data page with 8 rows.
    let second_page = reader
        .peek_next_page()
        .expect("peeking second page")
        .expect("second page is empty");

    assert!(!second_page.is_dict);
    assert_eq!(second_page.num_rows, 8);

    let second_page = reader
        .get_next_page()
        .expect("getting second page")
        .expect("second page is empty");

    assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
    assert_eq!(second_page.num_values(), 8);

    // The chunk is exhausted: both peeking and reading report that no page is left.
    let third_page = reader.peek_next_page().expect("peeking third page");

    assert!(third_page.is_none());

    let third_page = reader.get_next_page().expect("getting third page");

    assert!(third_page.is_none());
}

/// Skips the first page of the first column chunk of `alltypes_plain.parquet` and checks
/// that the next page returned by `get_next_page` is the data page holding 8 values.
#[tokio::test]
async fn test_in_memory_column_chunk_reader_skip_page() {
    let test_dir = arrow::util::test_util::parquet_test_data();
    let file_path = format!("{}/alltypes_plain.parquet", test_dir);
    let file_bytes = Bytes::from(std::fs::read(file_path).unwrap());

    let file_metadata = crate::file::footer::parse_metadata(&file_bytes).unwrap();
    let column = file_metadata.row_group(0).column(0);

    // Byte range of the column chunk within the file.
    let (chunk_start, chunk_len) = column.byte_range();
    let chunk_end = (chunk_start + chunk_len) as usize;

    let chunk = InMemoryColumnChunk {
        num_values: column.num_values(),
        compression: column.compression(),
        physical_type: column.column_type(),
        data: file_bytes.slice(chunk_start as usize..chunk_end),
    };
    let mut page_reader = InMemoryColumnChunkReader::new(chunk).expect("building reader");

    // Step over the first page without decoding it.
    page_reader.skip_next_page().expect("skipping first page");

    let data_page = page_reader
        .get_next_page()
        .expect("getting second page")
        .expect("second page is empty");

    assert_eq!(data_page.page_type(), crate::basic::PageType::DATA_PAGE);
    assert_eq!(data_page.num_values(), 8);
}
}