From c7268f4021b1278c705d53e71b7ee122dea670fd Mon Sep 17 00:00:00 2001 From: liukun4515 Date: Mon, 25 Jul 2022 11:08:48 +0800 Subject: [PATCH] support read decimal from parquet binary type --- parquet/src/arrow/array_reader/builder.rs | 28 ++++++++------ parquet/src/arrow/arrow_reader.rs | 7 +++- parquet/src/arrow/buffer/converter.rs | 45 +++++++++++++++++------ parquet/src/arrow/schema.rs | 26 +++++++++++++ parquet/src/arrow/schema/primitive.rs | 6 ++- parquet/src/basic.rs | 16 ++++++++ 6 files changed, 101 insertions(+), 27 deletions(-) diff --git a/parquet/src/arrow/array_reader/builder.rs b/parquet/src/arrow/array_reader/builder.rs index f7954a91ae5..7a19d5fbc60 100644 --- a/parquet/src/arrow/array_reader/builder.rs +++ b/parquet/src/arrow/array_reader/builder.rs @@ -25,19 +25,11 @@ use crate::arrow::array_reader::{ ComplexObjectArrayReader, ListArrayReader, MapArrayReader, NullArrayReader, PrimitiveArrayReader, RowGroupCollection, StructArrayReader, }; -use crate::arrow::buffer::converter::{ - DecimalArrayConverter, DecimalConverter, FixedLenBinaryConverter, - FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, - IntervalDayTimeArrayConverter, IntervalDayTimeConverter, - IntervalYearMonthArrayConverter, IntervalYearMonthConverter, -}; +use crate::arrow::buffer::converter::{DecimalArrayConverter, DecimalByteArrayConvert, DecimalFixedLengthByteArrayConverter, FixedLenBinaryConverter, FixedSizeArrayConverter, Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter, IntervalDayTimeConverter, IntervalYearMonthArrayConverter, IntervalYearMonthConverter}; use crate::arrow::schema::{convert_schema, ParquetField, ParquetFieldType}; use crate::arrow::ProjectionMask; use crate::basic::Type as PhysicalType; -use crate::data_type::{ - BoolType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, - Int96Type, -}; +use crate::data_type::{BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96Type}; use crate::errors::Result; use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type}; @@ -233,6 +225,18 @@ fn build_primitive_reader( column_desc, arrow_type, ), + Some(DataType::Decimal(precision, scale)) => { + // read decimal data from parquet binary physical type + let convert = DecimalByteArrayConvert::new(DecimalArrayConverter::new(precision as i32, scale as i32)); + Ok(Box::new( + ComplexObjectArrayReader::::new( + page_iterator, + column_desc, + convert, + arrow_type + )? + )) + }, _ => make_byte_array_reader( page_iterator, column_desc, @@ -241,13 +245,13 @@ fn build_primitive_reader( }, PhysicalType::FIXED_LEN_BYTE_ARRAY => match field.arrow_type { DataType::Decimal(precision, scale) => { - let converter = DecimalConverter::new(DecimalArrayConverter::new( + let converter = DecimalFixedLengthByteArrayConverter::new(DecimalArrayConverter::new( precision as i32, scale as i32, )); Ok(Box::new(ComplexObjectArrayReader::< FixedLenByteArrayType, - DecimalConverter, + DecimalFixedLengthByteArrayConverter, >::new( page_iterator, column_desc, diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 770477b025c..f0129b78053 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -719,7 +719,12 @@ mod tests { fn test_read_decimal_file() { use arrow::array::Decimal128Array; let testdata = arrow::util::test_util::parquet_test_data(); - let file_variants = vec![("fixed_length", 25), ("int32", 4), ("int64", 10)]; + let file_variants = vec![ + ("byte_array", 4), + ("fixed_length", 25), + ("int32", 4), + ("int64", 10), + ]; for (prefix, target_precision) in file_variants { let path = format!("{}/{}_decimal.parquet", testdata, prefix); let file = File::open(&path).unwrap(); diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs index a841509e78c..93609308d2b 100644 --- a/parquet/src/arrow/buffer/converter.rs +++ b/parquet/src/arrow/buffer/converter.rs @@ -76,16 +76,6 @@ impl DecimalArrayConverter { pub fn new(precision: i32, scale: i32) -> Self { Self { precision, scale } } - - fn from_bytes_to_i128(b: &[u8]) -> i128 { - assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); - let first_bit = b[0] & 128u8 == 128u8; - let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; - for (i, v) in b.iter().enumerate() { - result[i + (16 - b.len())] = *v; - } - i128::from_be_bytes(result) - } } impl Converter>, Decimal128Array> @@ -94,13 +84,41 @@ impl Converter>, Decimal128Array> fn convert(&self, source: Vec>) -> Result { let array = source .into_iter() - .map(|array| array.map(|array| Self::from_bytes_to_i128(array.data()))) + .map(|array| array.map(|array| from_bytes_to_i128(array.data()))) .collect::() .with_precision_and_scale(self.precision as usize, self.scale as usize)?; Ok(array) } } + +impl Converter>, Decimal128Array> for DecimalArrayConverter { + fn convert(&self, source: Vec>) -> Result { + let array = source + .into_iter() + .map(|array| array.map(|array| from_bytes_to_i128(array.data()))) + .collect::() + .with_precision_and_scale(self.precision as usize, self.scale as usize)?; + + Ok(array) + } +} + +// Convert the bytes array to i128. +// The endian of the input bytes array must be big-endian. +fn from_bytes_to_i128(b: &[u8]) -> i128 { + assert!(b.len() <= 16, "Decimal128Array supports only up to size 16"); + let first_bit = b[0] & 128u8 == 128u8; + let mut result = if first_bit { [255u8; 16] } else { [0u8; 16] }; + for (i, v) in b.iter().enumerate() { + result[i + (16 - b.len())] = *v; + } + // The bytes array are from parquet file and must be the big-endian. + // The endian is defined by parquet format, and the reference document + // https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L66 + i128::from_be_bytes(result) +} + /// An Arrow Interval converter, which reads the first 4 bytes of a Parquet interval, /// and interprets it as an i32 value representing the Arrow YearMonth value pub struct IntervalYearMonthArrayConverter {} @@ -272,12 +290,15 @@ pub type IntervalDayTimeConverter = ArrayRefConverter< IntervalDayTimeArrayConverter, >; -pub type DecimalConverter = ArrayRefConverter< +pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter< Vec>, Decimal128Array, DecimalArrayConverter, >; +pub type DecimalByteArrayConvert = + ArrayRefConverter>, Decimal128Array, DecimalArrayConverter>; + pub struct FromConverter { _source: PhantomData, _dest: PhantomData, diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs index 1ff9c0f03c4..53d67d38041 100644 --- a/parquet/src/arrow/schema.rs +++ b/parquet/src/arrow/schema.rs @@ -531,6 +531,32 @@ mod tests { assert_eq!(&arrow_fields, converted_arrow_schema.fields()); } + #[test] + fn test_decimal_fields() { + let message_type = " + message test_schema { + REQUIRED INT32 decimal1 (DECIMAL(4,2)); + REQUIRED INT64 decimal2 (DECIMAL(12,2)); + REQUIRED FIXED_LEN_BYTE_ARRAY (16) decimal3 (DECIMAL(30,2)); + REQUIRED BYTE_ARRAY decimal4 (DECIMAL(33,2)); + } + "; + + let parquet_group_type = parse_message_type(message_type).unwrap(); + + let parquet_schema = SchemaDescriptor::new(Arc::new(parquet_group_type)); + let converted_arrow_schema = + parquet_to_arrow_schema(&parquet_schema, None).unwrap(); + + let arrow_fields = vec![ + Field::new("decimal1", DataType::Decimal(4,2), false), + Field::new("decimal2", DataType::Decimal(12,2), false), + Field::new("decimal3", DataType::Decimal(30,2), false), + Field::new("decimal4", DataType::Decimal(33,2), false), + ]; + assert_eq!(&arrow_fields, converted_arrow_schema.fields()); + } + #[test] fn test_byte_array_fields() { let message_type = " diff --git a/parquet/src/arrow/schema/primitive.rs b/parquet/src/arrow/schema/primitive.rs index 0cee5aa1e96..4bf6876d09d 100644 --- a/parquet/src/arrow/schema/primitive.rs +++ b/parquet/src/arrow/schema/primitive.rs @@ -94,7 +94,7 @@ fn from_parquet(parquet_type: &Type) -> Result { PhysicalType::INT96 => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), PhysicalType::FLOAT => Ok(DataType::Float32), PhysicalType::DOUBLE => Ok(DataType::Float64), - PhysicalType::BYTE_ARRAY => from_byte_array(basic_info), + PhysicalType::BYTE_ARRAY => from_byte_array(basic_info, *precision, *scale), PhysicalType::FIXED_LEN_BYTE_ARRAY => { from_fixed_len_byte_array(basic_info, *scale, *precision, *type_length) } @@ -224,7 +224,7 @@ fn from_int64(info: &BasicTypeInfo, scale: i32, precision: i32) -> Result Result { +fn from_byte_array(info: &BasicTypeInfo, precision: i32, scale: i32 ) -> Result { match (info.logical_type(), info.converted_type()) { (Some(LogicalType::String), _) => Ok(DataType::Utf8), (Some(LogicalType::Json), _) => Ok(DataType::Binary), @@ -235,6 +235,8 @@ fn from_byte_array(info: &BasicTypeInfo) -> Result { (None, ConvertedType::BSON) => Ok(DataType::Binary), (None, ConvertedType::ENUM) => Ok(DataType::Binary), (None, ConvertedType::UTF8) => Ok(DataType::Utf8), + (Some(LogicalType::Decimal {precision, scale}), _) => Ok(DataType::Decimal(precision as usize, scale as usize)), + (None, ConvertedType::DECIMAL) => Ok(DataType::Decimal(precision as usize, scale as usize)), (logical, converted) => Err(arrow_err!( "Unable to convert parquet BYTE_ARRAY logical type {:?} or converted type {}", logical, diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs index 2d8073e75fe..0cf1d5121b7 100644 --- a/parquet/src/basic.rs +++ b/parquet/src/basic.rs @@ -1059,6 +1059,7 @@ mod tests { assert_eq!(ConvertedType::JSON.to_string(), "JSON"); assert_eq!(ConvertedType::BSON.to_string(), "BSON"); assert_eq!(ConvertedType::INTERVAL.to_string(), "INTERVAL"); + assert_eq!(ConvertedType::DECIMAL.to_string(), "DECIMAL") } #[test] @@ -1153,6 +1154,10 @@ mod tests { ConvertedType::from(Some(parquet::ConvertedType::Interval)), ConvertedType::INTERVAL ); + assert_eq!( + ConvertedType::from(Some(parquet::ConvertedType::Decimal)), + ConvertedType::DECIMAL + ) } #[test] @@ -1244,6 +1249,10 @@ mod tests { Some(parquet::ConvertedType::Interval), ConvertedType::INTERVAL.into() ); + assert_eq!( + Some(parquet::ConvertedType::Decimal), + ConvertedType::DECIMAL.into() + ) } #[test] @@ -1409,6 +1418,13 @@ mod tests { .unwrap(), ConvertedType::INTERVAL ); + assert_eq!( + ConvertedType::DECIMAL + .to_string() + .parse::() + .unwrap(), + ConvertedType::DECIMAL + ) } #[test]