From 16ad70ff853f10954dc1e3e19da8468360028e99 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Fri, 29 Jul 2022 19:11:04 +0800 Subject: [PATCH 1/6] replace ArrayReader::next_batch with ArrayReader::read_records and ArrayReader::consume_batch. --- parquet/src/arrow/array_reader/byte_array.rs | 20 ++++++- .../array_reader/byte_array_dictionary.rs | 21 ++++++- .../array_reader/complex_object_array.rs | 57 ++++++++++++++++--- parquet/src/arrow/array_reader/empty_array.rs | 11 +++- parquet/src/arrow/array_reader/list_array.rs | 11 +++- parquet/src/arrow/array_reader/map_array.rs | 21 ++++++- parquet/src/arrow/array_reader/mod.rs | 12 +++- parquet/src/arrow/array_reader/null_array.rs | 11 +++- .../src/arrow/array_reader/primitive_array.rs | 11 +++- .../src/arrow/array_reader/struct_array.rs | 27 ++++++++- parquet/src/arrow/array_reader/test_util.rs | 13 ++++- 11 files changed, 190 insertions(+), 25 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index ec4188890ef..9b293dfadbf 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader}; use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::GenericRecordReader; @@ -109,7 +109,23 @@ impl ArrayReader for ByteArrayReader { } fn next_batch(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self, batch_size: usize) -> Result { + if self.record_reader.num_records() < batch_size { + return Err(general_err!( + "Invalid batch_size: {}, current consume: {} records in buffer.", + batch_size, + self.record_reader.num_records() + )); + } + let buffer = self.record_reader.consume_record_data(); let null_buffer = self.record_reader.consume_bitmap_buffer(); self.def_levels_buffer = self.record_reader.consume_def_levels(); diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 51ef38d0d07..88a8b69efbd 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -25,7 +25,7 @@ use arrow::buffer::Buffer; use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; -use crate::arrow::array_reader::{read_records, ArrayReader, skip_records}; +use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader}; use crate::arrow::buffer::{ dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer, }; @@ -168,7 +168,24 @@ where } fn next_batch(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records_inner(&mut self.record_reader, 
self.pages.as_mut(), batch_size) + } + + fn consume_batch(&mut self, batch_size: usize) -> Result { + let num_records = self.record_reader.num_records(); + if num_records < batch_size { + return Err(general_err!( + "Invalid batch_size: {}, current consume: {} records in buffer.", + batch_size, + num_records + )); + } + let buffer = self.record_reader.consume_record_data(); let null_buffer = self.record_reader.consume_bitmap_buffer(); let array = buffer.into_array(null_buffer, &self.data_type)?; diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index 1390866cf6a..9ed11f15c12 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -21,7 +21,7 @@ use crate::arrow::schema::parquet_to_arrow_field; use crate::column::page::PageIterator; use crate::column::reader::ColumnReaderImpl; use crate::data_type::DataType; -use crate::errors::Result; +use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow::array::ArrayRef; use arrow::datatypes::DataType as ArrowType; @@ -39,6 +39,7 @@ where pages: Box, def_levels_buffer: Option>, rep_levels_buffer: Option>, + data_buffer: Vec, column_desc: ColumnDescPtr, column_reader: Option>, converter: C, @@ -60,6 +61,11 @@ where } fn next_batch(&mut self, batch_size: usize) -> Result { + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { // Try to initialize column reader if self.column_reader.is_none() { self.next_column_reader()?; @@ -126,7 +132,6 @@ where break; } } - data_buffer.truncate(num_read); def_levels_buffer .iter_mut() @@ -135,23 +140,54 @@ where .iter_mut() .for_each(|buf| buf.truncate(num_read)); - self.def_levels_buffer = def_levels_buffer; - self.rep_levels_buffer = rep_levels_buffer; + if let Some(mut def_levels_buffer) = def_levels_buffer { + match &mut self.def_levels_buffer { + None => { + self.def_levels_buffer = Some(def_levels_buffer); + } + Some(buf) => buf.append(&mut def_levels_buffer), + } + } + + if let Some(mut rep_levels_buffer) = rep_levels_buffer { + match &mut self.rep_levels_buffer { + None => { + self.rep_levels_buffer = Some(rep_levels_buffer); + } + Some(buf) => buf.append(&mut rep_levels_buffer), + } + } + + self.data_buffer.append(&mut data_buffer); + + Ok(num_read) + } + + fn consume_batch(&mut self, batch_size: usize) -> Result { + // uncheck null count + let len = self.data_buffer.len(); + if len < batch_size { + return Err(general_err!( + "Invalid batch_size: {}, current consume: {} records in buffer.", + batch_size, + len + )); + } let data: Vec> = if self.def_levels_buffer.is_some() { - data_buffer - .into_iter() + self.data_buffer + .iter() .zip(self.def_levels_buffer.as_ref().unwrap().iter()) .map(|(t, def_level)| { if *def_level == self.column_desc.max_def_level() { - Some(t) + Some(t.clone()) } else { None } }) .collect() } else { - data_buffer.into_iter().map(Some).collect() + self.data_buffer.iter().map(|x| Some(x.clone())).collect() }; let mut array = self.converter.convert(data)?; @@ -160,6 +196,10 @@ where array = arrow::compute::cast(&array, &self.data_type)?; } + self.data_buffer = vec![]; + self.def_levels_buffer = None; + self.rep_levels_buffer = None; + Ok(array) } @@ -208,6 +248,7 @@ where pages, def_levels_buffer: None, rep_levels_buffer: None, + data_buffer: vec![], column_desc, column_reader: None, converter, diff --git 
a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs index b06646cc1c6..04a8fe399cb 100644 --- a/parquet/src/arrow/array_reader/empty_array.rs +++ b/parquet/src/arrow/array_reader/empty_array.rs @@ -54,14 +54,21 @@ impl ArrayReader for EmptyArrayReader { } fn next_batch(&mut self, batch_size: usize) -> Result { + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { let len = self.remaining_rows.min(batch_size); self.remaining_rows -= len; + Ok(len) + } + fn consume_batch(&mut self, batch_size: usize) -> Result { let data = ArrayDataBuilder::new(self.data_type.clone()) - .len(len) + .len(batch_size) .build() .unwrap(); - Ok(Arc::new(StructArray::from(data))) } diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs index 33bd9772a16..29a2ae979bc 100644 --- a/parquet/src/arrow/array_reader/list_array.rs +++ b/parquet/src/arrow/array_reader/list_array.rs @@ -79,8 +79,17 @@ impl ArrayReader for ListArrayReader { } fn next_batch(&mut self, batch_size: usize) -> Result { - let next_batch_array = self.item_reader.next_batch(batch_size)?; + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let size = self.item_reader.read_records(batch_size)?; + Ok(size) + } + fn consume_batch(&mut self, batch_size: usize) -> Result { + let next_batch_array = self.item_reader.consume_batch(batch_size)?; if next_batch_array.len() == 0 { return Ok(new_empty_array(&self.data_type)); } diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs index 00c3db41a37..0a295936e69 100644 --- a/parquet/src/arrow/array_reader/map_array.rs +++ b/parquet/src/arrow/array_reader/map_array.rs @@ -63,8 +63,25 @@ impl ArrayReader for MapArrayReader { } fn next_batch(&mut self, batch_size: usize) -> Result { - let key_array = self.key_reader.next_batch(batch_size)?; - let value_array = self.value_reader.next_batch(batch_size)?; + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let key_len = self.key_reader.read_records(batch_size)?; + let value_len = self.value_reader.read_records(batch_size)?; + // Check that key and value have the same lengths + if key_len != value_len { + return Err(general_err!( + "Map key and value should have the same lengths." + )); + } + Ok(key_len) + } + + fn consume_batch(&mut self, batch_size: usize) -> Result { + let key_array = self.key_reader.consume_batch(batch_size)?; + let value_array = self.value_reader.consume_batch(batch_size)?; // Check that key and value have the same lengths let key_length = key_array.len(); diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index 8bdd6c071c3..5d97fcb7e69 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -64,6 +64,16 @@ pub trait ArrayReader: Send { /// Reads at most `batch_size` records into an arrow array and return it. fn next_batch(&mut self, batch_size: usize) -> Result; + /// Reads at most `batch_size` records' bytes into buffer + /// + /// Returns the number of records read, which can be less than `batch_size` if + /// pages is exhausted. 
+ fn read_records(&mut self, batch_size: usize) -> Result; + + /// Consume at most `batch_size` currently stored buffer data + /// into an arrow array and return it. + fn consume_batch(&mut self, batch_size: usize) -> Result; + /// Skips over `num_records` records, returning the number of rows skipped fn skip_records(&mut self, num_records: usize) -> Result; @@ -115,7 +125,7 @@ impl RowGroupCollection for Arc { /// /// Returns the number of records read, which can be less than `batch_size` if /// pages is exhausted. -fn read_records( +fn read_records_inner( record_reader: &mut GenericRecordReader, pages: &mut dyn PageIterator, batch_size: usize, diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index 63f73d41e4f..626a9f5f9c7 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::array_reader::{read_records, ArrayReader, skip_records}; +use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader}; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::RecordReader; use crate::column::page::PageIterator; @@ -80,8 +80,15 @@ where /// Reads at most `batch_size` records into array. fn next_batch(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + fn consume_batch(&mut self, _batch_size: usize) -> Result { // convert to arrays let array = arrow::array::NullArray::new(self.record_reader.num_values()); diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 2a59f0326d3..f0d8694a68f 100644 --- a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; +use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader}; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::RecordReader; use crate::arrow::schema::parquet_to_arrow_field; @@ -97,8 +97,15 @@ where /// Reads at most `batch_size` records into array. 
fn next_batch(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size) + } + fn consume_batch(&mut self, _batch_size: usize) -> Result { let target_type = self.get_data_type().clone(); let arrow_data_type = match T::get_physical_type() { PhysicalType::BOOLEAN => ArrowType::Boolean, diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index 602c598f826..7477599de62 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -81,6 +81,31 @@ impl ArrayReader for StructArrayReader { /// null_bitmap[i] = (def_levels[i] >= self.def_level); /// ``` fn next_batch(&mut self, batch_size: usize) -> Result { + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { + let mut read = None; + for child in self.children.iter_mut() { + let child_read = child.read_records(batch_size)?; + match read { + Some(expected) => { + if expected != child_read { + return Err(general_err!( + "StructArrayReader out of sync in read_records, expected {} skipped, got {}", + expected, + child_read + )); + } + } + None => read = Some(child_read), + } + } + Ok(read.unwrap_or(0)) + } + + fn consume_batch(&mut self, batch_size: usize) -> Result { if self.children.is_empty() { return Ok(Arc::new(StructArray::from(Vec::new()))); } @@ -88,7 +113,7 @@ impl ArrayReader for StructArrayReader { let children_array = self .children .iter_mut() - .map(|reader| reader.next_batch(batch_size)) + .map(|reader| reader.consume_batch(batch_size)) .collect::>>()?; // check that array child data has same size diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs index 04c0f6c68f3..d4af935e355 100644 --- a/parquet/src/arrow/array_reader/test_util.rs +++ b/parquet/src/arrow/array_reader/test_util.rs @@ -141,6 +141,11 @@ impl ArrayReader for InMemoryArrayReader { } fn next_batch(&mut self, batch_size: usize) -> Result { + let size = self.read_records(batch_size)?; + self.consume_batch(size) + } + + fn read_records(&mut self, batch_size: usize) -> Result { assert_ne!(batch_size, 0); // This replicates the logical normally performed by // RecordReader to delimit semantic records @@ -164,10 +169,14 @@ impl ArrayReader for InMemoryArrayReader { } None => batch_size.min(self.array.len() - self.cur_idx), }; + Ok(read) + } + fn consume_batch(&mut self, batch_size: usize) -> Result { + assert_ne!(batch_size, 0); self.last_idx = self.cur_idx; - self.cur_idx += read; - Ok(self.array.slice(self.last_idx, read)) + self.cur_idx += batch_size; + Ok(self.array.slice(self.last_idx, batch_size)) } fn skip_records(&mut self, num_records: usize) -> Result { From 76b7d40a308162bea98fd5790638d550e98c3eea Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sat, 30 Jul 2022 13:13:08 +0800 Subject: [PATCH 2/6] fix ut --- .../array_reader/complex_object_array.rs | 85 +++++++++++-------- 1 file changed, 50 insertions(+), 35 deletions(-) diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index 9ed11f15c12..d025b5c5a20 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ 
b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -149,7 +149,7 @@ where } } - if let Some(mut rep_levels_buffer) = rep_levels_buffer { + if let Some(mut rep_levels_buffer) = rep_levels_buffer { match &mut self.rep_levels_buffer { None => { self.rep_levels_buffer = Some(rep_levels_buffer); @@ -196,10 +196,15 @@ where array = arrow::compute::cast(&array, &self.data_type)?; } - self.data_buffer = vec![]; - self.def_levels_buffer = None; - self.rep_levels_buffer = None; - + self.data_buffer = self.data_buffer.split_off(batch_size); + if let Some(buf) = &mut self.def_levels_buffer { + let rest = buf.split_off(batch_size); + self.def_levels_buffer = Some(rest); + } + if let Some(buf) = &mut self.rep_levels_buffer { + let rest = buf.split_off(batch_size); + self.rep_levels_buffer = Some(rest); + } Ok(array) } @@ -208,8 +213,11 @@ where Some(reader) => reader.skip_records(num_records), None => { if self.next_column_reader()? { - self.column_reader.as_mut().unwrap().skip_records(num_records) - }else { + self.column_reader + .as_mut() + .unwrap() + .skip_records(num_records) + } else { Ok(0) } } @@ -390,30 +398,32 @@ mod tests { let mut accu_len: usize = 0; - let array = array_reader.next_batch(values_per_page / 2).unwrap(); - assert_eq!(array.len(), values_per_page / 2); + let len = array_reader.read_records(values_per_page / 2).unwrap(); + assert_eq!(len, values_per_page / 2); assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), + Some(&def_levels[accu_len..(accu_len + len)]), array_reader.get_def_levels() ); assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), + Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); - accu_len += array.len(); + accu_len += len; + array_reader.consume_batch(values_per_page / 2).unwrap(); // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, // and the last values_per_page/2 ones are from the second column chunk - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page); + let len = array_reader.read_records(values_per_page).unwrap(); + assert_eq!(len, values_per_page); assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), + Some(&def_levels[accu_len..(accu_len + len)]), array_reader.get_def_levels() ); assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), + Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); + let array = array_reader.consume_batch(values_per_page).unwrap(); let strings = array.as_any().downcast_ref::().unwrap(); for i in 0..array.len() { if array.is_valid(i) { @@ -425,19 +435,20 @@ mod tests { assert_eq!(all_values[i + accu_len], None) } } - accu_len += array.len(); + accu_len += len; // Try to read values_per_page values, however there are only values_per_page/2 values - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page / 2); + let len = array_reader.read_records(values_per_page).unwrap(); + assert_eq!(len, values_per_page / 2); assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), + Some(&def_levels[accu_len..(accu_len + len)]), array_reader.get_def_levels() ); assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), + Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); + array_reader.consume_batch(len).unwrap(); } #[test] @@ -532,31 +543,34 @@ mod tests { let mut accu_len: usize = 0; // println!("---------- reading 
a batch of {} values ----------", values_per_page / 2); - let array = array_reader.next_batch(values_per_page / 2).unwrap(); - assert_eq!(array.len(), values_per_page / 2); + let len = array_reader.read_records(values_per_page / 2).unwrap(); + assert_eq!(len, values_per_page / 2); assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), + Some(&def_levels[accu_len..(accu_len + len)]), array_reader.get_def_levels() ); assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), + Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); - accu_len += array.len(); + accu_len += len; + array_reader.consume_batch(len).unwrap(); // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, // and the last values_per_page/2 ones are from the second column chunk // println!("---------- reading a batch of {} values ----------", values_per_page); - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page); + //let array = array_reader.next_batch(values_per_page).unwrap(); + let len = array_reader.read_records(values_per_page).unwrap(); + assert_eq!(len, values_per_page); assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), + Some(&def_levels[accu_len..(accu_len + len)]), array_reader.get_def_levels() ); assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), + Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); + let array = array_reader.consume_batch(len).unwrap(); let strings = array.as_any().downcast_ref::().unwrap(); for i in 0..array.len() { if array.is_valid(i) { @@ -568,19 +582,20 @@ mod tests { assert_eq!(all_values[i + accu_len], None) } } - accu_len += array.len(); + accu_len += len; // Try to read values_per_page values, however there are only values_per_page/2 values // println!("---------- reading a batch of {} values ----------", values_per_page); - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page / 2); + let len = array_reader.read_records(values_per_page).unwrap(); + assert_eq!(len, values_per_page / 2); assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), + Some(&def_levels[accu_len..(accu_len + len)]), array_reader.get_def_levels() ); assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), + Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); + array_reader.consume_batch(len).unwrap(); } } From bc07bdb738dccacdf26c951458e9e89428a4c8d3 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Sun, 31 Jul 2022 18:52:30 +0800 Subject: [PATCH 3/6] fix comment --- parquet/src/arrow/array_reader/byte_array.rs | 19 ++------ .../array_reader/byte_array_dictionary.rs | 20 ++------- .../array_reader/complex_object_array.rs | 44 +++++-------------- parquet/src/arrow/array_reader/empty_array.rs | 13 +++--- parquet/src/arrow/array_reader/list_array.rs | 9 +--- parquet/src/arrow/array_reader/map_array.rs | 11 ++--- parquet/src/arrow/array_reader/mod.rs | 11 +++-- parquet/src/arrow/array_reader/null_array.rs | 12 ++--- .../src/arrow/array_reader/primitive_array.rs | 12 ++--- .../src/arrow/array_reader/struct_array.rs | 44 +++++++++---------- parquet/src/arrow/array_reader/test_util.rs | 12 ++--- 11 files changed, 68 insertions(+), 139 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index 9b293dfadbf..172aeb96d6d 100644 --- 
a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader}; +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::offset_buffer::OffsetBuffer; use crate::arrow::record_reader::buffer::ScalarValue; use crate::arrow::record_reader::GenericRecordReader; @@ -108,24 +108,11 @@ impl ArrayReader for ByteArrayReader { &self.data_type } - fn next_batch(&mut self, batch_size: usize) -> Result { - let size = self.read_records(batch_size)?; - self.consume_batch(size) - } - fn read_records(&mut self, batch_size: usize) -> Result { - read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size) + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) } - fn consume_batch(&mut self, batch_size: usize) -> Result { - if self.record_reader.num_records() < batch_size { - return Err(general_err!( - "Invalid batch_size: {}, current consume: {} records in buffer.", - batch_size, - self.record_reader.num_records() - )); - } - + fn consume_batch(&mut self) -> Result { let buffer = self.record_reader.consume_record_data(); let null_buffer = self.record_reader.consume_bitmap_buffer(); self.def_levels_buffer = self.record_reader.consume_def_levels(); diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index 88a8b69efbd..0a5d94fa6ae 100644 --- a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -25,7 +25,7 @@ use arrow::buffer::Buffer; use arrow::datatypes::{ArrowNativeType, DataType as ArrowType}; use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; -use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader}; +use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::{ dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer, }; @@ -167,25 +167,11 @@ where &self.data_type } - fn next_batch(&mut self, batch_size: usize) -> Result { - let size = self.read_records(batch_size)?; - self.consume_batch(size) - } - fn read_records(&mut self, batch_size: usize) -> Result { - read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size) + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) } - fn consume_batch(&mut self, batch_size: usize) -> Result { - let num_records = self.record_reader.num_records(); - if num_records < batch_size { - return Err(general_err!( - "Invalid batch_size: {}, current consume: {} records in buffer.", - batch_size, - num_records - )); - } - + fn consume_batch(&mut self) -> Result { let buffer = self.record_reader.consume_record_data(); let null_buffer = self.record_reader.consume_bitmap_buffer(); let array = buffer.into_array(null_buffer, &self.data_type)?; diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index d025b5c5a20..e2a205b219c 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -21,7 +21,7 @@ use crate::arrow::schema::parquet_to_arrow_field; use crate::column::page::PageIterator; use crate::column::reader::ColumnReaderImpl; use crate::data_type::DataType; -use 
crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use arrow::array::ArrayRef; use arrow::datatypes::DataType as ArrowType; @@ -60,11 +60,6 @@ where &self.data_type } - fn next_batch(&mut self, batch_size: usize) -> Result { - let size = self.read_records(batch_size)?; - self.consume_batch(size) - } - fn read_records(&mut self, batch_size: usize) -> Result { // Try to initialize column reader if self.column_reader.is_none() { @@ -163,17 +158,7 @@ where Ok(num_read) } - fn consume_batch(&mut self, batch_size: usize) -> Result { - // uncheck null count - let len = self.data_buffer.len(); - if len < batch_size { - return Err(general_err!( - "Invalid batch_size: {}, current consume: {} records in buffer.", - batch_size, - len - )); - } - + fn consume_batch(&mut self) -> Result { let data: Vec> = if self.def_levels_buffer.is_some() { self.data_buffer .iter() @@ -196,15 +181,10 @@ where array = arrow::compute::cast(&array, &self.data_type)?; } - self.data_buffer = self.data_buffer.split_off(batch_size); - if let Some(buf) = &mut self.def_levels_buffer { - let rest = buf.split_off(batch_size); - self.def_levels_buffer = Some(rest); - } - if let Some(buf) = &mut self.rep_levels_buffer { - let rest = buf.split_off(batch_size); - self.rep_levels_buffer = Some(rest); - } + self.data_buffer = vec![]; + self.def_levels_buffer = None; + self.rep_levels_buffer = None; + Ok(array) } @@ -409,7 +389,7 @@ mod tests { array_reader.get_rep_levels() ); accu_len += len; - array_reader.consume_batch(values_per_page / 2).unwrap(); + array_reader.consume_batch().unwrap(); // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, // and the last values_per_page/2 ones are from the second column chunk @@ -423,7 +403,7 @@ mod tests { Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); - let array = array_reader.consume_batch(values_per_page).unwrap(); + let array = array_reader.consume_batch().unwrap(); let strings = array.as_any().downcast_ref::().unwrap(); for i in 0..array.len() { if array.is_valid(i) { @@ -448,7 +428,7 @@ mod tests { Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); - array_reader.consume_batch(len).unwrap(); + array_reader.consume_batch().unwrap(); } #[test] @@ -554,7 +534,7 @@ mod tests { array_reader.get_rep_levels() ); accu_len += len; - array_reader.consume_batch(len).unwrap(); + array_reader.consume_batch().unwrap(); // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, // and the last values_per_page/2 ones are from the second column chunk @@ -570,7 +550,7 @@ mod tests { Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); - let array = array_reader.consume_batch(len).unwrap(); + let array = array_reader.consume_batch().unwrap(); let strings = array.as_any().downcast_ref::().unwrap(); for i in 0..array.len() { if array.is_valid(i) { @@ -596,6 +576,6 @@ mod tests { Some(&rep_levels[accu_len..(accu_len + len)]), array_reader.get_rep_levels() ); - array_reader.consume_batch(len).unwrap(); + array_reader.consume_batch().unwrap(); } } diff --git a/parquet/src/arrow/array_reader/empty_array.rs b/parquet/src/arrow/array_reader/empty_array.rs index 04a8fe399cb..abe839b9dc2 100644 --- a/parquet/src/arrow/array_reader/empty_array.rs +++ b/parquet/src/arrow/array_reader/empty_array.rs @@ -33,6 +33,7 @@ pub fn make_empty_array_reader(row_count: usize) -> Box { struct 
EmptyArrayReader {
     data_type: ArrowType,
     remaining_rows: usize,
+    need_consume_records: usize,
 }
 
 impl EmptyArrayReader {
@@ -40,6 +41,7 @@ impl EmptyArrayReader {
         Self {
             data_type: ArrowType::Struct(vec![]),
             remaining_rows: row_count,
+            need_consume_records: 0,
         }
     }
 }
@@ -53,22 +55,19 @@ impl ArrayReader for EmptyArrayReader {
         &self.data_type
     }
 
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let size = self.read_records(batch_size)?;
-        self.consume_batch(size)
-    }
-
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
         let len = self.remaining_rows.min(batch_size);
         self.remaining_rows -= len;
+        self.need_consume_records += len;
         Ok(len)
     }
 
-    fn consume_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
         let data = ArrayDataBuilder::new(self.data_type.clone())
-            .len(batch_size)
+            .len(self.need_consume_records)
             .build()
             .unwrap();
+        self.need_consume_records = 0;
         Ok(Arc::new(StructArray::from(data)))
     }
 
diff --git a/parquet/src/arrow/array_reader/list_array.rs b/parquet/src/arrow/array_reader/list_array.rs
index 29a2ae979bc..c245c61312f 100644
--- a/parquet/src/arrow/array_reader/list_array.rs
+++ b/parquet/src/arrow/array_reader/list_array.rs
@@ -78,18 +78,13 @@ impl<OffsetSize: OffsetSizeTrait> ArrayReader for ListArrayReader<OffsetSize> {
         &self.data_type
     }
 
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let size = self.read_records(batch_size)?;
-        self.consume_batch(size)
-    }
-
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
         let size = self.item_reader.read_records(batch_size)?;
         Ok(size)
     }
 
-    fn consume_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let next_batch_array = self.item_reader.consume_batch(batch_size)?;
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let next_batch_array = self.item_reader.consume_batch()?;
         if next_batch_array.len() == 0 {
             return Ok(new_empty_array(&self.data_type));
         }
diff --git a/parquet/src/arrow/array_reader/map_array.rs b/parquet/src/arrow/array_reader/map_array.rs
index 0a295936e69..83ba63ca170 100644
--- a/parquet/src/arrow/array_reader/map_array.rs
+++ b/parquet/src/arrow/array_reader/map_array.rs
@@ -62,11 +62,6 @@ impl ArrayReader for MapArrayReader {
         &self.data_type
     }
 
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let size = self.read_records(batch_size)?;
-        self.consume_batch(size)
-    }
-
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
         let key_len = self.key_reader.read_records(batch_size)?;
         let value_len = self.value_reader.read_records(batch_size)?;
@@ -79,9 +74,9 @@ impl ArrayReader for MapArrayReader {
         Ok(key_len)
     }
 
-    fn consume_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let key_array = self.key_reader.consume_batch(batch_size)?;
-        let value_array = self.value_reader.consume_batch(batch_size)?;
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let key_array = self.key_reader.consume_batch()?;
+        let value_array = self.value_reader.consume_batch()?;
 
         // Check that key and value have the same lengths
         let key_length = key_array.len();
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 5d97fcb7e69..d7665ef0f6b 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -62,7 +62,10 @@ pub trait ArrayReader: Send {
     fn get_data_type(&self) -> &ArrowType;
 
     /// Reads at most `batch_size` records into an arrow array and return it.
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        self.read_records(batch_size)?;
+        self.consume_batch()
+    }
 
     /// Reads at most `batch_size` records' bytes into buffer
     ///
     /// Returns the number of records read, which can be less than `batch_size` if
     /// pages is exhausted.
     fn read_records(&mut self, batch_size: usize) -> Result<usize>;
 
-    /// Consume at most `batch_size` currently stored buffer data
+    /// Consume all currently stored buffer data
     /// into an arrow array and return it.
-    fn consume_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;
+    fn consume_batch(&mut self) -> Result<ArrayRef>;
 
     /// Skips over `num_records` records, returning the number of rows skipped
     fn skip_records(&mut self, num_records: usize) -> Result<usize>;
@@ -125,7 +128,7 @@ impl RowGroupCollection for Arc<dyn FileReader> {
 ///
 /// Returns the number of records read, which can be less than `batch_size` if
 /// pages is exhausted.
-fn read_records_inner<V, CV>(
+fn read_records<V, CV>(
     record_reader: &mut GenericRecordReader<V, CV>,
     pages: &mut dyn PageIterator,
     batch_size: usize,
diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs
index 626a9f5f9c7..682d15f8a17 100644
--- a/parquet/src/arrow/array_reader/null_array.rs
+++ b/parquet/src/arrow/array_reader/null_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::column::page::PageIterator;
@@ -78,17 +78,11 @@ where
         &self.data_type
     }
 
-    /// Reads at most `batch_size` records into array.
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let size = self.read_records(batch_size)?;
-        self.consume_batch(size)
-    }
-
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
-        read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size)
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
     }
 
-    fn consume_batch(&mut self, _batch_size: usize) -> Result<ArrayRef> {
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
         // convert to arrays
         let array = arrow::array::NullArray::new(self.record_reader.num_values());
 
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
index f0d8694a68f..5892bd20fd7 100644
--- a/parquet/src/arrow/array_reader/primitive_array.rs
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::arrow::array_reader::{read_records_inner, skip_records, ArrayReader};
+use crate::arrow::array_reader::{read_records, skip_records, ArrayReader};
 use crate::arrow::record_reader::buffer::ScalarValue;
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -95,17 +95,11 @@ where
         &self.data_type
     }
 
-    /// Reads at most `batch_size` records into array.
- fn next_batch(&mut self, batch_size: usize) -> Result { - let size = self.read_records(batch_size)?; - self.consume_batch(size) - } - fn read_records(&mut self, batch_size: usize) -> Result { - read_records_inner(&mut self.record_reader, self.pages.as_mut(), batch_size) + read_records(&mut self.record_reader, self.pages.as_mut(), batch_size) } - fn consume_batch(&mut self, _batch_size: usize) -> Result { + fn consume_batch(&mut self) -> Result { let target_type = self.get_data_type().clone(); let arrow_data_type = match T::get_physical_type() { PhysicalType::BOOLEAN => ArrowType::Boolean, diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs index 7477599de62..b333c66cb21 100644 --- a/parquet/src/arrow/array_reader/struct_array.rs +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -63,28 +63,6 @@ impl ArrayReader for StructArrayReader { &self.data_type } - /// Read `batch_size` struct records. - /// - /// Definition levels of struct array is calculated as following: - /// ```ignore - /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ..., - /// childn_def_levels[i]); - /// ``` - /// - /// Repetition levels of struct array is calculated as following: - /// ```ignore - /// rep_levels[i] = child1_rep_levels[i]; - /// ``` - /// - /// The null bitmap of struct array is calculated from def_levels: - /// ```ignore - /// null_bitmap[i] = (def_levels[i] >= self.def_level); - /// ``` - fn next_batch(&mut self, batch_size: usize) -> Result { - let size = self.read_records(batch_size)?; - self.consume_batch(size) - } - fn read_records(&mut self, batch_size: usize) -> Result { let mut read = None; for child in self.children.iter_mut() { @@ -105,7 +83,25 @@ impl ArrayReader for StructArrayReader { Ok(read.unwrap_or(0)) } - fn consume_batch(&mut self, batch_size: usize) -> Result { + /// Consume struct records. 
+    ///
+    /// Definition levels of the struct array are calculated as follows:
+    /// ```ignore
+    /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ...,
+    /// childn_def_levels[i]);
+    /// ```
+    ///
+    /// Repetition levels of the struct array are calculated as follows:
+    /// ```ignore
+    /// rep_levels[i] = child1_rep_levels[i];
+    /// ```
+    ///
+    /// The null bitmap of the struct array is calculated from def_levels:
+    /// ```ignore
+    /// null_bitmap[i] = (def_levels[i] >= self.def_level);
+    /// ```
+    ///
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
         if self.children.is_empty() {
             return Ok(Arc::new(StructArray::from(Vec::new())));
         }
@@ -113,7 +109,7 @@
         let children_array = self
             .children
             .iter_mut()
-            .map(|reader| reader.consume_batch(batch_size))
+            .map(|reader| reader.consume_batch())
             .collect::<Result<Vec<_>>>()?;
 
         // check that array child data has same size
diff --git a/parquet/src/arrow/array_reader/test_util.rs b/parquet/src/arrow/array_reader/test_util.rs
index d4af935e355..da9b8d3bf9b 100644
--- a/parquet/src/arrow/array_reader/test_util.rs
+++ b/parquet/src/arrow/array_reader/test_util.rs
@@ -101,6 +101,7 @@ pub struct InMemoryArrayReader {
     rep_levels: Option<Vec<i16>>,
     last_idx: usize,
     cur_idx: usize,
+    need_consume_records: usize,
 }
 
 impl InMemoryArrayReader {
@@ -127,6 +128,7 @@ impl InMemoryArrayReader {
             rep_levels,
             cur_idx: 0,
             last_idx: 0,
+            need_consume_records: 0,
         }
     }
 }
@@ -140,11 +142,6 @@ impl ArrayReader for InMemoryArrayReader {
         &self.data_type
     }
 
-    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
-        let size = self.read_records(batch_size)?;
-        self.consume_batch(size)
-    }
-
     fn read_records(&mut self, batch_size: usize) -> Result<usize> {
         assert_ne!(batch_size, 0);
         // This replicates the logical normally performed by
         // RecordReader to delimit semantic records
@@ -169,13 +166,16 @@ impl ArrayReader for InMemoryArrayReader {
             }
             None => batch_size.min(self.array.len() - self.cur_idx),
         };
+        self.need_consume_records += read;
         Ok(read)
     }
 
-    fn consume_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+    fn consume_batch(&mut self) -> Result<ArrayRef> {
+        let batch_size = self.need_consume_records;
         assert_ne!(batch_size, 0);
         self.last_idx = self.cur_idx;
         self.cur_idx += batch_size;
+        self.need_consume_records = 0;
         Ok(self.array.slice(self.last_idx, batch_size))
     }
 
     fn skip_records(&mut self, num_records: usize) -> Result<usize> {
 
From 631f55a089ffd859838ed0e4ac8ae579176b0751 Mon Sep 17 00:00:00 2001
From: yangjiang
Date: Sun, 31 Jul 2022 21:36:44 +0800
Subject: [PATCH 4/6] avoid clone.
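
Moving the accumulated values out of `self.data_buffer` with `std::mem::take`
lets `consume_batch` hand each value to the converter by move instead of
cloning it. A minimal standalone sketch of the pattern — the `String` payload
and the `max_def_level` argument are illustrative stand-ins for the reader's
internal state, not code from this patch:

    /// Turn buffered values plus definition levels into optional values,
    /// moving the buffer instead of cloning every element.
    fn consume(
        data: &mut Vec<String>,
        def_levels: &[i16],
        max_def_level: i16,
    ) -> Vec<Option<String>> {
        // take() swaps an empty Vec into place, so the old buffer can be
        // consumed by value via into_iter() with no per-element clone.
        let data = std::mem::take(data);
        data.into_iter()
            .zip(def_levels)
            .map(|(v, &level)| if level == max_def_level { Some(v) } else { None })
            .collect()
    }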
--- parquet/src/arrow/array_reader/complex_object_array.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index e2a205b219c..b3e64818db0 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -160,12 +160,13 @@ where fn consume_batch(&mut self) -> Result { let data: Vec> = if self.def_levels_buffer.is_some() { - self.data_buffer - .iter() + let data_buffer = std::mem::take(&mut self.data_buffer); + data_buffer + .into_iter() .zip(self.def_levels_buffer.as_ref().unwrap().iter()) .map(|(t, def_level)| { if *def_level == self.column_desc.max_def_level() { - Some(t.clone()) + Some(t) } else { None } From 429e2413f7d6f19bc5f627592e3c233018dac4cf Mon Sep 17 00:00:00 2001 From: yangjiang Date: Tue, 2 Aug 2022 22:45:45 +0800 Subject: [PATCH 5/6] fix new ut --- .../array_reader/complex_object_array.rs | 38 ++++++++++++++----- parquet/src/arrow/arrow_reader.rs | 38 +++++++++++++++++++ parquet/src/arrow/arrow_writer/mod.rs | 2 +- 3 files changed, 67 insertions(+), 11 deletions(-) diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index b3e64818db0..299c9de9728 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -43,6 +43,9 @@ where column_desc: ColumnDescPtr, column_reader: Option>, converter: C, + need_consume_def_levels_buffer: Option>, + need_consume_rep_levels_buffer: Option>, + before_consume: bool, _parquet_type_marker: PhantomData, _converter_marker: PhantomData, } @@ -61,6 +64,9 @@ where } fn read_records(&mut self, batch_size: usize) -> Result { + if !self.before_consume { + self.before_consume = true; + } // Try to initialize column reader if self.column_reader.is_none() { self.next_column_reader()?; @@ -136,18 +142,18 @@ where .for_each(|buf| buf.truncate(num_read)); if let Some(mut def_levels_buffer) = def_levels_buffer { - match &mut self.def_levels_buffer { + match &mut self.need_consume_def_levels_buffer { None => { - self.def_levels_buffer = Some(def_levels_buffer); + self.need_consume_def_levels_buffer = Some(def_levels_buffer); } Some(buf) => buf.append(&mut def_levels_buffer), } } if let Some(mut rep_levels_buffer) = rep_levels_buffer { - match &mut self.rep_levels_buffer { + match &mut self.need_consume_rep_levels_buffer { None => { - self.rep_levels_buffer = Some(rep_levels_buffer); + self.need_consume_rep_levels_buffer = Some(rep_levels_buffer); } Some(buf) => buf.append(&mut rep_levels_buffer), } @@ -159,11 +165,11 @@ where } fn consume_batch(&mut self) -> Result { - let data: Vec> = if self.def_levels_buffer.is_some() { + let data: Vec> = if self.need_consume_def_levels_buffer.is_some() { let data_buffer = std::mem::take(&mut self.data_buffer); data_buffer .into_iter() - .zip(self.def_levels_buffer.as_ref().unwrap().iter()) + .zip(self.need_consume_def_levels_buffer.as_ref().unwrap().iter()) .map(|(t, def_level)| { if *def_level == self.column_desc.max_def_level() { Some(t) @@ -183,8 +189,9 @@ where } self.data_buffer = vec![]; - self.def_levels_buffer = None; - self.rep_levels_buffer = None; + self.def_levels_buffer = std::mem::take(&mut self.need_consume_def_levels_buffer); + self.rep_levels_buffer = std::mem::take(&mut self.need_consume_rep_levels_buffer); + self.before_consume = false; Ok(array) } @@ -206,11 
+213,19 @@ where } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_deref() + if self.before_consume { + self.need_consume_def_levels_buffer.as_deref() + } else { + self.def_levels_buffer.as_deref() + } } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_deref() + if self.before_consume { + self.need_consume_rep_levels_buffer.as_deref() + } else { + self.rep_levels_buffer.as_deref() + } } } @@ -241,6 +256,9 @@ where column_desc, column_reader: None, converter, + need_consume_def_levels_buffer: None, + need_consume_rep_levels_buffer: None, + before_consume: true, _parquet_type_marker: PhantomData, _converter_marker: PhantomData, }) diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 26305cd41ba..3cd5cb9d4ed 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -769,6 +769,44 @@ mod tests { assert_eq!(&written.slice(6, 2), &read[2]); } + #[test] + fn test_int32_nullable_struct() { + let int32 = Int32Array::from_iter_values([1, 2, 3, 4, 5, 6, 7, 8]); + let data = ArrayDataBuilder::new(ArrowDataType::Struct(vec![Field::new( + "int32", + int32.data_type().clone(), + false, + )])) + .len(8) + .null_bit_buffer(Some(Buffer::from(&[0b11101111]))) + .child_data(vec![int32.into_data()]) + .build() + .unwrap(); + + let written = RecordBatch::try_from_iter([( + "struct", + Arc::new(StructArray::from(data)) as ArrayRef, + )]) + .unwrap(); + + let mut buffer = Vec::with_capacity(1024); + let mut writer = + ArrowWriter::try_new(&mut buffer, written.schema(), None).unwrap(); + writer.write(&written).unwrap(); + writer.close().unwrap(); + + let read = ParquetFileArrowReader::try_new(Bytes::from(buffer)) + .unwrap() + .get_record_reader(3) + .unwrap() + .collect::>>() + .unwrap(); + + assert_eq!(&written.slice(0, 3), &read[0]); + assert_eq!(&written.slice(3, 3), &read[1]); + assert_eq!(&written.slice(6, 2), &read[2]); + } + #[test] #[ignore] // https://github.com/apache/arrow-rs/issues/2253 fn test_decimal_list() { diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 1c95fcc27c1..49531d9724a 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1161,7 +1161,7 @@ mod tests { Some(props), ) .expect("Unable to write file"); - writer.write(&expected_batch).unwrap(); + writer.write(expected_batch).unwrap(); writer.close().unwrap(); let mut arrow_reader = From fa42ba0adbec3f6695c19d8de873386f7eaa15f2 Mon Sep 17 00:00:00 2001 From: yangjiang Date: Wed, 3 Aug 2022 10:59:03 +0800 Subject: [PATCH 6/6] fix comment --- .../array_reader/complex_object_array.rs | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs index 299c9de9728..79b53733176 100644 --- a/parquet/src/arrow/array_reader/complex_object_array.rs +++ b/parquet/src/arrow/array_reader/complex_object_array.rs @@ -43,8 +43,8 @@ where column_desc: ColumnDescPtr, column_reader: Option>, converter: C, - need_consume_def_levels_buffer: Option>, - need_consume_rep_levels_buffer: Option>, + in_progress_def_levels_buffer: Option>, + in_progress_rep_levels_buffer: Option>, before_consume: bool, _parquet_type_marker: PhantomData, _converter_marker: PhantomData, @@ -142,18 +142,18 @@ where .for_each(|buf| buf.truncate(num_read)); if let Some(mut def_levels_buffer) = def_levels_buffer { - match &mut 
self.need_consume_def_levels_buffer {
+            match &mut self.in_progress_def_levels_buffer {
                 None => {
-                    self.need_consume_def_levels_buffer = Some(def_levels_buffer);
+                    self.in_progress_def_levels_buffer = Some(def_levels_buffer);
                 }
                 Some(buf) => buf.append(&mut def_levels_buffer),
             }
         }
 
         if let Some(mut rep_levels_buffer) = rep_levels_buffer {
-            match &mut self.need_consume_rep_levels_buffer {
+            match &mut self.in_progress_rep_levels_buffer {
                 None => {
-                    self.need_consume_rep_levels_buffer = Some(rep_levels_buffer);
+                    self.in_progress_rep_levels_buffer = Some(rep_levels_buffer);
                 }
                 Some(buf) => buf.append(&mut rep_levels_buffer),
             }
         }
@@ -165,11 +165,11 @@ where
     }
 
     fn consume_batch(&mut self) -> Result<ArrayRef> {
-        let data: Vec<Option<T::T>> = if self.need_consume_def_levels_buffer.is_some() {
+        let data: Vec<Option<T::T>> = if self.in_progress_def_levels_buffer.is_some() {
             let data_buffer = std::mem::take(&mut self.data_buffer);
             data_buffer
                 .into_iter()
-                .zip(self.need_consume_def_levels_buffer.as_ref().unwrap().iter())
+                .zip(self.in_progress_def_levels_buffer.as_ref().unwrap().iter())
                 .map(|(t, def_level)| {
                     if *def_level == self.column_desc.max_def_level() {
                         Some(t)
@@ -189,8 +189,8 @@ where
         }
 
         self.data_buffer = vec![];
-        self.def_levels_buffer = std::mem::take(&mut self.need_consume_def_levels_buffer);
+        self.def_levels_buffer = std::mem::take(&mut self.in_progress_def_levels_buffer);
-        self.rep_levels_buffer = std::mem::take(&mut self.need_consume_rep_levels_buffer);
+        self.rep_levels_buffer = std::mem::take(&mut self.in_progress_rep_levels_buffer);
         self.before_consume = false;
 
         Ok(array)
@@ -214,7 +214,7 @@
     fn get_def_levels(&self) -> Option<&[i16]> {
         if self.before_consume {
-            self.need_consume_def_levels_buffer.as_deref()
+            self.in_progress_def_levels_buffer.as_deref()
         } else {
             self.def_levels_buffer.as_deref()
         }
@@ -222,7 +222,7 @@
     fn get_rep_levels(&self) -> Option<&[i16]> {
         if self.before_consume {
-            self.need_consume_rep_levels_buffer.as_deref()
+            self.in_progress_rep_levels_buffer.as_deref()
         } else {
             self.rep_levels_buffer.as_deref()
         }
@@ -256,8 +256,8 @@
             column_desc,
             column_reader: None,
             converter,
-            need_consume_def_levels_buffer: None,
-            need_consume_rep_levels_buffer: None,
+            in_progress_def_levels_buffer: None,
+            in_progress_rep_levels_buffer: None,
             before_consume: true,
             _parquet_type_marker: PhantomData,
             _converter_marker: PhantomData,
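
Taken together, the series leaves `ArrayReader` with a two-phase contract:
`read_records` buffers up to `batch_size` records (and may be called several
times, possibly interleaved with `skip_records`), while `consume_batch` drains
everything buffered so far into a single arrow array. A minimal caller-side
sketch of that contract, written as if inside the parquet crate where the
trait is in scope — the construction of `reader` is assumed and not shown:

    use arrow::array::ArrayRef;
    use crate::arrow::array_reader::ArrayReader;
    use crate::errors::Result;

    /// Buffer up to `n` records, looping until the reader's pages are
    /// exhausted, then materialize everything buffered as one array.
    fn collect_rows(reader: &mut dyn ArrayReader, n: usize) -> Result<ArrayRef> {
        let mut buffered = 0;
        while buffered < n {
            let read = reader.read_records(n - buffered)?;
            if read == 0 {
                break; // pages are exhausted
            }
            buffered += read;
        }
        // Drains whatever read_records buffered; with a single prior read
        // this is equivalent to the new default next_batch implementation.
        reader.consume_batch()
    }

Splitting the read from the materialization is what lets several partial reads
(or skips) be paid for with a single arrow conversion.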