Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify ColumnReader::read_batch #1995

Merged
merged 1 commit into from Jul 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/byte_array.rs
Expand Up @@ -113,10 +113,10 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
let buffer = self.record_reader.consume_record_data()?;
let null_buffer = self.record_reader.consume_bitmap_buffer()?;
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
let buffer = self.record_reader.consume_record_data();
let null_buffer = self.record_reader.consume_bitmap_buffer();
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();

Ok(buffer.into_array(null_buffer, self.data_type.clone()))
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Expand Up @@ -173,12 +173,12 @@ where

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
let buffer = self.record_reader.consume_record_data()?;
let null_buffer = self.record_reader.consume_bitmap_buffer()?;
let buffer = self.record_reader.consume_record_data();
let null_buffer = self.record_reader.consume_bitmap_buffer();
let array = buffer.into_array(null_buffer, &self.data_type)?;

self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();

Ok(array)
Expand Down
6 changes: 3 additions & 3 deletions parquet/src/arrow/array_reader/null_array.rs
Expand Up @@ -86,11 +86,11 @@ where
let array = arrow::array::NullArray::new(self.record_reader.num_values());

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();

// Must consume bitmap buffer
self.record_reader.consume_bitmap_buffer()?;
self.record_reader.consume_bitmap_buffer();

self.record_reader.reset();
Ok(Arc::new(array))
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/array_reader/primitive_array.rs
Expand Up @@ -148,7 +148,7 @@ where
// Convert to arrays by using the Parquet physical type.
// The physical types are then cast to Arrow types if necessary

let mut record_data = self.record_reader.consume_record_data()?;
let mut record_data = self.record_reader.consume_record_data();

if T::get_physical_type() == PhysicalType::BOOLEAN {
let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
Expand All @@ -162,7 +162,7 @@ where
let array_data = ArrayDataBuilder::new(arrow_data_type)
.len(self.record_reader.num_values())
.add_buffer(record_data)
.null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
.null_bit_buffer(self.record_reader.consume_bitmap_buffer());

let array_data = unsafe { array_data.build_unchecked() };
let array = match T::get_physical_type() {
Expand Down Expand Up @@ -227,8 +227,8 @@ where
};

// save definition and repetition buffers
self.def_levels_buffer = self.record_reader.consume_def_levels()?;
self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
self.def_levels_buffer = self.record_reader.consume_def_levels();
self.rep_levels_buffer = self.record_reader.consume_rep_levels();
self.record_reader.reset();
Ok(array)
}
Expand Down
1 change: 0 additions & 1 deletion parquet/src/arrow/arrow_reader.rs
Expand Up @@ -86,7 +86,6 @@ impl ArrowReaderOptions {
///
/// For example:[ARROW-16184](https://issues.apache.org/jira/browse/ARROW-16184)
///

/// Set `skip_arrow_metadata` to true, to skip decoding this
pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
Self {
Expand Down
108 changes: 34 additions & 74 deletions parquet/src/arrow/record_reader/mod.rs
Expand Up @@ -218,32 +218,32 @@ where
/// The implementation has side effects. It will create a new buffer to hold those
/// definition level values that have already been read into memory but not counted
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.def_levels.as_mut() {
pub fn consume_def_levels(&mut self) -> Option<Buffer> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in this file are cleanups that make these signatures infallible, since the methods always returned `Ok`, right? (This is also an API change, and it perhaps also fixes a clippy lint.)

match self.def_levels.as_mut() {
Some(x) => x.split_levels(self.num_values),
None => None,
})
}
}

/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
Ok(match self.rep_levels.as_mut() {
pub fn consume_rep_levels(&mut self) -> Option<Buffer> {
match self.rep_levels.as_mut() {
Some(x) => Some(x.split_off(self.num_values)),
None => None,
})
}
}

/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> Result<V::Output> {
Ok(self.records.split_off(self.num_values))
pub fn consume_record_data(&mut self) -> V::Output {
self.records.split_off(self.num_values)
}

/// Returns currently stored null bitmap data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
Ok(self.consume_bitmap()?.map(|b| b.into_buffer()))
pub fn consume_bitmap_buffer(&mut self) -> Option<Buffer> {
self.consume_bitmap().map(|b| b.into_buffer())
}

/// Reset state of record reader.
Expand All @@ -256,11 +256,10 @@ where
}

/// Returns bitmap data.
pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
Ok(self
.def_levels
pub fn consume_bitmap(&mut self) -> Option<Bitmap> {
self.def_levels
.as_mut()
.map(|levels| levels.split_bitmask(self.num_values)))
.map(|levels| levels.split_bitmask(self.num_values))
}

/// Try to read one batch of data.
Expand Down Expand Up @@ -296,7 +295,7 @@ where
}

let values_read = max(levels_read, values_read);
self.set_values_written(self.values_written + values_read)?;
self.set_values_written(self.values_written + values_read);
Ok(values_read)
}

Expand Down Expand Up @@ -339,8 +338,7 @@ where
}
}

#[allow(clippy::unnecessary_wraps)]
fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
fn set_values_written(&mut self, new_values_written: usize) {
self.values_written = new_values_written;
self.records.set_len(self.values_written);

Expand All @@ -351,8 +349,6 @@ where
if let Some(ref mut buf) = self.def_levels {
buf.set_len(self.values_written)
};

Ok(())
}
}

Expand All @@ -365,42 +361,15 @@ mod tests {
use arrow::buffer::Buffer;

use crate::basic::Encoding;
use crate::column::page::Page;
use crate::column::page::PageReader;
use crate::data_type::Int32Type;
use crate::errors::Result;
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
use crate::util::test_common::page_util::{
DataPageBuilder, DataPageBuilderImpl, InMemoryPageReader,
};

use super::RecordReader;

struct TestPageReader {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This just duplicates `InMemoryPageReader`, so the tests now use that instead.

pages: Box<dyn Iterator<Item = Page> + Send>,
}

impl TestPageReader {
pub fn new(pages: Vec<Page>) -> Self {
Self {
pages: Box::new(pages.into_iter()),
}
}
}

impl PageReader for TestPageReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.pages.next())
}
}

impl Iterator for TestPageReader {
type Item = Result<Page>;

fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}

#[test]
fn test_read_required_records() {
// Construct column schema
Expand Down Expand Up @@ -436,7 +405,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
Expand All @@ -459,7 +428,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
Expand All @@ -469,12 +438,9 @@ mod tests {
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
let expected_buffer = bb.finish();
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);
assert_eq!(None, record_reader.consume_def_levels().unwrap());
assert_eq!(None, record_reader.consume_bitmap().unwrap());
assert_eq!(expected_buffer, record_reader.consume_record_data());
assert_eq!(None, record_reader.consume_def_levels());
assert_eq!(None, record_reader.consume_bitmap());
}

#[test]
Expand Down Expand Up @@ -520,7 +486,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
Expand All @@ -546,7 +512,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
Expand All @@ -559,20 +525,17 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels().unwrap()
record_reader.consume_def_levels()
);

// Verify bitmap
let expected_valid = &[false, true, false, true, true, false, true];
let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());

// Verify result record data
let actual = record_reader.consume_record_data().unwrap();
let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();

let expected = &[0, 7, 0, 6, 3, 0, 8];
Expand Down Expand Up @@ -631,7 +594,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();

assert_eq!(1, record_reader.read_records(1).unwrap());
Expand Down Expand Up @@ -659,7 +622,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();

assert_eq!(1, record_reader.read_records(10).unwrap());
Expand All @@ -673,20 +636,17 @@ mod tests {
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels().unwrap()
record_reader.consume_def_levels()
);

// Verify bitmap
let expected_valid = &[true, false, false, true, true, true, true, true, true];
let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
let expected_bitmap = Bitmap::from(expected_buffer);
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());

// Verify result record data
let actual = record_reader.consume_record_data().unwrap();
let actual = record_reader.consume_record_data();
let actual_values = actual.typed_data::<i32>();
let expected = &[4, 0, 0, 7, 6, 3, 2, 8, 9];
assert_eq!(actual_values.len(), expected.len());
Expand Down Expand Up @@ -731,7 +691,7 @@ mod tests {
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();

let page_reader = Box::new(TestPageReader::new(vec![page]));
let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();

assert_eq!(1000, record_reader.read_records(1000).unwrap());
Expand Down