Implement peek_next_page and skip_next_page for `InMemoryColumnCh… #2155

Merged: 2 commits, Jul 26, 2022
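The easiest way to see what peek_next_page and skip_next_page buy is from the caller's side. Below is a minimal sketch, not part of this PR, written against the crate's PageReader trait and PageMetadata struct (both appear in the diff that follows); the helper name and the rows_to_skip counter are illustrative only, and the paths assume these types are reachable at parquet::column::page and parquet::errors.

use parquet::column::page::{PageMetadata, PageReader};
use parquet::errors::Result;

// Illustrative helper: skip as many whole pages as possible, returning the
// number of rows that still have to be skipped value-by-value by the caller.
fn skip_whole_pages<R: PageReader>(reader: &mut R, mut rows_to_skip: usize) -> Result<usize> {
    while rows_to_skip > 0 {
        match reader.peek_next_page()? {
            // Dictionary pages carry no rows but must still be decoded so that
            // later dictionary-encoded data pages can be read.
            Some(PageMetadata { is_dict: true, .. }) => {
                let _ = reader.get_next_page()?;
            }
            // The entire data page falls inside the skip range: drop it without
            // decompressing or decoding it.
            Some(PageMetadata { num_rows, .. }) if num_rows <= rows_to_skip => {
                reader.skip_next_page()?;
                rows_to_skip -= num_rows;
            }
            // The chunk is exhausted, or the skip range ends inside the next
            // page; hand the remainder back to the caller.
            _ => break,
        }
    }
    Ok(rows_to_skip)
}

Peeking only reads and caches the page header, so the loop never touches page data it is about to discard; that is the behavior the new InMemoryColumnChunkReader implementation below provides.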
155 changes: 150 additions & 5 deletions parquet/src/arrow/async_reader.rs
@@ -87,7 +87,7 @@ use std::task::{Context, Poll};
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use futures::stream::Stream;
-use parquet_format::PageType;
use parquet_format::{PageHeader, PageType};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

use arrow::datatypes::SchemaRef;
@@ -520,6 +520,8 @@ struct InMemoryColumnChunkReader {
decompressor: Option<Box<dyn Codec>>,
offset: usize,
seen_num_values: i64,
// If the next page header has already been "peeked", we will cache it here
next_page_header: Option<PageHeader>,
}

impl InMemoryColumnChunkReader {
@@ -531,6 +533,7 @@ impl InMemoryColumnChunkReader {
decompressor,
offset: 0,
seen_num_values: 0,
next_page_header: None,
};
Ok(result)
}
@@ -548,10 +551,17 @@ impl PageReader for InMemoryColumnChunkReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.chunk.num_values {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-let page_header = read_page_header(&mut cursor)?;
let page_header = if let Some(page_header) = self.next_page_header.take() {
// The next page header has already been peeked, so use the cached value
page_header
} else {
let page_header = read_page_header(&mut cursor)?;
self.offset += cursor.position() as usize;
page_header
};

let compressed_size = page_header.compressed_page_size as usize;

-self.offset += cursor.position() as usize;
let start_offset = self.offset;
let end_offset = self.offset + compressed_size;
self.offset = end_offset;
@@ -589,11 +599,47 @@ impl PageReader for InMemoryColumnChunkReader {
}

fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
while self.seen_num_values < self.chunk.num_values {
return if let Some(buffered_header) = self.next_page_header.as_ref() {
if let Ok(page_metadata) = buffered_header.try_into() {
Ok(Some(page_metadata))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
self.next_page_header = None;
continue;
}
} else {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
let page_header = read_page_header(&mut cursor)?;
self.offset += cursor.position() as usize;

let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
Ok(Some(page_metadata))
} else {
// For unknown page type (e.g., INDEX_PAGE), skip and read next.
continue;
};

self.next_page_header = Some(page_header);
page_metadata
};
}

Ok(None)
}

fn skip_next_page(&mut self) -> Result<()> {
-Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
if let Some(buffered_header) = self.next_page_header.take() {
// The next page header has already been peeked, so just advance the offset
self.offset += buffered_header.compressed_page_size as usize;
} else {
let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
let page_header = read_page_header(&mut cursor)?;
self.offset += cursor.position() as usize;
self.offset += page_header.compressed_page_size as usize;
}

Ok(())
}
}

@@ -699,4 +745,103 @@ mod tests {
]
);
}

#[tokio::test]
async fn test_in_memory_column_chunk_reader() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = crate::file::footer::parse_metadata(&data).unwrap();

let column_metadata = metadata.row_group(0).column(0);

let (start, length) = column_metadata.byte_range();

let column_data = data.slice(start as usize..(start + length) as usize);

let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
num_values: column_metadata.num_values(),
compression: column_metadata.compression(),
physical_type: column_metadata.column_type(),
data: column_data,
})
.expect("building reader");

let first_page = reader
.peek_next_page()
.expect("peeking first page")
.expect("first page is empty");

assert!(first_page.is_dict);
assert_eq!(first_page.num_rows, 0);

let first_page = reader
.get_next_page()
.expect("getting first page")
.expect("first page is empty");

assert_eq!(
first_page.page_type(),
crate::basic::PageType::DICTIONARY_PAGE
);
assert_eq!(first_page.num_values(), 8);

let second_page = reader
.peek_next_page()
.expect("peeking second page")
.expect("second page is empty");

assert!(!second_page.is_dict);
assert_eq!(second_page.num_rows, 8);

let second_page = reader
.get_next_page()
.expect("getting second page")
.expect("second page is empty");

assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
assert_eq!(second_page.num_values(), 8);

let third_page = reader.peek_next_page().expect("getting third page");

assert!(third_page.is_none());

let third_page = reader.get_next_page().expect("getting third page");

assert!(third_page.is_none());
}

#[tokio::test]
async fn test_in_memory_column_chunk_reader_skip_page() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/alltypes_plain.parquet", testdata);
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = crate::file::footer::parse_metadata(&data).unwrap();

let column_metadata = metadata.row_group(0).column(0);

let (start, length) = column_metadata.byte_range();

let column_data = data.slice(start as usize..(start + length) as usize);

let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
num_values: column_metadata.num_values(),
compression: column_metadata.compression(),
physical_type: column_metadata.column_type(),
data: column_data,
})
.expect("building reader");

reader.skip_next_page().expect("skipping first page");

let second_page = reader
.get_next_page()
.expect("getting second page")
.expect("second page is empty");

assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
assert_eq!(second_page.num_values(), 8);
}
}
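The two tests above drive the reader through peek/get and skip/get sequences against alltypes_plain.parquet. As a complementary sketch (a hypothetical helper, not part of this PR or its tests, using the same parquet::column::page paths assumed earlier), the same peek/skip contract lets per-page row counts be collected for a whole chunk without decompressing or decoding anything:

use parquet::column::page::PageReader;
use parquet::errors::Result;

// peek_next_page reads and caches only the page header; skip_next_page then
// steps over the page body, so no page is ever decompressed here.
fn count_rows_without_decoding<R: PageReader>(reader: &mut R) -> Result<usize> {
    let mut rows = 0;
    while let Some(meta) = reader.peek_next_page()? {
        // num_rows comes from the header alone: zero for dictionary pages, the
        // value count for V1 data pages, and num_rows for V2 data pages.
        rows += meta.num_rows;
        reader.skip_next_page()?;
    }
    Ok(rows)
}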
28 changes: 27 additions & 1 deletion parquet/src/column/page.rs
@@ -18,10 +18,11 @@
//! Contains Parquet Page definitions and page reader interface.

use crate::basic::{Encoding, PageType};
-use crate::errors::Result;
use crate::errors::{ParquetError, Result};
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
use parquet_format::PageHeader;

/// Parquet Page definition.
///
@@ -196,6 +197,31 @@ pub struct PageMetadata {
pub is_dict: bool,
}

impl TryFrom<&PageHeader> for PageMetadata {
type Error = ParquetError;

fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
match value.type_ {
parquet_format::PageType::DataPage => Ok(PageMetadata {
num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
is_dict: false,
}),
parquet_format::PageType::DictionaryPage => Ok(PageMetadata {
num_rows: usize::MIN,
is_dict: true,
}),
parquet_format::PageType::DataPageV2 => Ok(PageMetadata {
num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
is_dict: false,
}),
other => Err(ParquetError::General(format!(
"page type {:?} cannot be converted to PageMetadata",
other
))),
}
}
}

/// API for reading pages from a column chunk.
/// This offers an iterator-like API to get the next page.
pub trait PageReader: Iterator<Item = Result<Page>> + Send {
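The TryFrom conversion above is what peek_next_page leans on. A sketch of a consumer is shown below (a hypothetical helper, assuming the caller also depends on the parquet_format crate for PageHeader and can reach PageMetadata at parquet::column::page). Note that the DictionaryPage arm maps num_rows to usize::MIN, i.e. zero, since only the is_dict flag matters for a dictionary page.

use std::convert::TryFrom;

use parquet::column::page::PageMetadata;
use parquet_format::PageHeader;

// Illustrative helper: decide from the raw header alone whether a page can be
// skipped wholesale, mirroring how peek_next_page treats each page type.
fn page_is_skippable(header: &PageHeader, rows_to_skip: usize) -> bool {
    match PageMetadata::try_from(header) {
        // Dictionary pages have to be decoded even while skipping rows.
        Ok(meta) if meta.is_dict => false,
        // A data page may be dropped wholesale if none of its rows are wanted.
        Ok(meta) => meta.num_rows <= rows_to_skip,
        // INDEX_PAGE and other unknown types: peek_next_page steps over the
        // header itself, so there is nothing to skip at page granularity here.
        Err(_) => false,
    }
}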