From a901927e46083b3915b57475df1d802d9806f68c Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com>
Date: Thu, 23 Jun 2022 21:51:30 +0100
Subject: [PATCH] Split up parquet::arrow::array_reader (#1483) (#1933)

* Split up parquet::arrow::array_reader

* RAT
---
 .../array_reader/complex_object_array.rs      |  532 ++++++
 parquet/src/arrow/array_reader/mod.rs         | 1495 +----------------
 parquet/src/arrow/array_reader/null_array.rs  |  106 ++
 .../src/arrow/array_reader/primitive_array.rs |  613 +++++++
 .../src/arrow/array_reader/struct_array.rs    |  294 ++++
 5 files changed, 1559 insertions(+), 1481 deletions(-)
 create mode 100644 parquet/src/arrow/array_reader/complex_object_array.rs
 create mode 100644 parquet/src/arrow/array_reader/null_array.rs
 create mode 100644 parquet/src/arrow/array_reader/primitive_array.rs
 create mode 100644 parquet/src/arrow/array_reader/struct_array.rs

diff --git a/parquet/src/arrow/array_reader/complex_object_array.rs b/parquet/src/arrow/array_reader/complex_object_array.rs
new file mode 100644
index 00000000000..b91fde5c427
--- /dev/null
+++ b/parquet/src/arrow/array_reader/complex_object_array.rs
@@ -0,0 +1,532 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::arrow::buffer::converter::Converter;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::column::page::PageIterator;
+use crate::column::reader::ColumnReaderImpl;
+use crate::data_type::DataType;
+use crate::errors::Result;
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::ArrayRef;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::marker::PhantomData;
+
+/// Complex object array readers are leaves of the array reader tree. They
+/// accept a page iterator and convert the values read into Arrow arrays via
+/// a [`Converter`].
+pub struct ComplexObjectArrayReader<T, C>
+where
+    T: DataType,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
+{
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Vec<i16>>,
+    rep_levels_buffer: Option<Vec<i16>>,
+    column_desc: ColumnDescPtr,
+    column_reader: Option<ColumnReaderImpl<T>>,
+    converter: C,
+    _parquet_type_marker: PhantomData<T>,
+    _converter_marker: PhantomData<C>,
+}
+
+impl<T, C> ArrayReader for ComplexObjectArrayReader<T, C>
+where
+    T: DataType,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + Send + 'static,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        // Try to initialize column reader
+        if self.column_reader.is_none() {
+            self.next_column_reader()?;
+        }
+
+        let mut data_buffer: Vec<T::T> = Vec::with_capacity(batch_size);
+        data_buffer.resize_with(batch_size, T::T::default);
+
+        let mut def_levels_buffer = if self.column_desc.max_def_level() > 0 {
+            let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
+            buf.resize_with(batch_size, || 0);
+            Some(buf)
+        } else {
+            None
+        };
+
+        let mut rep_levels_buffer = if self.column_desc.max_rep_level() > 0 {
+            let mut buf: Vec<i16> = Vec::with_capacity(batch_size);
+            buf.resize_with(batch_size, || 0);
+            Some(buf)
+        } else {
+            None
+        };
+
+        let mut num_read = 0;
+
+        while self.column_reader.is_some() && num_read < batch_size {
+            let num_to_read = batch_size - num_read;
+            let cur_data_buf = &mut data_buffer[num_read..];
+            let cur_def_levels_buf =
+                def_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
+            let cur_rep_levels_buf =
+                rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]);
+            let (data_read, levels_read) =
+                self.column_reader.as_mut().unwrap().read_batch(
+                    num_to_read,
+                    cur_def_levels_buf,
+                    cur_rep_levels_buf,
+                    cur_data_buf,
+                )?;
+
+            // Fill space
+            if levels_read > data_read {
+                def_levels_buffer.iter().for_each(|def_levels_buffer| {
+                    let (mut level_pos, mut data_pos) = (levels_read, data_read);
+                    while level_pos > 0 && data_pos > 0 {
+                        if def_levels_buffer[num_read + level_pos - 1]
+                            == self.column_desc.max_def_level()
+                        {
+                            cur_data_buf.swap(level_pos - 1, data_pos - 1);
+                            level_pos -= 1;
+                            data_pos -= 1;
+                        } else {
+                            level_pos -= 1;
+                        }
+                    }
+                });
+            }
+
+            let values_read = levels_read.max(data_read);
+            num_read += values_read;
+            // current page exhausted && page iterator exhausted
+            if values_read < num_to_read && !self.next_column_reader()? {
+                break;
+            }
+        }
+
+        data_buffer.truncate(num_read);
+        def_levels_buffer
+            .iter_mut()
+            .for_each(|buf| buf.truncate(num_read));
+        rep_levels_buffer
+            .iter_mut()
+            .for_each(|buf| buf.truncate(num_read));
+
+        self.def_levels_buffer = def_levels_buffer;
+        self.rep_levels_buffer = rep_levels_buffer;
+
+        let data: Vec<Option<T::T>> = if self.def_levels_buffer.is_some() {
+            data_buffer
+                .into_iter()
+                .zip(self.def_levels_buffer.as_ref().unwrap().iter())
+                .map(|(t, def_level)| {
+                    if *def_level == self.column_desc.max_def_level() {
+                        Some(t)
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        } else {
+            data_buffer.into_iter().map(Some).collect()
+        };
+
+        let mut array = self.converter.convert(data)?;
+
+        if let ArrowType::Dictionary(_, _) = self.data_type {
+            array = arrow::compute::cast(&array, &self.data_type)?;
+        }
+
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_deref()
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_deref()
+    }
+}
+
+impl<T, C> ComplexObjectArrayReader<T, C>
+where
+    T: DataType,
+    C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
+{
+    pub fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        converter: C,
+        arrow_type: Option<ArrowType>,
+    ) -> Result<Self> {
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+
+        Ok(Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            column_reader: None,
+            converter,
+            _parquet_type_marker: PhantomData,
+            _converter_marker: PhantomData,
+        })
+    }
+
+    fn next_column_reader(&mut self) -> Result<bool> {
+        Ok(match self.pages.next() {
+            Some(page) => {
+                self.column_reader =
+                    Some(ColumnReaderImpl::<T>::new(self.column_desc.clone(), page?));
+                true
+            }
+            None => false,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter};
+    use crate::basic::Encoding;
+    use crate::column::page::Page;
+    use crate::data_type::{ByteArray, ByteArrayType};
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
+    use arrow::array::StringArray;
+    use rand::{thread_rng, Rng};
+    use std::sync::Arc;
+
+    #[test]
+    fn test_complex_array_reader_no_pages() {
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let pages: Vec<Vec<Page>> = Vec::new();
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let values_per_page = 100; // this value is arbitrary in this test - the result should always be an array of 0 length
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), 0);
+    }
+
+    #[test]
+    fn test_complex_array_reader_def_and_rep_levels() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let max_def_level = schema.column(0).max_def_level();
+        let max_rep_level = schema.column(0).max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let column_desc = schema.column(0);
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+
+            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
+
+            let data_page = pb.consume();
+            pages.push(vec![data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only values_per_page/2 values
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+
+    #[test]
+    fn test_complex_array_reader_dict_enc_string() {
+        use crate::encodings::encoding::{DictEncoder, Encoder};
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let max_def_level = column_desc.max_def_level();
+        let max_rep_level = column_desc.max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mut dict_encoder = DictEncoder::<ByteArrayType>::new(column_desc.clone());
+            // add data page
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            pb.add_indices(indices);
+            let data_page = pb.consume();
+            // for each page log num_values vs actual values in page
+            // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len());
+            // add dictionary page
+            let dict = dict_encoder
+                .write_dict()
+                .expect("write_dict() should be OK");
+            let dict_page = Page::DictionaryPage {
+                buf: dict,
+                num_values: dict_encoder.num_entries() as u32,
+                encoding: Encoding::RLE_DICTIONARY,
+                is_sorted: false,
+            };
+            pages.push(vec![dict_page, data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+        let converter = Utf8Converter::new(Utf8ArrayConverter {});
+        let mut array_reader =
+            ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
+                Box::new(page_iterator),
+                column_desc,
+                converter,
+                None,
+            )
+            .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        // println!("---------- reading a batch of {} values ----------", values_per_page / 2);
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        // println!("---------- reading a batch of {} values ----------", values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only values_per_page/2 values
+        // println!("---------- reading a batch of {} values ----------", values_per_page);
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+}
diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs
index 6207b377d13..dd65a3626b7 100644
--- a/parquet/src/arrow/array_reader/mod.rs
+++ b/parquet/src/arrow/array_reader/mod.rs
@@ -17,44 +17,29 @@
 
 //! Logic for reading into arrow arrays
 
+use crate::errors::Result;
+use arrow::array::ArrayRef;
+use arrow::datatypes::DataType as ArrowType;
 use std::any::Any;
-use std::cmp::max;
-use std::marker::PhantomData;
-use std::result::Result::Ok;
 use std::sync::Arc;
-use std::vec::Vec;
-
-use arrow::array::{
-    Array, ArrayData, ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder,
-    DecimalArray, Int32Array, Int64Array, PrimitiveArray, StructArray,
-};
-use arrow::buffer::Buffer;
-use arrow::datatypes::{
-    ArrowPrimitiveType, BooleanType as ArrowBooleanType, DataType as ArrowType,
-    Float32Type as ArrowFloat32Type, Float64Type as ArrowFloat64Type,
-    Int32Type as ArrowInt32Type, Int64Type as ArrowInt64Type,
-    UInt32Type as ArrowUInt32Type, UInt64Type as ArrowUInt64Type,
-};
-
-use crate::arrow::buffer::converter::Converter;
-use crate::arrow::record_reader::buffer::{ScalarValue, ValuesBuffer};
-use crate::arrow::record_reader::{GenericRecordReader, RecordReader};
-use crate::arrow::schema::parquet_to_arrow_field;
-use crate::basic::Type as PhysicalType;
+use crate::arrow::record_reader::buffer::ValuesBuffer;
+use crate::arrow::record_reader::GenericRecordReader;
 use crate::column::page::PageIterator;
 use crate::column::reader::decoder::ColumnValueDecoder;
-use crate::column::reader::ColumnReaderImpl;
-use crate::data_type::DataType;
-use crate::errors::{ParquetError, ParquetError::ArrowError, Result};
 use crate::file::reader::{FilePageIterator, FileReader};
-use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
+use crate::schema::types::SchemaDescPtr;
 
 mod builder;
 mod byte_array;
 mod byte_array_dictionary;
+mod complex_object_array;
 mod empty_array;
 mod list_array;
 mod map_array;
+mod null_array;
+mod primitive_array;
+mod struct_array;
 
 #[cfg(test)]
 mod test_util;
@@ -62,8 +47,12 @@ mod test_util;
 pub use builder::build_array_reader;
 pub use byte_array::make_byte_array_reader;
 pub use byte_array_dictionary::make_byte_array_dictionary_reader;
+pub use complex_object_array::ComplexObjectArrayReader;
 pub use list_array::ListArrayReader;
 pub use map_array::MapArrayReader;
+pub use null_array::NullArrayReader;
+pub use primitive_array::PrimitiveArrayReader;
+pub use struct_array::StructArrayReader;
 
 /// Array reader reads parquet data
into arrow array. pub trait ArrayReader: Send { @@ -152,1459 +141,3 @@ where } Ok(records_read) } - -/// A NullArrayReader reads Parquet columns stored as null int32s with an Arrow -/// NullArray type. -pub struct NullArrayReader -where - T: DataType, - T::T: ScalarValue, -{ - data_type: ArrowType, - pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, - column_desc: ColumnDescPtr, - record_reader: RecordReader, - _type_marker: PhantomData, -} - -impl NullArrayReader -where - T: DataType, - T::T: ScalarValue, -{ - /// Construct null array reader. - pub fn new(pages: Box, column_desc: ColumnDescPtr) -> Result { - let record_reader = RecordReader::::new(column_desc.clone()); - - Ok(Self { - data_type: ArrowType::Null, - pages, - def_levels_buffer: None, - rep_levels_buffer: None, - column_desc, - record_reader, - _type_marker: PhantomData, - }) - } -} - -/// Implementation of primitive array reader. -impl ArrayReader for NullArrayReader -where - T: DataType, - T::T: ScalarValue, -{ - fn as_any(&self) -> &dyn Any { - self - } - - /// Returns data type of primitive array. - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - /// Reads at most `batch_size` records into array. - fn next_batch(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; - - // convert to arrays - let array = arrow::array::NullArray::new(self.record_reader.num_values()); - - // save definition and repetition buffers - self.def_levels_buffer = self.record_reader.consume_def_levels()?; - self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; - - // Must consume bitmap buffer - self.record_reader.consume_bitmap_buffer()?; - - self.record_reader.reset(); - Ok(Arc::new(array)) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer - .as_ref() - .map(|buf| buf.typed_data()) - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer - .as_ref() - .map(|buf| buf.typed_data()) - } -} - -/// Primitive array readers are leaves of array reader tree. They accept page iterator -/// and read them into primitive arrays. -pub struct PrimitiveArrayReader -where - T: DataType, - T::T: ScalarValue, -{ - data_type: ArrowType, - pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, - column_desc: ColumnDescPtr, - record_reader: RecordReader, -} - -impl PrimitiveArrayReader -where - T: DataType, - T::T: ScalarValue, -{ - /// Construct primitive array reader. - pub fn new( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, - ) -> Result { - Self::new_with_options(pages, column_desc, arrow_type, false) - } - - /// Construct primitive array reader with ability to only compute null mask and not - /// buffer level data - pub fn new_with_options( - pages: Box, - column_desc: ColumnDescPtr, - arrow_type: Option, - null_mask_only: bool, - ) -> Result { - // Check if Arrow type is specified, else create it from Parquet type - let data_type = match arrow_type { - Some(t) => t, - None => parquet_to_arrow_field(column_desc.as_ref())? - .data_type() - .clone(), - }; - - let record_reader = - RecordReader::::new_with_options(column_desc.clone(), null_mask_only); - - Ok(Self { - data_type, - pages, - def_levels_buffer: None, - rep_levels_buffer: None, - column_desc, - record_reader, - }) - } -} - -/// Implementation of primitive array reader. 
-impl ArrayReader for PrimitiveArrayReader -where - T: DataType, - T::T: ScalarValue, -{ - fn as_any(&self) -> &dyn Any { - self - } - - /// Returns data type of primitive array. - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - /// Reads at most `batch_size` records into array. - fn next_batch(&mut self, batch_size: usize) -> Result { - read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?; - - let target_type = self.get_data_type().clone(); - let arrow_data_type = match T::get_physical_type() { - PhysicalType::BOOLEAN => ArrowBooleanType::DATA_TYPE, - PhysicalType::INT32 => { - match target_type { - ArrowType::UInt32 => { - // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map - // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX` - ArrowUInt32Type::DATA_TYPE - } - _ => ArrowInt32Type::DATA_TYPE, - } - } - PhysicalType::INT64 => { - match target_type { - ArrowType::UInt64 => { - // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map - // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX` - ArrowUInt64Type::DATA_TYPE - } - _ => ArrowInt64Type::DATA_TYPE, - } - } - PhysicalType::FLOAT => ArrowFloat32Type::DATA_TYPE, - PhysicalType::DOUBLE => ArrowFloat64Type::DATA_TYPE, - PhysicalType::INT96 - | PhysicalType::BYTE_ARRAY - | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - unreachable!( - "PrimitiveArrayReaders don't support complex physical types" - ); - } - }; - - // Convert to arrays by using the Parquet physical type. - // The physical types are then cast to Arrow types if necessary - - let mut record_data = self.record_reader.consume_record_data()?; - - if T::get_physical_type() == PhysicalType::BOOLEAN { - let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len()); - - for e in record_data.as_slice() { - boolean_buffer.append(*e > 0); - } - record_data = boolean_buffer.finish(); - } - - let array_data = ArrayDataBuilder::new(arrow_data_type) - .len(self.record_reader.num_values()) - .add_buffer(record_data) - .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?); - - let array_data = unsafe { array_data.build_unchecked() }; - let array = match T::get_physical_type() { - PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)) as ArrayRef, - PhysicalType::INT32 => { - Arc::new(PrimitiveArray::::from(array_data)) as ArrayRef - } - PhysicalType::INT64 => { - Arc::new(PrimitiveArray::::from(array_data)) as ArrayRef - } - PhysicalType::FLOAT => { - Arc::new(PrimitiveArray::::from(array_data)) as ArrayRef - } - PhysicalType::DOUBLE => { - Arc::new(PrimitiveArray::::from(array_data)) as ArrayRef - } - PhysicalType::INT96 - | PhysicalType::BYTE_ARRAY - | PhysicalType::FIXED_LEN_BYTE_ARRAY => { - unreachable!( - "PrimitiveArrayReaders don't support complex physical types" - ); - } - }; - - // cast to Arrow type - // We make a strong assumption here that the casts should be infallible. - // If the cast fails because of incompatible datatypes, then there might - // be a bigger problem with how Arrow schemas are converted to Parquet. - // - // As there is not always a 1:1 mapping between Arrow and Parquet, there - // are datatypes which we must convert explicitly. - // These are: - // - date64: we should cast int32 to date32, then date32 to date64. - let array = match target_type { - ArrowType::Date64 => { - // this is cheap as it internally reinterprets the data - let a = arrow::compute::cast(&array, &ArrowType::Date32)?; - arrow::compute::cast(&a, &target_type)? 
- } - ArrowType::Decimal(p, s) => { - let array = match array.data_type() { - ArrowType::Int32 => array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.map(|v| v.into())) - .collect::(), - - ArrowType::Int64 => array - .as_any() - .downcast_ref::() - .unwrap() - .iter() - .map(|v| v.map(|v| v.into())) - .collect::(), - _ => { - return Err(ArrowError(format!( - "Cannot convert {:?} to decimal", - array.data_type() - ))) - } - } - .with_precision_and_scale(p, s)?; - - Arc::new(array) as ArrayRef - } - _ => arrow::compute::cast(&array, &target_type)?, - }; - - // save definition and repetition buffers - self.def_levels_buffer = self.record_reader.consume_def_levels()?; - self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; - self.record_reader.reset(); - Ok(array) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer - .as_ref() - .map(|buf| buf.typed_data()) - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer - .as_ref() - .map(|buf| buf.typed_data()) - } -} - -/// Primitive array readers are leaves of array reader tree. They accept page iterator -/// and read them into primitive arrays. -pub struct ComplexObjectArrayReader -where - T: DataType, - C: Converter>, ArrayRef> + 'static, -{ - data_type: ArrowType, - pages: Box, - def_levels_buffer: Option>, - rep_levels_buffer: Option>, - column_desc: ColumnDescPtr, - column_reader: Option>, - converter: C, - _parquet_type_marker: PhantomData, - _converter_marker: PhantomData, -} - -impl ArrayReader for ComplexObjectArrayReader -where - T: DataType, - C: Converter>, ArrayRef> + Send + 'static, -{ - fn as_any(&self) -> &dyn Any { - self - } - - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - fn next_batch(&mut self, batch_size: usize) -> Result { - // Try to initialize column reader - if self.column_reader.is_none() { - self.next_column_reader()?; - } - - let mut data_buffer: Vec = Vec::with_capacity(batch_size); - data_buffer.resize_with(batch_size, T::T::default); - - let mut def_levels_buffer = if self.column_desc.max_def_level() > 0 { - let mut buf: Vec = Vec::with_capacity(batch_size); - buf.resize_with(batch_size, || 0); - Some(buf) - } else { - None - }; - - let mut rep_levels_buffer = if self.column_desc.max_rep_level() > 0 { - let mut buf: Vec = Vec::with_capacity(batch_size); - buf.resize_with(batch_size, || 0); - Some(buf) - } else { - None - }; - - let mut num_read = 0; - - while self.column_reader.is_some() && num_read < batch_size { - let num_to_read = batch_size - num_read; - let cur_data_buf = &mut data_buffer[num_read..]; - let cur_def_levels_buf = - def_levels_buffer.as_mut().map(|b| &mut b[num_read..]); - let cur_rep_levels_buf = - rep_levels_buffer.as_mut().map(|b| &mut b[num_read..]); - let (data_read, levels_read) = - self.column_reader.as_mut().unwrap().read_batch( - num_to_read, - cur_def_levels_buf, - cur_rep_levels_buf, - cur_data_buf, - )?; - - // Fill space - if levels_read > data_read { - def_levels_buffer.iter().for_each(|def_levels_buffer| { - let (mut level_pos, mut data_pos) = (levels_read, data_read); - while level_pos > 0 && data_pos > 0 { - if def_levels_buffer[num_read + level_pos - 1] - == self.column_desc.max_def_level() - { - cur_data_buf.swap(level_pos - 1, data_pos - 1); - level_pos -= 1; - data_pos -= 1; - } else { - level_pos -= 1; - } - } - }); - } - - let values_read = max(levels_read, data_read); - num_read += values_read; - // current page exhausted && page iterator exhausted - if values_read < 
num_to_read && !self.next_column_reader()? { - break; - } - } - - data_buffer.truncate(num_read); - def_levels_buffer - .iter_mut() - .for_each(|buf| buf.truncate(num_read)); - rep_levels_buffer - .iter_mut() - .for_each(|buf| buf.truncate(num_read)); - - self.def_levels_buffer = def_levels_buffer; - self.rep_levels_buffer = rep_levels_buffer; - - let data: Vec> = if self.def_levels_buffer.is_some() { - data_buffer - .into_iter() - .zip(self.def_levels_buffer.as_ref().unwrap().iter()) - .map(|(t, def_level)| { - if *def_level == self.column_desc.max_def_level() { - Some(t) - } else { - None - } - }) - .collect() - } else { - data_buffer.into_iter().map(Some).collect() - }; - - let mut array = self.converter.convert(data)?; - - if let ArrowType::Dictionary(_, _) = self.data_type { - array = arrow::compute::cast(&array, &self.data_type)?; - } - - Ok(array) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_deref() - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_deref() - } -} - -impl ComplexObjectArrayReader -where - T: DataType, - C: Converter>, ArrayRef> + 'static, -{ - pub fn new( - pages: Box, - column_desc: ColumnDescPtr, - converter: C, - arrow_type: Option, - ) -> Result { - let data_type = match arrow_type { - Some(t) => t, - None => parquet_to_arrow_field(column_desc.as_ref())? - .data_type() - .clone(), - }; - - Ok(Self { - data_type, - pages, - def_levels_buffer: None, - rep_levels_buffer: None, - column_desc, - column_reader: None, - converter, - _parquet_type_marker: PhantomData, - _converter_marker: PhantomData, - }) - } - - fn next_column_reader(&mut self) -> Result { - Ok(match self.pages.next() { - Some(page) => { - self.column_reader = - Some(ColumnReaderImpl::::new(self.column_desc.clone(), page?)); - true - } - None => false, - }) - } -} - -/// Implementation of struct array reader. -pub struct StructArrayReader { - children: Vec>, - data_type: ArrowType, - struct_def_level: i16, - struct_rep_level: i16, - nullable: bool, -} - -impl StructArrayReader { - /// Construct struct array reader. - pub fn new( - data_type: ArrowType, - children: Vec>, - def_level: i16, - rep_level: i16, - nullable: bool, - ) -> Self { - Self { - data_type, - children, - struct_def_level: def_level, - struct_rep_level: rep_level, - nullable, - } - } -} - -impl ArrayReader for StructArrayReader { - fn as_any(&self) -> &dyn Any { - self - } - - /// Returns data type. - /// This must be a struct. - fn get_data_type(&self) -> &ArrowType { - &self.data_type - } - - /// Read `batch_size` struct records. 
- /// - /// Definition levels of struct array is calculated as following: - /// ```ignore - /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ..., - /// childn_def_levels[i]); - /// ``` - /// - /// Repetition levels of struct array is calculated as following: - /// ```ignore - /// rep_levels[i] = child1_rep_levels[i]; - /// ``` - /// - /// The null bitmap of struct array is calculated from def_levels: - /// ```ignore - /// null_bitmap[i] = (def_levels[i] >= self.def_level); - /// ``` - fn next_batch(&mut self, batch_size: usize) -> Result { - if self.children.is_empty() { - return Ok(Arc::new(StructArray::from(Vec::new()))); - } - - let children_array = self - .children - .iter_mut() - .map(|reader| reader.next_batch(batch_size)) - .collect::>>()?; - - // check that array child data has same size - let children_array_len = - children_array.first().map(|arr| arr.len()).ok_or_else(|| { - general_err!("Struct array reader should have at least one child!") - })?; - - let all_children_len_eq = children_array - .iter() - .all(|arr| arr.len() == children_array_len); - if !all_children_len_eq { - return Err(general_err!("Not all children array length are the same!")); - } - - // Now we can build array data - let mut array_data_builder = ArrayDataBuilder::new(self.data_type.clone()) - .len(children_array_len) - .child_data( - children_array - .iter() - .map(|x| x.data().clone()) - .collect::>(), - ); - - if self.nullable { - // calculate struct def level data - - // children should have consistent view of parent, only need to inspect first child - let def_levels = self.children[0] - .get_def_levels() - .expect("child with nullable parents must have definition level"); - - // calculate bitmap for current array - let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); - - match self.children[0].get_rep_levels() { - Some(rep_levels) => { - // Sanity check - assert_eq!(rep_levels.len(), def_levels.len()); - - for (rep_level, def_level) in rep_levels.iter().zip(def_levels) { - if rep_level > &self.struct_rep_level { - // Already handled by inner list - SKIP - continue; - } - bitmap_builder.append(*def_level >= self.struct_def_level) - } - } - None => { - for def_level in def_levels { - bitmap_builder.append(*def_level >= self.struct_def_level) - } - } - } - - if bitmap_builder.len() != children_array_len { - return Err(general_err!("Failed to decode level data for struct array")); - } - - array_data_builder = - array_data_builder.null_bit_buffer(Some(bitmap_builder.finish())); - } - - let array_data = unsafe { array_data_builder.build_unchecked() }; - Ok(Arc::new(StructArray::from(array_data))) - } - - fn get_def_levels(&self) -> Option<&[i16]> { - // Children definition levels should describe the same - // parent structure, so return first child's - self.children.first().and_then(|l| l.get_def_levels()) - } - - fn get_rep_levels(&self) -> Option<&[i16]> { - // Children definition levels should describe the same - // parent structure, so return first child's - self.children.first().and_then(|l| l.get_rep_levels()) - } -} - -#[cfg(test)] -mod tests { - use std::collections::VecDeque; - use std::sync::Arc; - - use rand::distributions::uniform::SampleUniform; - use rand::{thread_rng, Rng}; - - use crate::arrow::array_reader::test_util::InMemoryArrayReader; - use arrow::array::{ - Array, ArrayRef, ListArray, PrimitiveArray, StringArray, StructArray, - }; - use arrow::datatypes::{ - ArrowPrimitiveType, DataType as ArrowType, Date32Type as ArrowDate32, Field, - Int32Type as 
ArrowInt32, Int64Type as ArrowInt64, - Time32MillisecondType as ArrowTime32MillisecondArray, - Time64MicrosecondType as ArrowTime64MicrosecondArray, - TimestampMicrosecondType as ArrowTimestampMicrosecondType, - TimestampMillisecondType as ArrowTimestampMillisecondType, - }; - - use crate::arrow::buffer::converter::{Utf8ArrayConverter, Utf8Converter}; - use crate::basic::{Encoding, Type as PhysicalType}; - use crate::column::page::Page; - use crate::data_type::{ByteArray, ByteArrayType, DataType, Int32Type, Int64Type}; - use crate::schema::parser::parse_message_type; - use crate::schema::types::{ColumnDescPtr, SchemaDescriptor}; - use crate::util::test_common::make_pages; - use crate::util::test_common::page_util::{ - DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator, - }; - - use super::*; - - fn make_column_chunks( - column_desc: ColumnDescPtr, - encoding: Encoding, - num_levels: usize, - min_value: T::T, - max_value: T::T, - def_levels: &mut Vec, - rep_levels: &mut Vec, - values: &mut Vec, - page_lists: &mut Vec>, - use_v2: bool, - num_chunks: usize, - ) where - T::T: PartialOrd + SampleUniform + Copy, - { - for _i in 0..num_chunks { - let mut pages = VecDeque::new(); - let mut data = Vec::new(); - let mut page_def_levels = Vec::new(); - let mut page_rep_levels = Vec::new(); - - make_pages::( - column_desc.clone(), - encoding, - 1, - num_levels, - min_value, - max_value, - &mut page_def_levels, - &mut page_rep_levels, - &mut data, - &mut pages, - use_v2, - ); - - def_levels.append(&mut page_def_levels); - rep_levels.append(&mut page_rep_levels); - values.append(&mut data); - page_lists.push(Vec::from(pages)); - } - } - - #[test] - fn test_primitive_array_reader_empty_pages() { - // Construct column schema - let message_type = " - message test_schema { - REQUIRED INT32 leaf; - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - let page_iterator = test_util::EmptyPageIterator::new(schema); - - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); - - // expect no values to be read - let array = array_reader.next_batch(50).unwrap(); - assert!(array.is_empty()); - } - - #[test] - fn test_primitive_array_reader_data() { - // Construct column schema - let message_type = " - message test_schema { - REQUIRED INT32 leaf; - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - - // Construct page iterator - { - let mut data = Vec::new(); - let mut page_lists = Vec::new(); - make_column_chunks::( - column_desc.clone(), - Encoding::PLAIN, - 100, - 1, - 200, - &mut Vec::new(), - &mut Vec::new(), - &mut data, - &mut page_lists, - true, - 2, - ); - let page_iterator = - InMemoryPageIterator::new(schema, column_desc.clone(), page_lists); - - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); - - // Read first 50 values, which are all from the first column chunk - let array = array_reader.next_batch(50).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::::from(data[0..50].to_vec()), - array - ); - - // Read next 100 values, the first 50 ones are from the first column chunk, - // and the last 50 ones are from the second column chunk - let array = 
array_reader.next_batch(100).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::::from(data[50..150].to_vec()), - array - ); - - // Try to read 100 values, however there are only 50 values - let array = array_reader.next_batch(100).unwrap(); - let array = array - .as_any() - .downcast_ref::>() - .unwrap(); - - assert_eq!( - &PrimitiveArray::::from(data[150..200].to_vec()), - array - ); - } - } - - macro_rules! test_primitive_array_reader_one_type { - ($arrow_parquet_type:ty, $physical_type:expr, $converted_type_str:expr, $result_arrow_type:ty, $result_arrow_cast_type:ty, $result_primitive_type:ty) => {{ - let message_type = format!( - " - message test_schema {{ - REQUIRED {:?} leaf ({}); - }} - ", - $physical_type, $converted_type_str - ); - let schema = parse_message_type(&message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - - // Construct page iterator - { - let mut data = Vec::new(); - let mut page_lists = Vec::new(); - make_column_chunks::<$arrow_parquet_type>( - column_desc.clone(), - Encoding::PLAIN, - 100, - 1, - 200, - &mut Vec::new(), - &mut Vec::new(), - &mut data, - &mut page_lists, - true, - 2, - ); - let page_iterator = InMemoryPageIterator::new( - schema.clone(), - column_desc.clone(), - page_lists, - ); - let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new( - Box::new(page_iterator), - column_desc.clone(), - None, - ) - .expect("Unable to get array reader"); - - let array = array_reader - .next_batch(50) - .expect("Unable to get batch from reader"); - - let result_data_type = <$result_arrow_type>::DATA_TYPE; - let array = array - .as_any() - .downcast_ref::>() - .expect( - format!( - "Unable to downcast {:?} to {:?}", - array.data_type(), - result_data_type - ) - .as_str(), - ); - - // create expected array as primitive, and cast to result type - let expected = PrimitiveArray::<$result_arrow_cast_type>::from( - data[0..50] - .iter() - .map(|x| *x as $result_primitive_type) - .collect::>(), - ); - let expected = Arc::new(expected) as ArrayRef; - let expected = arrow::compute::cast(&expected, &result_data_type) - .expect("Unable to cast expected array"); - assert_eq!(expected.data_type(), &result_data_type); - let expected = expected - .as_any() - .downcast_ref::>() - .expect( - format!( - "Unable to downcast expected {:?} to {:?}", - expected.data_type(), - result_data_type - ) - .as_str(), - ); - assert_eq!(expected, array); - } - }}; - } - - #[test] - fn test_primitive_array_reader_temporal_types() { - test_primitive_array_reader_one_type!( - Int32Type, - PhysicalType::INT32, - "DATE", - ArrowDate32, - ArrowInt32, - i32 - ); - test_primitive_array_reader_one_type!( - Int32Type, - PhysicalType::INT32, - "TIME_MILLIS", - ArrowTime32MillisecondArray, - ArrowInt32, - i32 - ); - test_primitive_array_reader_one_type!( - Int64Type, - PhysicalType::INT64, - "TIME_MICROS", - ArrowTime64MicrosecondArray, - ArrowInt64, - i64 - ); - test_primitive_array_reader_one_type!( - Int64Type, - PhysicalType::INT64, - "TIMESTAMP_MILLIS", - ArrowTimestampMillisecondType, - ArrowInt64, - i64 - ); - test_primitive_array_reader_one_type!( - Int64Type, - PhysicalType::INT64, - "TIMESTAMP_MICROS", - ArrowTimestampMicrosecondType, - ArrowInt64, - i64 - ); - } - - #[test] - fn test_primitive_array_reader_def_and_rep_levels() { - // Construct column schema - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL INT32 leaf; - 
} - } - "; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let column_desc = schema.column(0); - - // Construct page iterator - { - let mut def_levels = Vec::new(); - let mut rep_levels = Vec::new(); - let mut page_lists = Vec::new(); - make_column_chunks::( - column_desc.clone(), - Encoding::PLAIN, - 100, - 1, - 200, - &mut def_levels, - &mut rep_levels, - &mut Vec::new(), - &mut page_lists, - true, - 2, - ); - - let page_iterator = - InMemoryPageIterator::new(schema, column_desc.clone(), page_lists); - - let mut array_reader = PrimitiveArrayReader::::new( - Box::new(page_iterator), - column_desc, - None, - ) - .unwrap(); - - let mut accu_len: usize = 0; - - // Read first 50 values, which are all from the first column chunk - let array = array_reader.next_batch(50).unwrap(); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Read next 100 values, the first 50 ones are from the first column chunk, - // and the last 50 ones are from the second column chunk - let array = array_reader.next_batch(100).unwrap(); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Try to read 100 values, however there are only 50 values - let array = array_reader.next_batch(100).unwrap(); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - } - } - - #[test] - fn test_complex_array_reader_no_pages() { - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL BYTE_ARRAY leaf (UTF8); - } - } - "; - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - let column_desc = schema.column(0); - let pages: Vec> = Vec::new(); - let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); - - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - let mut array_reader = - ComplexObjectArrayReader::::new( - Box::new(page_iterator), - column_desc, - converter, - None, - ) - .unwrap(); - - let values_per_page = 100; // this value is arbitrary in this test - the result should always be an array of 0 length - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), 0); - } - - #[test] - fn test_complex_array_reader_def_and_rep_levels() { - // Construct column schema - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL BYTE_ARRAY leaf (UTF8); - } - } - "; - let num_pages = 2; - let values_per_page = 100; - let str_base = "Hello World"; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - - let max_def_level = schema.column(0).max_def_level(); - let max_rep_level = schema.column(0).max_rep_level(); - - assert_eq!(max_def_level, 2); - assert_eq!(max_rep_level, 1); - - let mut rng = thread_rng(); - let column_desc = schema.column(0); - let mut pages: Vec> = Vec::new(); - - let mut rep_levels = Vec::with_capacity(num_pages * values_per_page); - let mut 
def_levels = Vec::with_capacity(num_pages * values_per_page); - let mut all_values = Vec::with_capacity(num_pages * values_per_page); - - for i in 0..num_pages { - let mut values = Vec::with_capacity(values_per_page); - - for _ in 0..values_per_page { - let def_level = rng.gen_range(0..max_def_level + 1); - let rep_level = rng.gen_range(0..max_rep_level + 1); - if def_level == max_def_level { - let len = rng.gen_range(1..str_base.len()); - let slice = &str_base[..len]; - values.push(ByteArray::from(slice)); - all_values.push(Some(slice.to_string())); - } else { - all_values.push(None) - } - rep_levels.push(rep_level); - def_levels.push(def_level) - } - - let range = i * values_per_page..(i + 1) * values_per_page; - let mut pb = - DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); - - pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]); - pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]); - pb.add_values::(Encoding::PLAIN, values.as_slice()); - - let data_page = pb.consume(); - pages.push(vec![data_page]); - } - - let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); - - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - let mut array_reader = - ComplexObjectArrayReader::::new( - Box::new(page_iterator), - column_desc, - converter, - None, - ) - .unwrap(); - - let mut accu_len: usize = 0; - - let array = array_reader.next_batch(values_per_page / 2).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, - // and the last values_per_page/2 ones are from the second column chunk - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - let strings = array.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - if array.is_valid(i) { - assert_eq!( - all_values[i + accu_len].as_ref().unwrap().as_str(), - strings.value(i) - ) - } else { - assert_eq!(all_values[i + accu_len], None) - } - } - accu_len += array.len(); - - // Try to read values_per_page values, however there are only values_per_page/2 values - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - } - - #[test] - fn test_complex_array_reader_dict_enc_string() { - use crate::encodings::encoding::{DictEncoder, Encoder}; - // Construct column schema - let message_type = " - message test_schema { - REPEATED Group test_mid { - OPTIONAL BYTE_ARRAY leaf (UTF8); - } - } - "; - let num_pages = 2; - let values_per_page = 100; - let str_base = "Hello World"; - - let schema = parse_message_type(message_type) - .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) - .unwrap(); - let column_desc = schema.column(0); - let max_def_level = column_desc.max_def_level(); 
- let max_rep_level = column_desc.max_rep_level(); - - assert_eq!(max_def_level, 2); - assert_eq!(max_rep_level, 1); - - let mut rng = thread_rng(); - let mut pages: Vec> = Vec::new(); - - let mut rep_levels = Vec::with_capacity(num_pages * values_per_page); - let mut def_levels = Vec::with_capacity(num_pages * values_per_page); - let mut all_values = Vec::with_capacity(num_pages * values_per_page); - - for i in 0..num_pages { - let mut dict_encoder = DictEncoder::::new(column_desc.clone()); - // add data page - let mut values = Vec::with_capacity(values_per_page); - - for _ in 0..values_per_page { - let def_level = rng.gen_range(0..max_def_level + 1); - let rep_level = rng.gen_range(0..max_rep_level + 1); - if def_level == max_def_level { - let len = rng.gen_range(1..str_base.len()); - let slice = &str_base[..len]; - values.push(ByteArray::from(slice)); - all_values.push(Some(slice.to_string())); - } else { - all_values.push(None) - } - rep_levels.push(rep_level); - def_levels.push(def_level) - } - - let range = i * values_per_page..(i + 1) * values_per_page; - let mut pb = - DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true); - pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]); - pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]); - let _ = dict_encoder.put(&values); - let indices = dict_encoder - .write_indices() - .expect("write_indices() should be OK"); - pb.add_indices(indices); - let data_page = pb.consume(); - // for each page log num_values vs actual values in page - // println!("page num_values: {}, values.len(): {}", data_page.num_values(), values.len()); - // add dictionary page - let dict = dict_encoder - .write_dict() - .expect("write_dict() should be OK"); - let dict_page = Page::DictionaryPage { - buf: dict, - num_values: dict_encoder.num_entries() as u32, - encoding: Encoding::RLE_DICTIONARY, - is_sorted: false, - }; - pages.push(vec![dict_page, data_page]); - } - - let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages); - let converter = Utf8Converter::new(Utf8ArrayConverter {}); - let mut array_reader = - ComplexObjectArrayReader::::new( - Box::new(page_iterator), - column_desc, - converter, - None, - ) - .unwrap(); - - let mut accu_len: usize = 0; - - // println!("---------- reading a batch of {} values ----------", values_per_page / 2); - let array = array_reader.next_batch(values_per_page / 2).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - accu_len += array.len(); - - // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk, - // and the last values_per_page/2 ones are from the second column chunk - // println!("---------- reading a batch of {} values ----------", values_per_page); - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - let strings = array.as_any().downcast_ref::().unwrap(); - for i in 0..array.len() { - if array.is_valid(i) { - assert_eq!( - all_values[i + accu_len].as_ref().unwrap().as_str(), - strings.value(i) - ) - } else { - 
assert_eq!(all_values[i + accu_len], None) - } - } - accu_len += array.len(); - - // Try to read values_per_page values, however there are only values_per_page/2 values - // println!("---------- reading a batch of {} values ----------", values_per_page); - let array = array_reader.next_batch(values_per_page).unwrap(); - assert_eq!(array.len(), values_per_page / 2); - assert_eq!( - Some(&def_levels[accu_len..(accu_len + array.len())]), - array_reader.get_def_levels() - ); - assert_eq!( - Some(&rep_levels[accu_len..(accu_len + array.len())]), - array_reader.get_rep_levels() - ); - } - - #[test] - fn test_struct_array_reader() { - let array_1 = Arc::new(PrimitiveArray::::from(vec![1, 2, 3, 4, 5])); - let array_reader_1 = InMemoryArrayReader::new( - ArrowType::Int32, - array_1.clone(), - Some(vec![0, 1, 2, 3, 1]), - Some(vec![0, 1, 1, 1, 1]), - ); - - let array_2 = Arc::new(PrimitiveArray::::from(vec![5, 4, 3, 2, 1])); - let array_reader_2 = InMemoryArrayReader::new( - ArrowType::Int32, - array_2.clone(), - Some(vec![0, 1, 3, 1, 2]), - Some(vec![0, 1, 1, 1, 1]), - ); - - let struct_type = ArrowType::Struct(vec![ - Field::new("f1", array_1.data_type().clone(), true), - Field::new("f2", array_2.data_type().clone(), true), - ]); - - let mut struct_array_reader = StructArrayReader::new( - struct_type, - vec![Box::new(array_reader_1), Box::new(array_reader_2)], - 1, - 1, - true, - ); - - let struct_array = struct_array_reader.next_batch(5).unwrap(); - let struct_array = struct_array.as_any().downcast_ref::().unwrap(); - - assert_eq!(5, struct_array.len()); - assert_eq!( - vec![true, false, false, false, false], - (0..5) - .map(|idx| struct_array.data_ref().is_null(idx)) - .collect::>() - ); - assert_eq!( - Some(vec![0, 1, 2, 3, 1].as_slice()), - struct_array_reader.get_def_levels() - ); - assert_eq!( - Some(vec![0, 1, 1, 1, 1].as_slice()), - struct_array_reader.get_rep_levels() - ); - } - - #[test] - fn test_struct_array_reader_list() { - use arrow::datatypes::Int32Type; - // [ - // {foo: [1, 2, null], - // {foo: []}, - // {foo: null}, - // null, - // ] - - let expected_l = - Arc::new(ListArray::from_iter_primitive::(vec![ - Some(vec![Some(1), Some(2), None]), - Some(vec![]), - None, - None, - ])); - - let validity = Buffer::from([0b00000111]); - let struct_fields = vec![( - Field::new("foo", expected_l.data_type().clone(), true), - expected_l.clone() as ArrayRef, - )]; - let expected = StructArray::from((struct_fields, validity)); - - let array = Arc::new(Int32Array::from_iter(vec![ - Some(1), - Some(2), - None, - None, - None, - None, - ])); - let reader = InMemoryArrayReader::new( - ArrowType::Int32, - array, - Some(vec![4, 4, 3, 2, 1, 0]), - Some(vec![0, 1, 1, 0, 0, 0]), - ); - - let list_reader = ListArrayReader::::new( - Box::new(reader), - expected_l.data_type().clone(), - ArrowType::Int32, - 3, - 1, - true, - ); - - let mut struct_reader = StructArrayReader::new( - expected.data_type().clone(), - vec![Box::new(list_reader)], - 1, - 0, - true, - ); - - let actual = struct_reader.next_batch(1024).unwrap(); - let actual = actual.as_any().downcast_ref::().unwrap(); - assert_eq!(actual, &expected) - } -} diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs new file mode 100644 index 00000000000..53ac0852fab --- /dev/null +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -0,0 +1,106 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::RecordReader;
+use crate::column::page::PageIterator;
+use crate::data_type::DataType;
+use crate::errors::Result;
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::ArrayRef;
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// A NullArrayReader reads Parquet columns stored as null int32s into an
+/// Arrow NullArray.
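+///
+/// A minimal usage sketch (`pages` and `column_desc` here are assumed to come
+/// from test helpers such as `InMemoryPageIterator`):
+///
+/// ```ignore
+/// let mut reader = NullArrayReader::<Int32Type>::new(pages, column_desc)?;
+/// // Decodes up to 1024 records; every value in the resulting array is null
+/// let array = reader.next_batch(1024)?;
+/// assert_eq!(array.data_type(), &ArrowType::Null);
+/// ```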
+pub struct NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    column_desc: ColumnDescPtr,
+    record_reader: RecordReader<T>,
+}
+
+impl<T> NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    /// Construct null array reader.
+    pub fn new(pages: Box<dyn PageIterator>, column_desc: ColumnDescPtr) -> Result<Self> {
+        let record_reader = RecordReader::<T>::new(column_desc.clone());
+
+        Ok(Self {
+            data_type: ArrowType::Null,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            record_reader,
+        })
+    }
+}
+
+/// Implementation of null array reader.
+impl<T> ArrayReader for NullArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type of null array.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads at most `batch_size` records into array.
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+
+        // convert to arrays
+        let array = arrow::array::NullArray::new(self.record_reader.num_values());
+
+        // save definition and repetition buffers
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+
+        // Must consume bitmap buffer
+        self.record_reader.consume_bitmap_buffer()?;
+
+        self.record_reader.reset();
+        Ok(Arc::new(array))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+}
diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs
new file mode 100644
index 00000000000..222b595c2e1
--- /dev/null
+++ b/parquet/src/arrow/array_reader/primitive_array.rs
@@ -0,0 +1,628 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::{read_records, ArrayReader};
+use crate::arrow::record_reader::buffer::ScalarValue;
+use crate::arrow::record_reader::RecordReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::Type as PhysicalType;
+use crate::column::page::PageIterator;
+use crate::data_type::DataType;
+use crate::errors::{ParquetError, Result};
+use crate::schema::types::ColumnDescPtr;
+use arrow::array::{
+    ArrayDataBuilder, ArrayRef, BooleanArray, BooleanBufferBuilder, DecimalArray,
+    Float32Array, Float64Array, Int32Array, Int64Array,
+};
+use arrow::buffer::Buffer;
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Primitive array readers are leaves of the array reader tree. They accept a
+/// page iterator and read it into primitive arrays.
+pub struct PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    data_type: ArrowType,
+    pages: Box<dyn PageIterator>,
+    def_levels_buffer: Option<Buffer>,
+    rep_levels_buffer: Option<Buffer>,
+    column_desc: ColumnDescPtr,
+    record_reader: RecordReader<T>,
+}
+
+impl<T> PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    /// Construct primitive array reader.
+    pub fn new(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+    ) -> Result<Self> {
+        Self::new_with_options(pages, column_desc, arrow_type, false)
+    }
+
+    /// Construct primitive array reader with the ability to compute only the
+    /// null mask, without buffering level data
+    pub fn new_with_options(
+        pages: Box<dyn PageIterator>,
+        column_desc: ColumnDescPtr,
+        arrow_type: Option<ArrowType>,
+        null_mask_only: bool,
+    ) -> Result<Self> {
+        // Check if Arrow type is specified, else create it from Parquet type
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+
+        let record_reader =
+            RecordReader::<T>::new_with_options(column_desc.clone(), null_mask_only);
+
+        Ok(Self {
+            data_type,
+            pages,
+            def_levels_buffer: None,
+            rep_levels_buffer: None,
+            column_desc,
+            record_reader,
+        })
+    }
+}
+
+/// Implementation of primitive array reader.
+impl<T> ArrayReader for PrimitiveArrayReader<T>
+where
+    T: DataType,
+    T::T: ScalarValue,
+{
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type of primitive array.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Reads at most `batch_size` records into array.
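+    ///
+    /// A sketch of the expected calling pattern; `page_iterator` and
+    /// `column_desc` are assumed, e.g. built as in the tests below:
+    ///
+    /// ```ignore
+    /// let mut reader = PrimitiveArrayReader::<Int32Type>::new(
+    ///     Box::new(page_iterator),
+    ///     column_desc,
+    ///     None, // infer the Arrow type from the Parquet type
+    /// )?;
+    /// let batch = reader.next_batch(50)?; // may return fewer than 50 records
+    /// ```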
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)?;
+
+        let target_type = self.get_data_type().clone();
+        let arrow_data_type = match T::get_physical_type() {
+            PhysicalType::BOOLEAN => ArrowType::Boolean,
+            PhysicalType::INT32 => {
+                match target_type {
+                    ArrowType::UInt32 => {
+                        // follow C++ implementation and use overflow/reinterpret cast from i32 to u32 which will map
+                        // `i32::MIN..0` to `(i32::MAX as u32)..u32::MAX`
+                        ArrowType::UInt32
+                    }
+                    _ => ArrowType::Int32,
+                }
+            }
+            PhysicalType::INT64 => {
+                match target_type {
+                    ArrowType::UInt64 => {
+                        // follow C++ implementation and use overflow/reinterpret cast from i64 to u64 which will map
+                        // `i64::MIN..0` to `(i64::MAX as u64)..u64::MAX`
+                        ArrowType::UInt64
+                    }
+                    _ => ArrowType::Int64,
+                }
+            }
+            PhysicalType::FLOAT => ArrowType::Float32,
+            PhysicalType::DOUBLE => ArrowType::Float64,
+            PhysicalType::INT96
+            | PhysicalType::BYTE_ARRAY
+            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                unreachable!(
+                    "PrimitiveArrayReaders don't support complex physical types"
+                );
+            }
+        };
+
+        // Convert to arrays by using the Parquet physical type.
+        // The physical types are then cast to Arrow types if necessary
+
+        let mut record_data = self.record_reader.consume_record_data()?;
+
+        if T::get_physical_type() == PhysicalType::BOOLEAN {
+            let mut boolean_buffer = BooleanBufferBuilder::new(record_data.len());
+
+            for e in record_data.as_slice() {
+                boolean_buffer.append(*e > 0);
+            }
+            record_data = boolean_buffer.finish();
+        }
+
+        let array_data = ArrayDataBuilder::new(arrow_data_type)
+            .len(self.record_reader.num_values())
+            .add_buffer(record_data)
+            .null_bit_buffer(self.record_reader.consume_bitmap_buffer()?);
+
+        let array_data = unsafe { array_data.build_unchecked() };
+        let array = match T::get_physical_type() {
+            PhysicalType::BOOLEAN => Arc::new(BooleanArray::from(array_data)) as ArrayRef,
+            PhysicalType::INT32 => Arc::new(Int32Array::from(array_data)) as ArrayRef,
+            PhysicalType::INT64 => Arc::new(Int64Array::from(array_data)) as ArrayRef,
+            PhysicalType::FLOAT => Arc::new(Float32Array::from(array_data)) as ArrayRef,
+            PhysicalType::DOUBLE => Arc::new(Float64Array::from(array_data)) as ArrayRef,
+            PhysicalType::INT96
+            | PhysicalType::BYTE_ARRAY
+            | PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                unreachable!(
+                    "PrimitiveArrayReaders don't support complex physical types"
+                );
+            }
+        };
+
+        // cast to Arrow type
+        // We make a strong assumption here that the casts should be infallible.
+        // If the cast fails because of incompatible datatypes, then there might
+        // be a bigger problem with how Arrow schemas are converted to Parquet.
+        //
+        // As there is not always a 1:1 mapping between Arrow and Parquet, there
+        // are datatypes which we must convert explicitly.
+        // These are:
+        // - date64: we should cast int32 to date32, then date32 to date64.
+        let array = match target_type {
+            ArrowType::Date64 => {
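+                // A direct Int32 -> Date64 cast would treat the stored day
+                // offsets as milliseconds, so hop through Date32 first:
+                // days -> days, then days -> milliseconds.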
+                // this is cheap as it internally reinterprets the data
+                let a = arrow::compute::cast(&array, &ArrowType::Date32)?;
+                arrow::compute::cast(&a, &target_type)?
+            }
+            ArrowType::Decimal(p, s) => {
+                let array = match array.data_type() {
+                    ArrowType::Int32 => array
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| v.into()))
+                        .collect::<DecimalArray>(),
+
+                    ArrowType::Int64 => array
+                        .as_any()
+                        .downcast_ref::<Int64Array>()
+                        .unwrap()
+                        .iter()
+                        .map(|v| v.map(|v| v.into()))
+                        .collect::<DecimalArray>(),
+                    _ => {
+                        return Err(arrow_err!(
+                            "Cannot convert {:?} to decimal",
+                            array.data_type()
+                        ))
+                    }
+                }
+                .with_precision_and_scale(p, s)?;
+
+                Arc::new(array) as ArrayRef
+            }
+            _ => arrow::compute::cast(&array, &target_type)?,
+        };
+
+        // save definition and repetition buffers
+        self.def_levels_buffer = self.record_reader.consume_def_levels()?;
+        self.rep_levels_buffer = self.record_reader.consume_rep_levels()?;
+        self.record_reader.reset();
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.def_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array_reader::test_util::EmptyPageIterator;
+    use crate::basic::Encoding;
+    use crate::column::page::Page;
+    use crate::data_type::Int32Type;
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::test_common::make_pages;
+    use crate::util::InMemoryPageIterator;
+    use arrow::array::PrimitiveArray;
+    use arrow::datatypes::ArrowPrimitiveType;
+
+    use rand::distributions::uniform::SampleUniform;
+    use std::collections::VecDeque;
+
+    fn make_column_chunks<T: DataType>(
+        column_desc: ColumnDescPtr,
+        encoding: Encoding,
+        num_levels: usize,
+        min_value: T::T,
+        max_value: T::T,
+        def_levels: &mut Vec<i16>,
+        rep_levels: &mut Vec<i16>,
+        values: &mut Vec<T::T>,
+        page_lists: &mut Vec<Vec<Page>>,
+        use_v2: bool,
+        num_chunks: usize,
+    ) where
+        T::T: PartialOrd + SampleUniform + Copy,
+    {
+        for _i in 0..num_chunks {
+            let mut pages = VecDeque::new();
+            let mut data = Vec::new();
+            let mut page_def_levels = Vec::new();
+            let mut page_rep_levels = Vec::new();
+
+            make_pages::<T>(
+                column_desc.clone(),
+                encoding,
+                1,
+                num_levels,
+                min_value,
+                max_value,
+                &mut page_def_levels,
+                &mut page_rep_levels,
+                &mut data,
+                &mut pages,
+                use_v2,
+            );
+
+            def_levels.append(&mut page_def_levels);
+            rep_levels.append(&mut page_rep_levels);
+            values.append(&mut data);
+            page_lists.push(Vec::from(pages));
+        }
+    }
+
+    #[test]
+    fn test_primitive_array_reader_empty_pages() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+        let page_iterator = EmptyPageIterator::new(schema);
+
+        let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+            Box::new(page_iterator),
+            column_desc,
+            None,
+        )
+        .unwrap();
+
+        // expect no values to be read
+        let array = array_reader.next_batch(50).unwrap();
+        assert!(array.is_empty());
+    }
+
+    #[test]
+    fn test_primitive_array_reader_data() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+
+        // Construct page iterator
+        {
+            let mut data = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                1,
+                200,
+                &mut Vec::new(),
+                &mut Vec::new(),
+                &mut data,
+                &mut page_lists,
+                true,
+                2,
+            );
+            let page_iterator =
+                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
+                Box::new(page_iterator),
+                column_desc,
+                None,
+            )
+            .unwrap();
+
+            // Read first 50 values, which are all from the first column chunk
+            let array = array_reader.next_batch(50).unwrap();
+            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+
+            assert_eq!(&Int32Array::from(data[0..50].to_vec()), array);
+
+            // Read next 100 values, the first 50 ones are from the first column chunk,
+            // and the last 50 ones are from the second column chunk
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+
+            assert_eq!(&Int32Array::from(data[50..150].to_vec()), array);
+
+            // Try to read 100 values, however there are only 50 values
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
+
+            assert_eq!(&Int32Array::from(data[150..200].to_vec()), array);
+        }
+    }
+
+    macro_rules! test_primitive_array_reader_one_type {
+        ($arrow_parquet_type:ty, $physical_type:expr, $converted_type_str:expr, $result_arrow_type:ty, $result_arrow_cast_type:ty, $result_primitive_type:ty) => {{
+            let message_type = format!(
+                "
+            message test_schema {{
+              REQUIRED {:?} leaf ({});
+            }}
+            ",
+                $physical_type, $converted_type_str
+            );
+            let schema = parse_message_type(&message_type)
+                .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+                .unwrap();
+
+            let column_desc = schema.column(0);
+
+            // Construct page iterator
+            {
+                let mut data = Vec::new();
+                let mut page_lists = Vec::new();
+                make_column_chunks::<$arrow_parquet_type>(
+                    column_desc.clone(),
+                    Encoding::PLAIN,
+                    100,
+                    1,
+                    200,
+                    &mut Vec::new(),
+                    &mut Vec::new(),
+                    &mut data,
+                    &mut page_lists,
+                    true,
+                    2,
+                );
+                let page_iterator = InMemoryPageIterator::new(
+                    schema.clone(),
+                    column_desc.clone(),
+                    page_lists,
+                );
+                let mut array_reader = PrimitiveArrayReader::<$arrow_parquet_type>::new(
+                    Box::new(page_iterator),
+                    column_desc.clone(),
+                    None,
+                )
+                .expect("Unable to get array reader");
+
+                let array = array_reader
+                    .next_batch(50)
+                    .expect("Unable to get batch from reader");
+
+                let result_data_type = <$result_arrow_type>::DATA_TYPE;
+                let array = array
+                    .as_any()
+                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
+                    .expect(
+                        format!(
+                            "Unable to downcast {:?} to {:?}",
+                            array.data_type(),
+                            result_data_type
+                        )
+                        .as_str(),
+                    );
+
+                // create expected array as primitive, and cast to result type
+                let expected = PrimitiveArray::<$result_arrow_cast_type>::from(
+                    data[0..50]
+                        .iter()
+                        .map(|x| *x as $result_primitive_type)
+                        .collect::<Vec<$result_primitive_type>>(),
+                );
+                let expected = Arc::new(expected) as ArrayRef;
+                let expected = arrow::compute::cast(&expected, &result_data_type)
+                    .expect("Unable to cast expected array");
+                assert_eq!(expected.data_type(), &result_data_type);
+                let expected = expected
+                    .as_any()
+                    .downcast_ref::<PrimitiveArray<$result_arrow_type>>()
+                    .expect(
+                        format!(
+                            "Unable to downcast expected {:?} to {:?}",
+                            expected.data_type(),
+                            result_data_type
+                        )
+                        .as_str(),
+                    );
+                assert_eq!(expected, array);
+            }
+        }};
+    }
+
+    #[test]
+    fn test_primitive_array_reader_temporal_types() {
+        test_primitive_array_reader_one_type!(
+            crate::data_type::Int32Type,
+            PhysicalType::INT32,
+            "DATE",
+            arrow::datatypes::Date32Type,
+            arrow::datatypes::Int32Type,
+            i32
+        );
+        test_primitive_array_reader_one_type!(
+            crate::data_type::Int32Type,
+            PhysicalType::INT32,
"TIME_MILLIS", + arrow::datatypes::Time32MillisecondType, + arrow::datatypes::Int32Type, + i32 + ); + test_primitive_array_reader_one_type!( + crate::data_type::Int64Type, + PhysicalType::INT64, + "TIME_MICROS", + arrow::datatypes::Time64MicrosecondType, + arrow::datatypes::Int64Type, + i64 + ); + test_primitive_array_reader_one_type!( + crate::data_type::Int64Type, + PhysicalType::INT64, + "TIMESTAMP_MILLIS", + arrow::datatypes::TimestampMillisecondType, + arrow::datatypes::Int64Type, + i64 + ); + test_primitive_array_reader_one_type!( + crate::data_type::Int64Type, + PhysicalType::INT64, + "TIMESTAMP_MICROS", + arrow::datatypes::TimestampMicrosecondType, + arrow::datatypes::Int64Type, + i64 + ); + } + + #[test] + fn test_primitive_array_reader_def_and_rep_levels() { + // Construct column schema + let message_type = " + message test_schema { + REPEATED Group test_mid { + OPTIONAL INT32 leaf; + } + } + "; + + let schema = parse_message_type(message_type) + .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t)))) + .unwrap(); + + let column_desc = schema.column(0); + + // Construct page iterator + { + let mut def_levels = Vec::new(); + let mut rep_levels = Vec::new(); + let mut page_lists = Vec::new(); + make_column_chunks::( + column_desc.clone(), + Encoding::PLAIN, + 100, + 1, + 200, + &mut def_levels, + &mut rep_levels, + &mut Vec::new(), + &mut page_lists, + true, + 2, + ); + + let page_iterator = + InMemoryPageIterator::new(schema, column_desc.clone(), page_lists); + + let mut array_reader = PrimitiveArrayReader::::new( + Box::new(page_iterator), + column_desc, + None, + ) + .unwrap(); + + let mut accu_len: usize = 0; + + // Read first 50 values, which are all from the first column chunk + let array = array_reader.next_batch(50).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + accu_len += array.len(); + + // Read next 100 values, the first 50 ones are from the first column chunk, + // and the last 50 ones are from the second column chunk + let array = array_reader.next_batch(100).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + accu_len += array.len(); + + // Try to read 100 values, however there are only 50 values + let array = array_reader.next_batch(100).unwrap(); + assert_eq!( + Some(&def_levels[accu_len..(accu_len + array.len())]), + array_reader.get_def_levels() + ); + assert_eq!( + Some(&rep_levels[accu_len..(accu_len + array.len())]), + array_reader.get_rep_levels() + ); + } + } +} diff --git a/parquet/src/arrow/array_reader/struct_array.rs b/parquet/src/arrow/array_reader/struct_array.rs new file mode 100644 index 00000000000..30824d74203 --- /dev/null +++ b/parquet/src/arrow/array_reader/struct_array.rs @@ -0,0 +1,294 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::arrow::array_reader::ArrayReader;
+use crate::errors::{ParquetError, Result};
+use arrow::array::{
+    ArrayData, ArrayDataBuilder, ArrayRef, BooleanBufferBuilder, StructArray,
+};
+use arrow::datatypes::DataType as ArrowType;
+use std::any::Any;
+use std::sync::Arc;
+
+/// Implementation of struct array reader.
+pub struct StructArrayReader {
+    children: Vec<Box<dyn ArrayReader>>,
+    data_type: ArrowType,
+    struct_def_level: i16,
+    struct_rep_level: i16,
+    nullable: bool,
+}
+
+impl StructArrayReader {
+    /// Construct struct array reader.
+    pub fn new(
+        data_type: ArrowType,
+        children: Vec<Box<dyn ArrayReader>>,
+        def_level: i16,
+        rep_level: i16,
+        nullable: bool,
+    ) -> Self {
+        Self {
+            data_type,
+            children,
+            struct_def_level: def_level,
+            struct_rep_level: rep_level,
+            nullable,
+        }
+    }
+}
+
+impl ArrayReader for StructArrayReader {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns data type.
+    /// This must be a struct.
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    /// Read `batch_size` struct records.
+    ///
+    /// Definition levels of the struct array are calculated as follows:
+    /// ```ignore
+    /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ...,
+    /// childn_def_levels[i]);
+    /// ```
+    ///
+    /// Repetition levels of the struct array are calculated as follows:
+    /// ```ignore
+    /// rep_levels[i] = child1_rep_levels[i];
+    /// ```
+    ///
+    /// The null bitmap of the struct array is calculated from the definition levels:
+    /// ```ignore
+    /// null_bitmap[i] = (def_levels[i] >= self.struct_def_level);
+    /// ```
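+    ///
+    /// For example (values chosen purely for illustration), with a
+    /// `struct_def_level` of 1:
+    /// ```ignore
+    /// child def_levels = [0, 1, 2]
+    /// null_bitmap      = [false, true, true]
+    /// ```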
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        if self.children.is_empty() {
+            return Ok(Arc::new(StructArray::from(Vec::new())));
+        }
+
+        let children_array = self
+            .children
+            .iter_mut()
+            .map(|reader| reader.next_batch(batch_size))
+            .collect::<Result<Vec<ArrayRef>>>()?;
+
+        // check that all children arrays have the same length
+        let children_array_len =
+            children_array.first().map(|arr| arr.len()).ok_or_else(|| {
+                general_err!("Struct array reader should have at least one child!")
+            })?;
+
+        let all_children_len_eq = children_array
+            .iter()
+            .all(|arr| arr.len() == children_array_len);
+        if !all_children_len_eq {
+            return Err(general_err!("Not all children arrays have the same length!"));
+        }
+
+        // Now we can build array data
+        let mut array_data_builder = ArrayDataBuilder::new(self.data_type.clone())
+            .len(children_array_len)
+            .child_data(
+                children_array
+                    .iter()
+                    .map(|x| x.data().clone())
+                    .collect::<Vec<ArrayData>>(),
+            );
+
+        if self.nullable {
+            // calculate struct def level data
+
+            // children should have consistent view of parent, only need to inspect first child
+            let def_levels = self.children[0]
+                .get_def_levels()
+                .expect("child with nullable parents must have definition level");
+
+            // calculate bitmap for current array
+            let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len);
+
+            match self.children[0].get_rep_levels() {
+                Some(rep_levels) => {
+                    // Sanity check
+                    assert_eq!(rep_levels.len(), def_levels.len());
+
+                    for (rep_level, def_level) in rep_levels.iter().zip(def_levels) {
+                        if rep_level > &self.struct_rep_level {
+                            // Already handled by inner list - SKIP
+                            continue;
+                        }
+                        bitmap_builder.append(*def_level >= self.struct_def_level)
+                    }
+                }
+                None => {
+                    for def_level in def_levels {
+                        bitmap_builder.append(*def_level >= self.struct_def_level)
+                    }
+                }
+            }
+
+            if bitmap_builder.len() != children_array_len {
+                return Err(general_err!("Failed to decode level data for struct array"));
+            }
+
+            array_data_builder =
+                array_data_builder.null_bit_buffer(Some(bitmap_builder.finish()));
+        }
+
+        let array_data = unsafe { array_data_builder.build_unchecked() };
+        Ok(Arc::new(StructArray::from(array_data)))
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        // Children definition levels should describe the same
+        // parent structure, so return first child's
+        self.children.first().and_then(|l| l.get_def_levels())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        // Children repetition levels should describe the same
+        // parent structure, so return first child's
+        self.children.first().and_then(|l| l.get_rep_levels())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::arrow::array_reader::test_util::InMemoryArrayReader;
+    use crate::arrow::array_reader::ListArrayReader;
+    use arrow::array::{Array, Int32Array, ListArray};
+    use arrow::buffer::Buffer;
+    use arrow::datatypes::Field;
+
+    #[test]
+    fn test_struct_array_reader() {
+        let array_1 = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
+        let array_reader_1 = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array_1.clone(),
+            Some(vec![0, 1, 2, 3, 1]),
+            Some(vec![0, 1, 1, 1, 1]),
+        );
+
+        let array_2 = Arc::new(Int32Array::from(vec![5, 4, 3, 2, 1]));
+        let array_reader_2 = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array_2.clone(),
+            Some(vec![0, 1, 3, 1, 2]),
+            Some(vec![0, 1, 1, 1, 1]),
+        );
+
+        let struct_type = ArrowType::Struct(vec![
+            Field::new("f1", array_1.data_type().clone(), true),
+            Field::new("f2", array_2.data_type().clone(), true),
+        ]);
+
+        let mut struct_array_reader = StructArrayReader::new(
+            struct_type,
+            vec![Box::new(array_reader_1), Box::new(array_reader_2)],
+            1,
+            1,
+            true,
+        );
+
+        let struct_array = struct_array_reader.next_batch(5).unwrap();
+        let struct_array = struct_array.as_any().downcast_ref::<StructArray>().unwrap();
+
+        assert_eq!(5, struct_array.len());
+        assert_eq!(
+            vec![true, false, false, false, false],
+            (0..5)
+                .map(|idx| struct_array.data_ref().is_null(idx))
+                .collect::<Vec<bool>>()
+        );
+        assert_eq!(
+            Some(vec![0, 1, 2, 3, 1].as_slice()),
+            struct_array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(vec![0, 1, 1, 1, 1].as_slice()),
+            struct_array_reader.get_rep_levels()
+        );
+    }
+
+    #[test]
+    fn test_struct_array_reader_list() {
+        use arrow::datatypes::Int32Type;
+        // [
+        //     {foo: [1, 2, null]},
+        //     {foo: []},
+        //     {foo: null},
+        //     null,
+        // ]
+
+        let expected_l =
+            Arc::new(ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
+                Some(vec![Some(1), Some(2), None]),
+                Some(vec![]),
+                None,
+                None,
+            ]));
+
+        let validity = Buffer::from([0b00000111]);
+        let struct_fields = vec![(
+            Field::new("foo", expected_l.data_type().clone(), true),
+            expected_l.clone() as ArrayRef,
+        )];
+        let expected = StructArray::from((struct_fields, validity));
+
+        let array = Arc::new(Int32Array::from_iter(vec![
+            Some(1),
+            Some(2),
+            None,
+            None,
+            None,
+            None,
+        ]));
+        let reader = InMemoryArrayReader::new(
+            ArrowType::Int32,
+            array,
+            Some(vec![4, 4, 3, 2, 1, 0]),
+            Some(vec![0, 1, 1, 0, 0, 0]),
+        );
+
+        let list_reader = ListArrayReader::<i32>::new(
+            Box::new(reader),
+            expected_l.data_type().clone(),
+            ArrowType::Int32,
+            3,
+            1,
+            true,
+        );
+
+        let mut struct_reader = StructArrayReader::new(
+            expected.data_type().clone(),
+            vec![Box::new(list_reader)],
+            1,
+            0,
+            true,
+        );
+
+        let actual = struct_reader.next_batch(1024).unwrap();
+        let actual = actual.as_any().downcast_ref::<StructArray>().unwrap();
+        assert_eq!(actual, &expected)
+    }
+}