From f679322a204e6f8bfa35ce2023e03d66eb213e2d Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 13 May 2022 14:38:31 +0100 Subject: [PATCH 1/2] Fix StructArrayReader handling nested lists (#1651) --- parquet/src/arrow/array_reader.rs | 167 ++++++++++++-------- parquet/src/arrow/array_reader/builder.rs | 1 + parquet/src/arrow/array_reader/map_array.rs | 18 +-- 3 files changed, 111 insertions(+), 75 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index d2250f8efbf..c1e27d3ccdb 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -16,19 +16,17 @@ // under the License. use std::any::Any; -use std::cmp::{max, min}; +use std::cmp::max; use std::marker::PhantomData; -use std::mem::size_of; use std::result::Result::Ok; use std::sync::Arc; use std::vec::Vec; use arrow::array::{ Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, - DecimalArray, Int16BufferBuilder, Int32Array, Int64Array, PrimitiveArray, - StructArray, + DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray, }; -use arrow::buffer::{Buffer, MutableBuffer}; +use arrow::buffer::Buffer; use arrow::datatypes::{ ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType, Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type, @@ -655,8 +653,7 @@ pub struct StructArrayReader { data_type: ArrowType, struct_def_level: i16, struct_rep_level: i16, - def_level_buffer: Option, - rep_level_buffer: Option, + nullable: bool, } impl StructArrayReader { @@ -666,14 +663,14 @@ impl StructArrayReader { children: Vec>, def_level: i16, rep_level: i16, + nullable: bool, ) -> Self { Self { data_type, children, struct_def_level: def_level, struct_rep_level: rep_level, - def_level_buffer: None, - rep_level_buffer: None, + nullable, } } } @@ -708,8 +705,6 @@ impl ArrayReader for StructArrayReader { /// ``` fn next_batch(&mut self, batch_size: usize) -> Result { if self.children.is_empty() { - self.def_level_buffer = None; - self.rep_level_buffer = None; return Ok(Arc::new(StructArray::from(Vec::new()))); } @@ -742,80 +737,59 @@ impl ArrayReader for StructArrayReader { .collect::>(), ); - if self.struct_def_level != 0 { + if self.nullable { // calculate struct def level data - let buffer_size = children_array_len * size_of::(); - let mut def_level_data_buffer = MutableBuffer::new(buffer_size); - def_level_data_buffer.resize(buffer_size, 0); - // Safety: the buffer is always treated as `u16` in the code below - let def_level_data = unsafe { def_level_data_buffer.typed_data_mut() }; + // children should have consistent view of parent, only need to inspect first child + let def_levels = self.children[0] + .get_def_levels() + .expect("child with nullable parents must have definition level"); - def_level_data - .iter_mut() - .for_each(|v| *v = self.struct_def_level); + // calculate bitmap for current array + let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); - for child in &self.children { - if let Some(current_child_def_levels) = child.get_def_levels() { - if current_child_def_levels.len() != children_array_len { - return Err(general_err!("Child array length are not equal!")); - } else { - for i in 0..children_array_len { - def_level_data[i] = - min(def_level_data[i], current_child_def_levels[i]); + match self.children[0].get_rep_levels() { + Some(rep_levels) => { + // Sanity check + assert_eq!(rep_levels.len(), def_levels.len()); + + for (rep_level, def_level) in rep_levels.iter().zip(def_levels) { + if rep_level > &self.struct_rep_level { + // Already handled by inner list - SKIP + continue; } + bitmap_builder.append(*def_level >= self.struct_def_level) + } + } + None => { + for def_level in def_levels { + bitmap_builder.append(*def_level >= self.struct_def_level) } } } - // calculate bitmap for current array - let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); - for def_level in def_level_data { - let not_null = *def_level >= self.struct_def_level; - bitmap_builder.append(not_null); + if bitmap_builder.len() != children_array_len { + return Err(general_err!("Failed to decode level data for struct array")); } array_data_builder = array_data_builder.null_bit_buffer(bitmap_builder.finish()); - - self.def_level_buffer = Some(def_level_data_buffer.into()); } let array_data = unsafe { array_data_builder.build_unchecked() }; - - if self.struct_rep_level != 0 { - // calculate struct rep level data, since struct doesn't add to repetition - // levels, here we just need to keep repetition levels of first array - // TODO: Verify that all children array reader has same repetition levels - let rep_level_data = self - .children - .first() - .ok_or_else(|| { - general_err!("Struct array reader should have at least one child!") - })? - .get_rep_levels() - .map(|data| -> Result { - let mut buffer = Int16BufferBuilder::new(children_array_len); - buffer.append_slice(data); - Ok(buffer.finish()) - }) - .transpose()?; - - self.rep_level_buffer = rep_level_data; - } Ok(Arc::new(StructArray::from(array_data))) } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + // Children definition levels should describe the same + // parent structure, so return first child's + self.children.first().and_then(|l| l.get_def_levels()) } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + // Children definition levels should describe the same + // parent structure, so return first child's + self.children.first().and_then(|l| l.get_rep_levels()) } } @@ -828,7 +802,9 @@ mod tests { use rand::{thread_rng, Rng}; use crate::arrow::array_reader::test_util::InMemoryArrayReader; - use arrow::array::{Array, ArrayRef, PrimitiveArray, StringArray, StructArray}; + use arrow::array::{ + Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray, + }; use arrow::datatypes::{ ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, Int32Type as ArrowInt32, Int64Type as ArrowInt64, @@ -1551,6 +1527,7 @@ mod tests { vec![Box::new(array_reader_1), Box::new(array_reader_2)], 1, 1, + true, ); let struct_array = struct_array_reader.next_batch(5).unwrap(); @@ -1564,7 +1541,7 @@ mod tests { .collect::>() ); assert_eq!( - Some(vec![0, 1, 1, 1, 1].as_slice()), + Some(vec![0, 1, 2, 3, 1].as_slice()), struct_array_reader.get_def_levels() ); assert_eq!( @@ -1572,4 +1549,66 @@ mod tests { struct_array_reader.get_rep_levels() ); } + + #[test] + fn test_struct_array_reader_list() { + use arrow::datatypes::Int32Type; + // [ + // {foo: [1, 2, null], + // {foo: []}, + // {foo: null}, + // null, + // ] + + let expected_l = + Arc::new(ListArray::from_iter_primitive::(vec![ + Some(vec![Some(1), Some(2), None]), + Some(vec![]), + None, + None, + ])); + + let nulls = Buffer::from([0b00000111]); + let struct_fields = vec![( + Field::new("foo", expected_l.data_type().clone(), true), + expected_l.clone() as ArrayRef, + )]; + let expected = StructArray::from((struct_fields, nulls)); + + let array = Arc::new(Int32Array::from_iter(vec![ + Some(1), + Some(2), + None, + None, + None, + None, + ])); + let reader = InMemoryArrayReader::new( + ArrowType::Int32, + array, + Some(vec![4, 4, 3, 2, 1, 0]), + Some(vec![0, 1, 1, 0, 0, 0]), + ); + + let list_reader = ListArrayReader::::new( + Box::new(reader), + expected_l.data_type().clone(), + ArrowType::Int32, + 3, + 1, + true, + ); + + let mut struct_reader = StructArrayReader::new( + expected.data_type().clone(), + vec![Box::new(list_reader)], + 1, + 0, + true, + ); + + let actual = struct_reader.next_batch(1024).unwrap(); + let actual = actual.as_any().downcast_ref::().unwrap(); + assert_eq!(actual, &expected) + } } diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index ab58006721a..496af52ed2b 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -326,6 +326,7 @@ fn build_struct_reader( children_reader, field.def_level, field.rep_level, + field.nullable, )) as _) } diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs index 01038868653..2c9f037abb2 100644 --- a/parquet/src/arrow/array_reader/map_array.rs +++ b/parquet/src/arrow/array_reader/map_array.rs @@ -17,7 +17,7 @@ use crate::arrow::array_reader::ArrayReader; use crate::errors::ParquetError::ArrowError; -use crate::errors::{Result, ParquetError}; +use crate::errors::{ParquetError, Result}; use arrow::array::{ArrayDataBuilder, ArrayRef, MapArray}; use arrow::buffer::{Buffer, MutableBuffer}; use arrow::datatypes::DataType as ArrowType; @@ -33,8 +33,6 @@ pub struct MapArrayReader { data_type: ArrowType, map_def_level: i16, map_rep_level: i16, - def_level_buffer: Option, - rep_level_buffer: Option, } impl MapArrayReader { @@ -51,8 +49,6 @@ impl MapArrayReader { data_type, map_def_level: rep_level, map_rep_level: def_level, - def_level_buffer: None, - rep_level_buffer: None, } } } @@ -154,15 +150,15 @@ impl ArrayReader for MapArrayReader { } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + // Children definition levels should describe the same parent structure, + // so return key_reader only + self.key_reader.get_def_levels() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_level_buffer - .as_ref() - .map(|buf| unsafe { buf.typed_data() }) + // Children repetition levels should describe the same parent structure, + // so return key_reader only + self.key_reader.get_rep_levels() } } From 536e599ba57c06f755f1e3a29becbe8e17c2d7f8 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 18 May 2022 20:03:11 +0100 Subject: [PATCH 2/2] Rename nulls to validity --- parquet/src/arrow/array_reader.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs index c1e27d3ccdb..12c9ca522e2 100644 --- a/parquet/src/arrow/array_reader.rs +++ b/parquet/src/arrow/array_reader.rs @@ -1568,12 +1568,12 @@ mod tests { None, ])); - let nulls = Buffer::from([0b00000111]); + let validity = Buffer::from([0b00000111]); let struct_fields = vec![( Field::new("foo", expected_l.data_type().clone(), true), expected_l.clone() as ArrayRef, )]; - let expected = StructArray::from((struct_fields, nulls)); + let expected = StructArray::from((struct_fields, validity)); let array = Arc::new(Int32Array::from_iter(vec![ Some(1),