Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix StructArrayReader handling nested lists (#1651) #1700

Merged
merged 2 commits into from May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
167 changes: 103 additions & 64 deletions parquet/src/arrow/array_reader.rs
Expand Up @@ -16,19 +16,17 @@
// under the License.

use std::any::Any;
use std::cmp::{max, min};
use std::cmp::max;
use std::marker::PhantomData;
use std::mem::size_of;
use std::result::Result::Ok;
use std::sync::Arc;
use std::vec::Vec;

use arrow::array::{
Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray,
StructArray,
DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::buffer::Buffer;
use arrow::datatypes::{
ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
Expand Down Expand Up @@ -655,8 +653,7 @@ pub struct StructArrayReader {
data_type: ArrowType,
struct_def_level: i16,
struct_rep_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
nullable: bool,
}

impl StructArrayReader {
Expand All @@ -666,14 +663,14 @@ impl StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
def_level: i16,
rep_level: i16,
nullable: bool,
) -> Self {
Self {
data_type,
children,
struct_def_level: def_level,
struct_rep_level: rep_level,
def_level_buffer: None,
rep_level_buffer: None,
nullable,
}
}
}
Expand Down Expand Up @@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader {
/// ```
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
if self.children.is_empty() {
self.def_level_buffer = None;
self.rep_level_buffer = None;
return Ok(Arc::new(StructArray::from(Vec::new())));
}

Expand Down Expand Up @@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader {
.collect::<Vec<ArrayData>>(),
);

if self.struct_def_level != 0 {
if self.nullable {
// calculate struct def level data
let buffer_size = children_array_len * size_of::<i16>();
let mut def_level_data_buffer = MutableBuffer::new(buffer_size);
def_level_data_buffer.resize(buffer_size, 0);

// Safety: the buffer is always treated as `u16` in the code below
let def_level_data = unsafe { def_level_data_buffer.typed_data_mut() };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for removing some unsafe

// children should have consistent view of parent, only need to inspect first child
let def_levels = self.children[0]
.get_def_levels()
.expect("child with nullable parents must have definition level");

def_level_data
.iter_mut()
.for_each(|v| *v = self.struct_def_level);
// calculate bitmap for current array
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);

for child in &self.children {
if let Some(current_child_def_levels) = child.get_def_levels() {
if current_child_def_levels.len() != children_array_len {
return Err(general_err!("Child array length are not equal!"));
} else {
for i in 0..children_array_len {
def_level_data[i] =
min(def_level_data[i], current_child_def_levels[i]);
match self.children[0].get_rep_levels() {
Some(rep_levels) => {
// Sanity check
assert_eq!(rep_levels.len(), def_levels.len());

for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
if rep_level > &self.struct_rep_level {
// Already handled by inner list - SKIP
continue;
}
bitmap_builder.append(*def_level >= self.struct_def_level)
}
}
None => {
for def_level in def_levels {
bitmap_builder.append(*def_level >= self.struct_def_level)
}
}
}

// calculate bitmap for current array
let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
for def_level in def_level_data {
let not_null = *def_level >= self.struct_def_level;
bitmap_builder.append(not_null);
if bitmap_builder.len() != children_array_len {
return Err(general_err!("Failed to decode level data for struct array"));
}

array_data_builder =
array_data_builder.null_bit_buffer(bitmap_builder.finish());

self.def_level_buffer = Some(def_level_data_buffer.into());
}

let array_data = unsafe { array_data_builder.build_unchecked() };

if self.struct_rep_level != 0 {
// calculate struct rep level data, since struct doesn't add to repetition
// levels, here we just need to keep repetition levels of first array
// TODO: Verify that all children array reader has same repetition levels
let rep_level_data = self
.children
.first()
.ok_or_else(|| {
general_err!("Struct array reader should have at least one child!")
})?
.get_rep_levels()
.map(|data| -> Result<Buffer> {
let mut buffer = Int16BufferBuilder::new(children_array_len);
buffer.append_slice(data);
Ok(buffer.finish())
})
.transpose()?;

self.rep_level_buffer = rep_level_data;
}
Ok(Arc::new(StructArray::from(array_data)))
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children definition levels should describe the same
// parent structure, so return first child's
self.children.first().and_then(|l| l.get_def_levels())
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children definition levels should describe the same
// parent structure, so return first child's
self.children.first().and_then(|l| l.get_rep_levels())
}
}

Expand All @@ -828,7 +802,9 @@ mod tests {
use rand::{thread_rng, Rng};

use crate::arrow::array_reader::test_util::InMemoryArrayReader;
use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray};
use arrow::array::{
Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray,
};
use arrow::datatypes::{
ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field,
Int32Type as ArrowInt32, Int64Type as ArrowInt64,
Expand Down Expand Up @@ -1551,6 +1527,7 @@ mod tests {
vec![Box::new(array_reader_1), Box::new(array_reader_2)],
1,
1,
true,
);

let struct_array = struct_array_reader.next_batch(5).unwrap();
Expand All @@ -1564,12 +1541,74 @@ mod tests {
.collect::<Vec<bool>>()
);
assert_eq!(
Some(vec![0, 1, 1, 1, 1].as_slice()),
Some(vec![0, 1, 2, 3, 1].as_slice()),
struct_array_reader.get_def_levels()
);
assert_eq!(
Some(vec![0, 1, 1, 1, 1].as_slice()),
struct_array_reader.get_rep_levels()
);
}

#[test]
fn test_struct_array_reader_list() {
use arrow::datatypes::Int32Type;
// [
// {foo: [1, 2, null],
// {foo: []},
// {foo: null},
// null,
// ]

let expected_l =
Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
Some(vec![Some(1), Some(2), None]),
Some(vec![]),
None,
None,
]));

let validity = Buffer::from([0b00000111]);
let struct_fields = vec![(
Field::new("foo", expected_l.data_type().clone(), true),
expected_l.clone() as ArrayRef,
)];
let expected = StructArray::from((struct_fields, validity));

let array = Arc::new(Int32Array::from_iter(vec![
Some(1),
Some(2),
None,
None,
None,
None,
]));
let reader = InMemoryArrayReader::new(
ArrowType::Int32,
array,
Some(vec![4, 4, 3, 2, 1, 0]),
Some(vec![0, 1, 1, 0, 0, 0]),
);

let list_reader = ListArrayReader::<i32>::new(
Box::new(reader),
expected_l.data_type().clone(),
ArrowType::Int32,
3,
1,
true,
);

let mut struct_reader = StructArrayReader::new(
expected.data_type().clone(),
vec![Box::new(list_reader)],
1,
0,
true,
);

let actual = struct_reader.next_batch(1024).unwrap();
let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
assert_eq!(actual, &expected)
}
}
1 change: 1 addition & 0 deletions parquet/src/arrow/array_reader/builder.rs
Expand Up @@ -326,6 +326,7 @@ fn build_struct_reader(
children_reader,
field.def_level,
field.rep_level,
field.nullable,
)) as _)
}

Expand Down
18 changes: 7 additions & 11 deletions parquet/src/arrow/array_reader/map_array.rs
Expand Up @@ -17,7 +17,7 @@

use crate::arrow::array_reader::ArrayReader;
use crate::errors::ParquetError::ArrowError;
use crate::errors::{Result, ParquetError};
use crate::errors::{ParquetError, Result};
use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray};
use arrow::buffer::{Buffer, MutableBuffer};
use arrow::datatypes::DataType as ArrowType;
Expand All @@ -33,8 +33,6 @@ pub struct MapArrayReader {
data_type: ArrowType,
map_def_level: i16,
map_rep_level: i16,
def_level_buffer: Option<Buffer>,
rep_level_buffer: Option<Buffer>,
}

impl MapArrayReader {
Expand All @@ -51,8 +49,6 @@ impl MapArrayReader {
data_type,
map_def_level: rep_level,
map_rep_level: def_level,
def_level_buffer: None,
rep_level_buffer: None,
}
}
}
Expand Down Expand Up @@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader {
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children definition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_def_levels()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drive by fix, part of #1699

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a big fan of less unsafe 👍

}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_level_buffer
.as_ref()
.map(|buf| unsafe { buf.typed_data() })
// Children repetition levels should describe the same parent structure,
// so return key_reader only
self.key_reader.get_rep_levels()
}
}

Expand Down