Implement peek_next_page and skip_next_page for `InMemoryColumnChunkReader` (#2155)

* Implement `peek_next_page` and `skip_next_page` for `InMemoryColumnChunkReader`

* Add conversion from Parquet page header to page metadata
thinkharderdev committed Jul 26, 2022
1 parent 37dd037 commit aeb2776
Showing 2 changed files with 177 additions and 6 deletions.
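
Taken together, the two changes let a caller inspect the next page's metadata and decide whether to decode or skip it, without paying to decompress pages it does not need. A minimal caller-side sketch of that pattern (the helper `read_selected_pages` and its `rows_needed` predicate are illustrative, not part of this commit; the trait methods are the ones implemented below):

use parquet::column::page::{PageMetadata, PageReader};
use parquet::errors::Result;

// Hypothetical helper: decode only the pages whose rows the caller wants,
// skipping the rest without decompressing them.
fn read_selected_pages<R: PageReader>(
    reader: &mut R,
    mut rows_needed: impl FnMut(&PageMetadata) -> bool,
) -> Result<()> {
    while let Some(meta) = reader.peek_next_page()? {
        // Dictionary pages must always be decoded, since later data pages
        // may reference them; unwanted data pages can be skipped outright.
        if meta.is_dict || rows_needed(&meta) {
            let _page = reader.get_next_page()?; // reads and decompresses
        } else {
            reader.skip_next_page()?; // just advances the offset
        }
    }
    Ok(())
}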
parquet/src/arrow/async_reader.rs (150 additions, 5 deletions)
@@ -87,7 +87,7 @@ use std::task::{Context, Poll};
 use bytes::Bytes;
 use futures::future::{BoxFuture, FutureExt};
 use futures::stream::Stream;
-use parquet_format::PageType;
+use parquet_format::{PageHeader, PageType};
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

 use arrow::datatypes::SchemaRef;
@@ -520,6 +520,8 @@ struct InMemoryColumnChunkReader {
     decompressor: Option<Box<dyn Codec>>,
     offset: usize,
     seen_num_values: i64,
+    // If the next page header has already been "peeked", we will cache it here
+    next_page_header: Option<PageHeader>,
 }

 impl InMemoryColumnChunkReader {
@@ -531,6 +533,7 @@ impl InMemoryColumnChunkReader {
             decompressor,
             offset: 0,
             seen_num_values: 0,
+            next_page_header: None,
         };
         Ok(result)
     }
@@ -548,10 +551,17 @@ impl PageReader for InMemoryColumnChunkReader {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         while self.seen_num_values < self.chunk.num_values {
             let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
+            let page_header = if let Some(page_header) = self.next_page_header.take() {
+                // The next page header has already been peeked, so use the cached value
+                page_header
+            } else {
+                let page_header = read_page_header(&mut cursor)?;
+                self.offset += cursor.position() as usize;
+                page_header
+            };
+
             let compressed_size = page_header.compressed_page_size as usize;

-            self.offset += cursor.position() as usize;
             let start_offset = self.offset;
             let end_offset = self.offset + compressed_size;
             self.offset = end_offset;
@@ -589,11 +599,47 @@ impl PageReader for InMemoryColumnChunkReader {
     }

     fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        while self.seen_num_values < self.chunk.num_values {
+            return if let Some(buffered_header) = self.next_page_header.as_ref() {
+                if let Ok(page_metadata) = buffered_header.try_into() {
+                    Ok(Some(page_metadata))
+                } else {
+                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                    self.next_page_header = None;
+                    continue;
+                }
+            } else {
+                let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+                let page_header = read_page_header(&mut cursor)?;
+                self.offset += cursor.position() as usize;
+
+                let page_metadata = if let Ok(page_metadata) = (&page_header).try_into() {
+                    Ok(Some(page_metadata))
+                } else {
+                    // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                    continue;
+                };
+
+                self.next_page_header = Some(page_header);
+                page_metadata
+            };
+        }
+
+        Ok(None)
     }

     fn skip_next_page(&mut self) -> Result<()> {
-        Err(nyi_err!("https://github.com/apache/arrow-rs/issues/1792"))
+        if let Some(buffered_header) = self.next_page_header.take() {
+            // The next page header has already been peeked, so just advance the offset
+            self.offset += buffered_header.compressed_page_size as usize;
+        } else {
+            let mut cursor = Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
+            let page_header = read_page_header(&mut cursor)?;
+            self.offset += cursor.position() as usize;
+            self.offset += page_header.compressed_page_size as usize;
+        }
+
+        Ok(())
     }
 }

Expand Down Expand Up @@ -699,4 +745,103 @@ mod tests {
             ]
         );
     }
+
+    #[tokio::test]
+    async fn test_in_memory_column_chunk_reader() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
+
+        let column_metadata = metadata.row_group(0).column(0);
+
+        let (start, length) = column_metadata.byte_range();
+
+        let column_data = data.slice(start as usize..(start + length) as usize);
+
+        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
+            num_values: column_metadata.num_values(),
+            compression: column_metadata.compression(),
+            physical_type: column_metadata.column_type(),
+            data: column_data,
+        })
+        .expect("building reader");
+
+        let first_page = reader
+            .peek_next_page()
+            .expect("peeking first page")
+            .expect("first page is empty");
+
+        assert!(first_page.is_dict);
+        assert_eq!(first_page.num_rows, 0);
+
+        let first_page = reader
+            .get_next_page()
+            .expect("getting first page")
+            .expect("first page is empty");
+
+        assert_eq!(
+            first_page.page_type(),
+            crate::basic::PageType::DICTIONARY_PAGE
+        );
+        assert_eq!(first_page.num_values(), 8);
+
+        let second_page = reader
+            .peek_next_page()
+            .expect("peeking second page")
+            .expect("second page is empty");
+
+        assert!(!second_page.is_dict);
+        assert_eq!(second_page.num_rows, 8);
+
+        let second_page = reader
+            .get_next_page()
+            .expect("getting second page")
+            .expect("second page is empty");
+
+        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
+        assert_eq!(second_page.num_values(), 8);
+
+        let third_page = reader.peek_next_page().expect("getting third page");
+
+        assert!(third_page.is_none());
+
+        let third_page = reader.get_next_page().expect("getting third page");
+
+        assert!(third_page.is_none());
+    }
+
+    #[tokio::test]
+    async fn test_in_memory_column_chunk_reader_skip_page() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
+
+        let column_metadata = metadata.row_group(0).column(0);
+
+        let (start, length) = column_metadata.byte_range();
+
+        let column_data = data.slice(start as usize..(start + length) as usize);
+
+        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
+            num_values: column_metadata.num_values(),
+            compression: column_metadata.compression(),
+            physical_type: column_metadata.column_type(),
+            data: column_data,
+        })
+        .expect("building reader");
+
+        reader.skip_next_page().expect("skipping first page");
+
+        let second_page = reader
+            .get_next_page()
+            .expect("getting second page")
+            .expect("second page is empty");
+
+        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
+        assert_eq!(second_page.num_values(), 8);
+    }
 }
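
The design above hinges on parsing each page header at most once: `peek_next_page` caches the parsed header in `next_page_header`, and `get_next_page`/`skip_next_page` consume that cache before falling back to `read_page_header`. A self-contained toy sketch of that contract (every type here is a hypothetical stand-in; a single length byte plays the role of the thrift page header):

// Toy stand-ins: a one-byte "header" encodes the length of the page body.
struct Header {
    compressed_page_size: usize,
}

struct PeekingReader {
    data: Vec<u8>,
    offset: usize,
    next_header: Option<Header>,
}

impl PeekingReader {
    // Stand-in for `read_page_header`: parse the header and advance the
    // offset past it, so `offset` ends up pointing at the page body.
    fn read_header(&mut self) -> Header {
        let size = self.data[self.offset] as usize;
        self.offset += 1;
        Header { compressed_page_size: size }
    }

    // Parse the next header at most once; repeated peeks hit the cache.
    fn peek(&mut self) -> &Header {
        if self.next_header.is_none() {
            let header = self.read_header();
            self.next_header = Some(header);
        }
        self.next_header.as_ref().unwrap()
    }

    // Consume the cached header (or parse one) and jump over the page body
    // without ever touching its bytes.
    fn skip(&mut self) {
        let header = match self.next_header.take() {
            Some(header) => header,
            None => self.read_header(),
        };
        self.offset += header.compressed_page_size;
    }
}

fn main() {
    // Two "pages": a 3-byte body and a 2-byte body, each behind a length byte.
    let mut reader = PeekingReader {
        data: vec![3, 0, 0, 0, 2, 0, 0],
        offset: 0,
        next_header: None,
    };
    assert_eq!(reader.peek().compressed_page_size, 3);
    reader.skip(); // consumes the cached header; offset jumps to the second page
    assert_eq!(reader.peek().compressed_page_size, 2);
}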
parquet/src/column/page.rs (27 additions, 1 deletion)
@@ -18,10 +18,11 @@
 //! Contains Parquet Page definitions and page reader interface.

 use crate::basic::{Encoding, PageType};
-use crate::errors::Result;
+use crate::errors::{ParquetError, Result};
 use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
+use parquet_format::PageHeader;

 /// Parquet Page definition.
 ///
@@ -196,6 +197,31 @@ pub struct PageMetadata {
     pub is_dict: bool,
 }

+impl TryFrom<&PageHeader> for PageMetadata {
+    type Error = ParquetError;
+
+    fn try_from(value: &PageHeader) -> std::result::Result<Self, Self::Error> {
+        match value.type_ {
+            parquet_format::PageType::DataPage => Ok(PageMetadata {
+                num_rows: value.data_page_header.as_ref().unwrap().num_values as usize,
+                is_dict: false,
+            }),
+            parquet_format::PageType::DictionaryPage => Ok(PageMetadata {
+                num_rows: usize::MIN,
+                is_dict: true,
+            }),
+            parquet_format::PageType::DataPageV2 => Ok(PageMetadata {
+                num_rows: value.data_page_header_v2.as_ref().unwrap().num_rows as usize,
+                is_dict: false,
+            }),
+            other => Err(ParquetError::General(format!(
+                "page type {:?} cannot be converted to PageMetadata",
+                other
+            ))),
+        }
+    }
+}
+
 /// API for reading pages from a column chunk.
 /// This offers a iterator like API to get the next page.
 pub trait PageReader: Iterator<Item = Result<Page>> + Send {
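The new `TryFrom` impl gives page readers a uniform way to turn a raw thrift header into the `PageMetadata` returned by `peek_next_page`. A small illustrative sketch, constructing a `PageHeader` by hand with made-up field values (in practice the header comes from `read_page_header`):

use std::convert::TryFrom;

use parquet::column::page::PageMetadata;
use parquet_format::{DataPageHeader, Encoding, PageHeader, PageType};

fn main() {
    let header = PageHeader {
        type_: PageType::DataPage,
        uncompressed_page_size: 100,
        compressed_page_size: 80,
        crc: None,
        data_page_header: Some(DataPageHeader {
            num_values: 8,
            encoding: Encoding::Plain,
            definition_level_encoding: Encoding::Rle,
            repetition_level_encoding: Encoding::Rle,
            statistics: None,
        }),
        index_page_header: None,
        dictionary_page_header: None,
        data_page_header_v2: None,
    };

    // A v1 data page header has no row count, so the conversion reuses
    // num_values; the two agree only for non-nested schemas.
    let meta = PageMetadata::try_from(&header).unwrap();
    assert!(!meta.is_dict);
    assert_eq!(meta.num_rows, 8);
}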
