diff --git a/parquet/src/arrow/arrow_reader.rs b/parquet/src/arrow/arrow_reader.rs index 726c200606f..fd68f4bc02d 100644 --- a/parquet/src/arrow/arrow_reader.rs +++ b/parquet/src/arrow/arrow_reader.rs @@ -420,8 +420,7 @@ mod tests { ParquetRecordBatchReader, RowSelection, }; use crate::arrow::buffer::converter::{ - BinaryArrayConverter, Converter, FixedSizeArrayConverter, FromConverter, - IntervalDayTimeArrayConverter, LargeUtf8ArrayConverter, Utf8ArrayConverter, + Converter, FixedSizeArrayConverter, IntervalDayTimeArrayConverter, }; use crate::arrow::schema::add_encoded_arrow_schema_to_metadata; use crate::arrow::{ArrowWriter, ProjectionMask}; @@ -517,29 +516,29 @@ mod tests { #[test] fn test_primitive_single_column_reader_test() { - run_single_column_reader_tests::( + run_single_column_reader_tests::( 2, ConvertedType::NONE, None, - &FromConverter::new(), + |vals| Arc::new(BooleanArray::from_iter(vals.iter().cloned())), &[Encoding::PLAIN, Encoding::RLE, Encoding::RLE_DICTIONARY], ); - run_single_column_reader_tests::( + run_single_column_reader_tests::( 2, ConvertedType::NONE, None, - &FromConverter::new(), + |vals| Arc::new(Int32Array::from_iter(vals.iter().cloned())), &[ Encoding::PLAIN, Encoding::RLE_DICTIONARY, Encoding::DELTA_BINARY_PACKED, ], ); - run_single_column_reader_tests::( + run_single_column_reader_tests::( 2, ConvertedType::NONE, None, - &FromConverter::new(), + |vals| Arc::new(Int64Array::from_iter(vals.iter().cloned())), &[ Encoding::PLAIN, Encoding::RLE_DICTIONARY, @@ -561,16 +560,11 @@ mod tests { #[test] fn test_fixed_length_binary_column_reader() { let converter = FixedSizeArrayConverter::new(20); - run_single_column_reader_tests::< - FixedLenByteArrayType, - FixedSizeBinaryArray, - FixedSizeArrayConverter, - RandFixedLenGen, - >( + run_single_column_reader_tests::( 20, ConvertedType::NONE, None, - &converter, + |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()), &[Encoding::PLAIN, Encoding::RLE_DICTIONARY], ); } @@ -578,16 +572,11 @@ mod tests { #[test] fn test_interval_day_time_column_reader() { let converter = IntervalDayTimeArrayConverter {}; - run_single_column_reader_tests::< - FixedLenByteArrayType, - IntervalDayTimeArray, - IntervalDayTimeArrayConverter, - RandFixedLenGen, - >( + run_single_column_reader_tests::( 12, ConvertedType::INTERVAL, None, - &converter, + |vals| Arc::new(converter.convert(vals.to_vec()).unwrap()), &[Encoding::PLAIN, Encoding::RLE_DICTIONARY], ); } @@ -602,6 +591,12 @@ mod tests { #[test] fn test_utf8_single_column_reader_test() { + fn string_converter(vals: &[Option]) -> ArrayRef { + Arc::new(GenericStringArray::::from_iter(vals.iter().map(|x| { + x.as_ref().map(|b| std::str::from_utf8(b.data()).unwrap()) + }))) + } + let encodings = &[ Encoding::PLAIN, Encoding::RLE_DICTIONARY, @@ -609,46 +604,39 @@ mod tests { Encoding::DELTA_BYTE_ARRAY, ]; - let converter = BinaryArrayConverter {}; - run_single_column_reader_tests::< - ByteArrayType, - BinaryArray, - BinaryArrayConverter, - RandUtf8Gen, - >(2, ConvertedType::NONE, None, &converter, encodings); - - let utf8_converter = Utf8ArrayConverter {}; - run_single_column_reader_tests::< - ByteArrayType, - StringArray, - Utf8ArrayConverter, - RandUtf8Gen, - >(2, ConvertedType::UTF8, None, &utf8_converter, encodings); - - run_single_column_reader_tests::< - ByteArrayType, - StringArray, - Utf8ArrayConverter, - RandUtf8Gen, - >( + run_single_column_reader_tests::( + 2, + ConvertedType::NONE, + None, + |vals| { + Arc::new(BinaryArray::from_iter( + vals.iter().map(|x| x.as_ref().map(|x| x.data())), + )) + }, + encodings, + ); + + run_single_column_reader_tests::( + 2, + ConvertedType::UTF8, + None, + string_converter::, + encodings, + ); + + run_single_column_reader_tests::( 2, ConvertedType::UTF8, Some(ArrowDataType::Utf8), - &utf8_converter, + string_converter::, encodings, ); - let large_utf8_converter = LargeUtf8ArrayConverter {}; - run_single_column_reader_tests::< - ByteArrayType, - LargeStringArray, - LargeUtf8ArrayConverter, - RandUtf8Gen, - >( + run_single_column_reader_tests::( 2, ConvertedType::UTF8, Some(ArrowDataType::LargeUtf8), - &large_utf8_converter, + string_converter::, encodings, ); @@ -658,21 +646,21 @@ mod tests { let mut opts = TestOptions::new(2, 20, 15).with_null_percent(50); opts.encoding = *encoding; + let data_type = ArrowDataType::Dictionary( + Box::new(key.clone()), + Box::new(ArrowDataType::Utf8), + ); + // Cannot run full test suite as keys overflow, run small test instead - single_column_reader_test::< - ByteArrayType, - StringArray, - Utf8ArrayConverter, - RandUtf8Gen, - >( + single_column_reader_test::( opts, 2, ConvertedType::UTF8, - Some(ArrowDataType::Dictionary( - Box::new(key.clone()), - Box::new(ArrowDataType::Utf8), - )), - &utf8_converter, + Some(data_type.clone()), + move |vals| { + let vals = string_converter::(vals); + arrow::compute::cast(&vals, &data_type).unwrap() + }, ); } } @@ -687,37 +675,37 @@ mod tests { ]; for key in &key_types { - run_single_column_reader_tests::< - ByteArrayType, - StringArray, - Utf8ArrayConverter, - RandUtf8Gen, - >( + let data_type = ArrowDataType::Dictionary( + Box::new(key.clone()), + Box::new(ArrowDataType::Utf8), + ); + + run_single_column_reader_tests::( 2, ConvertedType::UTF8, - Some(ArrowDataType::Dictionary( - Box::new(key.clone()), - Box::new(ArrowDataType::Utf8), - )), - &utf8_converter, + Some(data_type.clone()), + move |vals| { + let vals = string_converter::(vals); + arrow::compute::cast(&vals, &data_type).unwrap() + }, encodings, ); // https://github.com/apache/arrow-rs/issues/1179 - // run_single_column_reader_tests::< - // ByteArrayType, - // LargeStringArray, - // LargeUtf8ArrayConverter, - // RandUtf8Gen, - // >( + // let data_type = ArrowDataType::Dictionary( + // Box::new(key.clone()), + // Box::new(ArrowDataType::LargeUtf8), + // ); + // + // run_single_column_reader_tests::( // 2, // ConvertedType::UTF8, - // Some(ArrowDataType::Dictionary( - // Box::new(key.clone()), - // Box::new(ArrowDataType::LargeUtf8), - // )), - // &large_utf8_converter, - // encodings + // Some(data_type.clone()), + // move |vals| { + // let vals = string_converter::(vals); + // arrow::compute::cast(&vals, &data_type).unwrap() + // }, + // encodings, // ); } } @@ -1004,17 +992,16 @@ mod tests { /// /// `rand_max` represents the maximum size of value to pass to to /// value generator - fn run_single_column_reader_tests( + fn run_single_column_reader_tests( rand_max: i32, converted_type: ConvertedType, arrow_type: Option, - converter: &C, + converter: F, encodings: &[Encoding], ) where T: DataType, G: RandGen, - A: Array + 'static, - C: Converter>, A> + 'static, + F: Fn(&[Option]) -> ArrayRef, { let mut all_options = vec![ // choose record_batch_batch (15) so batches cross row @@ -1090,12 +1077,12 @@ mod tests { ..opts.clone() }; - single_column_reader_test::( + single_column_reader_test::( opts, rand_max, converted_type, arrow_type.clone(), - converter, + &converter, ) } } @@ -1105,17 +1092,16 @@ mod tests { /// Create a parquet file and then read it using /// `ParquetFileArrowReader` using the parameters described in /// `opts`. - fn single_column_reader_test( + fn single_column_reader_test( opts: TestOptions, rand_max: i32, converted_type: ConvertedType, arrow_type: Option, - converter: &C, + converter: F, ) where T: DataType, G: RandGen, - A: Array + 'static, - C: Converter>, A> + 'static, + F: Fn(&[Option]) -> ArrayRef, { // Print out options to facilitate debugging failures on CI println!( @@ -1175,9 +1161,7 @@ mod tests { .unwrap(), ); - let arrow_field = arrow_type - .clone() - .map(|t| arrow::datatypes::Field::new("leaf", t, false)); + let arrow_field = arrow_type.map(|t| Field::new("leaf", t, false)); let mut file = tempfile::tempfile().unwrap(); @@ -1231,19 +1215,9 @@ mod tests { let batch = maybe_batch.unwrap().unwrap(); assert_eq!(end - total_read, batch.num_rows()); - let mut data = vec![]; - data.extend_from_slice(&expected_data[total_read..end]); + let a = converter(&expected_data[total_read..end]); + let b = Arc::clone(batch.column(0)); - let a = converter.convert(data).unwrap(); - let mut b = Arc::clone(batch.column(0)); - - if let Some(arrow_type) = arrow_type.as_ref() { - assert_eq!(b.data_type(), arrow_type); - if let ArrowDataType::Dictionary(_, v) = arrow_type { - assert_eq!(a.data_type(), v.as_ref()); - b = arrow::compute::cast(&b, v.as_ref()).unwrap() - } - } assert_eq!(a.data_type(), b.data_type()); assert_eq!(a.data(), b.data(), "{:#?} vs {:#?}", a.data(), b.data()); @@ -1282,12 +1256,12 @@ mod tests { def_levels: Option<&Vec>>, file: File, schema: TypePtr, - field: Option, + field: Option, opts: &TestOptions, ) -> Result { let mut writer_props = opts.writer_props(); if let Some(field) = field { - let arrow_schema = arrow::datatypes::Schema::new(vec![field]); + let arrow_schema = Schema::new(vec![field]); add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut writer_props); } diff --git a/parquet/src/arrow/buffer/converter.rs b/parquet/src/arrow/buffer/converter.rs index d8cbd256a46..806118e810c 100644 --- a/parquet/src/arrow/buffer/converter.rs +++ b/parquet/src/arrow/buffer/converter.rs @@ -27,10 +27,7 @@ use crate::errors::Result; use std::marker::PhantomData; #[cfg(test)] -use arrow::array::{ - BinaryArray, BinaryBuilder, LargeStringArray, LargeStringBuilder, StringArray, - StringBuilder, -}; +use arrow::array::{StringArray, StringBuilder}; /// A converter is used to consume record reader's content and convert it to arrow /// primitive array. @@ -211,47 +208,6 @@ impl Converter>, StringArray> for Utf8ArrayConverter { } } -#[cfg(test)] -pub struct LargeUtf8ArrayConverter {} - -#[cfg(test)] -impl Converter>, LargeStringArray> for LargeUtf8ArrayConverter { - fn convert(&self, source: Vec>) -> Result { - let data_size = source - .iter() - .map(|x| x.as_ref().map(|b| b.len()).unwrap_or(0)) - .sum(); - - let mut builder = LargeStringBuilder::with_capacity(source.len(), data_size); - for v in source { - match v { - Some(array) => builder.append_value(array.as_utf8()?), - None => builder.append_null(), - } - } - - Ok(builder.finish()) - } -} - -#[cfg(test)] -pub struct BinaryArrayConverter {} - -#[cfg(test)] -impl Converter>, BinaryArray> for BinaryArrayConverter { - fn convert(&self, source: Vec>) -> Result { - let mut builder = BinaryBuilder::new(source.len()); - for v in source { - match v { - Some(array) => builder.append_value(array.data()), - None => builder.append_null(), - } - } - - Ok(builder.finish()) - } -} - #[cfg(test)] pub type Utf8Converter = ArrayRefConverter>, StringArray, Utf8ArrayConverter>; @@ -284,35 +240,6 @@ pub type DecimalFixedLengthByteArrayConverter = ArrayRefConverter< pub type DecimalByteArrayConvert = ArrayRefConverter>, Decimal128Array, DecimalArrayConverter>; -#[cfg(test)] -pub struct FromConverter { - _source: PhantomData, - _dest: PhantomData, -} - -#[cfg(test)] -impl FromConverter -where - T: From, -{ - pub fn new() -> Self { - Self { - _source: PhantomData, - _dest: PhantomData, - } - } -} - -#[cfg(test)] -impl Converter for FromConverter -where - T: From, -{ - fn convert(&self, source: S) -> Result { - Ok(T::from(source)) - } -} - pub struct ArrayRefConverter { _source: PhantomData, _array: PhantomData,