Skip to content

Commit

Permalink
Simplify null mask preservation in parquet reader (#2116)
Browse files (browse the repository at this point in the history)
* Simplify null mask preservation

* Fix benchmarks

* Remove PackedDecoder Option

* Use match expression

* Remove inline from GenericColumnReader::read_batch
  • Loading branch information
tustvold committed Jul 22, 2022
1 parent 5e3facf commit a9fa1b4
Show file tree
Hide file tree
Showing 9 changed files with 173 additions and 203 deletions.
9 changes: 3 additions & 6 deletions parquet/benches/arrow_reader.rs
Expand Up @@ -307,21 +307,19 @@ fn create_primitive_array_reader(
use parquet::arrow::array_reader::PrimitiveArrayReader;
match column_desc.physical_type() {
Type::INT32 => {
let reader = PrimitiveArrayReader::<Int32Type>::new_with_options(
let reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
true,
)
.unwrap();
Box::new(reader)
}
Type::INT64 => {
let reader = PrimitiveArrayReader::<Int64Type>::new_with_options(
let reader = PrimitiveArrayReader::<Int64Type>::new(
Box::new(page_iterator),
column_desc,
None,
true,
)
.unwrap();
Box::new(reader)
Expand All @@ -335,7 +333,7 @@ fn create_string_byte_array_reader(
column_desc: ColumnDescPtr,
) -> Box<dyn ArrayReader> {
use parquet::arrow::array_reader::make_byte_array_reader;
make_byte_array_reader(Box::new(page_iterator), column_desc, None, true).unwrap()
make_byte_array_reader(Box::new(page_iterator), column_desc, None).unwrap()
}

fn create_string_byte_array_dictionary_reader(
Expand All @@ -350,7 +348,6 @@ fn create_string_byte_array_dictionary_reader(
Box::new(page_iterator),
column_desc,
Some(arrow_type),
true,
)
.unwrap()
}
Expand Down
18 changes: 5 additions & 13 deletions parquet/src/arrow/array_reader/builder.rs
Expand Up @@ -160,16 +160,14 @@ fn build_primitive_reader(
));

let page_iterator = row_groups.column_chunks(col_idx)?;
let null_mask_only = field.def_level == 1 && field.nullable;
let arrow_type = Some(field.arrow_type.clone());

match physical_type {
PhysicalType::BOOLEAN => Ok(Box::new(
PrimitiveArrayReader::<BoolType>::new_with_options(
PrimitiveArrayReader::<BoolType>::new(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT32 => {
Expand All @@ -180,21 +178,19 @@ fn build_primitive_reader(
)?))
} else {
Ok(Box::new(
PrimitiveArrayReader::<Int32Type>::new_with_options(
PrimitiveArrayReader::<Int32Type>::new(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
))
}
}
PhysicalType::INT64 => Ok(Box::new(
PrimitiveArrayReader::<Int64Type>::new_with_options(
PrimitiveArrayReader::<Int64Type>::new(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::INT96 => {
Expand All @@ -218,33 +214,29 @@ fn build_primitive_reader(
)?))
}
PhysicalType::FLOAT => Ok(Box::new(
PrimitiveArrayReader::<FloatType>::new_with_options(
PrimitiveArrayReader::<FloatType>::new(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::DOUBLE => Ok(Box::new(
PrimitiveArrayReader::<DoubleType>::new_with_options(
PrimitiveArrayReader::<DoubleType>::new(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
)?,
)),
PhysicalType::BYTE_ARRAY => match arrow_type {
Some(DataType::Dictionary(_, _)) => make_byte_array_dictionary_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
_ => make_byte_array_reader(
page_iterator,
column_desc,
arrow_type,
null_mask_only,
),
},
PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type {
Expand Down
45 changes: 11 additions & 34 deletions parquet/src/arrow/array_reader/byte_array.rs
Expand Up @@ -42,7 +42,6 @@ pub fn make_byte_array_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -54,15 +53,13 @@ pub fn make_byte_array_reader(

match data_type {
ArrowType::Binary | ArrowType::Utf8 => {
let reader =
GenericRecordReader::new_with_options(column_desc, null_mask_only);
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i32>::new(
pages, data_type, reader,
)))
}
ArrowType::LargeUtf8 | ArrowType::LargeBinary => {
let reader =
GenericRecordReader::new_with_options(column_desc, null_mask_only);
let reader = GenericRecordReader::new(column_desc);
Ok(Box::new(ByteArrayReader::<i64>::new(
pages, data_type, reader,
)))
Expand Down Expand Up @@ -127,15 +124,11 @@ impl<I: OffsetSizeTrait + ScalarValue> ArrayReader for ByteArrayReader<I> {
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels_buffer
.as_ref()
.map(|buf| buf.typed_data())
self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_levels_buffer
.as_ref()
.map(|buf| buf.typed_data())
self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
}
}

Expand Down Expand Up @@ -388,11 +381,8 @@ impl ByteArrayDecoderPlain {
Ok(to_read)
}

pub fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
let to_skip = to_skip.min( self.max_remaining_values);
pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.max_remaining_values);
let mut skip = 0;
let buf = self.buf.as_ref();

Expand Down Expand Up @@ -478,10 +468,7 @@ impl ByteArrayDecoderDeltaLength {
Ok(to_read)
}

fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
fn skip(&mut self, to_skip: usize) -> Result<usize> {
let remain_values = self.lengths.len() - self.length_offset;
let to_skip = remain_values.min(to_skip);

Expand Down Expand Up @@ -583,10 +570,7 @@ impl ByteArrayDecoderDelta {
Ok(to_read)
}

fn skip(
&mut self,
to_skip: usize,
) -> Result<usize> {
fn skip(&mut self, to_skip: usize) -> Result<usize> {
let to_skip = to_skip.min(self.prefix_lengths.len() - self.length_offset);

let length_range = self.length_offset..self.length_offset + to_skip;
Expand Down Expand Up @@ -704,8 +688,8 @@ impl ByteArrayDecoderDictionary {
self.index_offset = 0;
}

let skip = (to_skip - values_skip)
.min(self.index_buf_len - self.index_offset);
let skip =
(to_skip - values_skip).min(self.index_buf_len - self.index_offset);

self.index_offset += skip;
self.max_remaining_values -= skip;
Expand Down Expand Up @@ -816,14 +800,7 @@ mod tests {

assert_eq!(
strings.iter().collect::<Vec<_>>(),
vec![
None,
None,
Some("hello"),
Some("b"),
None,
None,
]
vec![None, None, Some("hello"), Some("b"), None, None,]
);
}
}
Expand Down
10 changes: 3 additions & 7 deletions parquet/src/arrow/array_reader/byte_array_dictionary.rs
Expand Up @@ -44,17 +44,14 @@ use crate::util::memory::ByteBufferPtr;
/// A macro to reduce verbosity of [`make_byte_array_dictionary_reader`]
macro_rules! make_reader {
(
($pages:expr, $column_desc:expr, $data_type:expr, $null_mask_only:expr) => match ($k:expr, $v:expr) {
($pages:expr, $column_desc:expr, $data_type:expr) => match ($k:expr, $v:expr) {
$(($key_arrow:pat, $value_arrow:pat) => ($key_type:ty, $value_type:ty),)+
}
) => {
match (($k, $v)) {
$(
($key_arrow, $value_arrow) => {
let reader = GenericRecordReader::new_with_options(
$column_desc,
$null_mask_only,
);
let reader = GenericRecordReader::new($column_desc);
Ok(Box::new(ByteArrayDictionaryReader::<$key_type, $value_type>::new(
$pages, $data_type, reader,
)))
Expand Down Expand Up @@ -84,7 +81,6 @@ pub fn make_byte_array_dictionary_reader(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
null_mask_only: bool,
) -> Result<Box<dyn ArrayReader>> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -97,7 +93,7 @@ pub fn make_byte_array_dictionary_reader(
match &data_type {
ArrowType::Dictionary(key_type, value_type) => {
make_reader! {
(pages, column_desc, data_type, null_mask_only) => match (key_type.as_ref(), value_type.as_ref()) {
(pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
(ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8) => (u8, i32),
(ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
(ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8) => (i8, i32),
Expand Down
14 changes: 1 addition & 13 deletions parquet/src/arrow/array_reader/primitive_array.rs
Expand Up @@ -58,17 +58,6 @@ where
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
Self::new_with_options(pages, column_desc, arrow_type, false)
}

/// Construct primitive array reader with ability to only compute null mask and not
/// buffer level data
pub fn new_with_options(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
arrow_type: Option<ArrowType>,
null_mask_only: bool,
) -> Result<Self> {
// Check if Arrow type is specified, else create it from Parquet type
let data_type = match arrow_type {
Expand All @@ -78,8 +67,7 @@ where
.clone(),
};

let record_reader =
RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
let record_reader = RecordReader::<T>::new(column_desc.clone());

Ok(Self {
data_type,
Expand Down

0 comments on commit a9fa1b4

Please sign in to comment.