From d586c864b109d8576e6de07aa2d8fd4513a8bd8f Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies
Date: Fri, 29 Jul 2022 10:05:46 +0100
Subject: [PATCH] Increase test coverage of ArrowWriter

---
 parquet/src/arrow/arrow_writer/mod.rs | 253 ++++++++++++++++----------
 1 file changed, 161 insertions(+), 92 deletions(-)

diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 8a79a116f54..c94749dbbfb 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -756,7 +756,9 @@ mod tests {
     use arrow::{array::*, buffer::Buffer};
 
     use crate::arrow::{ArrowReader, ParquetFileArrowReader};
+    use crate::basic::Encoding;
     use crate::file::metadata::ParquetMetaData;
+    use crate::file::properties::WriterVersion;
     use crate::file::{
         reader::{FileReader, SerializedFileReader},
         statistics::Statistics,
@@ -1226,17 +1228,31 @@ mod tests {
 
     const SMALL_SIZE: usize = 7;
 
-    fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> File {
+    fn roundtrip(
+        expected_batch: RecordBatch,
+        max_row_group_size: Option<usize>,
+    ) -> Vec<File> {
+        let mut files = vec![];
+        for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
+            let mut props = WriterProperties::builder().set_writer_version(version);
+
+            if let Some(size) = max_row_group_size {
+                props = props.set_max_row_group_size(size)
+            }
+
+            let props = props.build();
+            files.push(roundtrip_opts(&expected_batch, props))
+        }
+        files
+    }
+
+    fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
         let file = tempfile::tempfile().unwrap();
 
         let mut writer = ArrowWriter::try_new(
             file.try_clone().unwrap(),
             expected_batch.schema(),
-            max_row_group_size.map(|size| {
-                WriterProperties::builder()
-                    .set_max_row_group_size(size)
-                    .build()
-            }),
+            Some(props),
         )
         .expect("Unable to write file");
         writer.write(&expected_batch).unwrap();
@@ -1264,20 +1280,59 @@ mod tests {
         file
     }
 
-    fn one_column_roundtrip(
-        values: ArrayRef,
-        nullable: bool,
-        max_row_group_size: Option<usize>,
-    ) -> File {
-        let schema = Schema::new(vec![Field::new(
-            "col",
-            values.data_type().clone(),
-            nullable,
-        )]);
-        let expected_batch =
-            RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
+    fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
+        let data_type = values.data_type().clone();
+        let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
+        one_column_roundtrip_with_schema(values, Arc::new(schema))
+    }
 
-        roundtrip(expected_batch, max_row_group_size)
+    fn one_column_roundtrip_with_schema(
+        values: ArrayRef,
+        schema: SchemaRef,
+    ) -> Vec<File> {
+        let encodings = match values.data_type() {
+            DataType::Utf8
+            | DataType::LargeUtf8
+            | DataType::Binary
+            | DataType::LargeBinary => vec![
+                Encoding::PLAIN,
+                Encoding::DELTA_BYTE_ARRAY,
+                Encoding::DELTA_LENGTH_BYTE_ARRAY,
+            ],
+            DataType::Int64
+            | DataType::Int32
+            | DataType::Int16
+            | DataType::Int8
+            | DataType::UInt64
+            | DataType::UInt32
+            | DataType::UInt16
+            | DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED],
+            _ => vec![Encoding::PLAIN],
+        };
+
+        let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();
+
+        let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];
+
+        let mut files = vec![];
+        for dictionary_size in [0, 1, 1024] {
+            for encoding in &encodings {
+                for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
+                    for row_group_size in row_group_sizes {
+                        let props = WriterProperties::builder()
+                            .set_writer_version(version)
+                            .set_max_row_group_size(row_group_size)
+                            .set_dictionary_enabled(dictionary_size != 0)
+                            .set_dictionary_pagesize_limit(dictionary_size.max(1))
+                            .set_encoding(*encoding)
+                            .build();
+
+                        files.push(roundtrip_opts(&expected_batch, props))
+                    }
+                }
+            }
+        }
+        files
     }
 
     fn values_required<A, I>(iter: I)
     where
         A: From<Vec<I::Item>> + Array + 'static,
         I: IntoIterator,
     {
         let raw_values: Vec<_> = iter.into_iter().collect();
         let values = Arc::new(A::from(raw_values));
-        one_column_roundtrip(values, false, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, false);
     }
 
     fn values_optional<A, I>(iter: I)
     where
         A: From<Vec<Option<I::Item>>> + Array + 'static,
         I: IntoIterator,
         I::Item: Copy,
     {
         let optional_raw_values: Vec<_> = iter
             .into_iter()
             .enumerate()
             .map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
             .collect();
         let optional_values = Arc::new(A::from(optional_raw_values));
-        one_column_roundtrip(optional_values, true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(optional_values, true);
     }
 
     fn required_and_optional<A, I>(iter: I)
     where
         A: From<Vec<I::Item>> + From<Vec<Option<I::Item>>> + Array + 'static,
         I: IntoIterator + Clone,
         I::Item: Copy,
     {
         values_required::<A, I>(iter.clone());
         values_optional::<A, I>(iter);
     }
 
     #[test]
     fn all_null_primitive_single_column() {
         let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
-        one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, true);
     }
 
     #[test]
     fn null_single_column() {
         let values = Arc::new(NullArray::new(SMALL_SIZE));
-        one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, true);
         // null arrays are always nullable, a test with non-nullable nulls fails
     }
 
     #[test]
@@ -1417,7 +1472,7 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip(values, false, Some(3));
+        one_column_roundtrip(values, false);
     }
 
     #[test]
@@ -1425,7 +1480,7 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip(values, false, Some(SMALL_SIZE / 2 + 1));
+        one_column_roundtrip(values, false);
     }
 
     #[test]
@@ -1433,7 +1488,7 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip(values, false, Some(SMALL_SIZE / 2 + 2));
+        one_column_roundtrip(values, false);
     }
 
     #[test]
@@ -1441,7 +1496,7 @@ mod tests {
         let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
         let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));
 
-        one_column_roundtrip(values, false, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, false);
     }
 
     #[test]
@@ -1548,7 +1603,7 @@ mod tests {
         builder.append_value(b"1112").unwrap();
         let array = Arc::new(builder.finish());
 
-        one_column_roundtrip(array, true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(array, true);
     }
 
     #[test]
@@ -1626,7 +1681,7 @@ mod tests {
         let a = ListArray::from(a_list_data);
         let values = Arc::new(a);
 
-        one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, true);
     }
 
     #[test]
@@ -1652,7 +1707,7 @@ mod tests {
         let a = LargeListArray::from(a_list_data);
         let values = Arc::new(a);
 
-        one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, true);
     }
 
     #[test]
@@ -1668,10 +1723,10 @@ mod tests {
         ];
 
         let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
-        one_column_roundtrip(Arc::new(list), true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(Arc::new(list), true);
 
         let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
-        one_column_roundtrip(Arc::new(list), true, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(Arc::new(list), true);
     }
 
     #[test]
@@ -1681,7 +1736,7 @@ mod tests {
         let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);
 
         let values = Arc::new(s);
-        one_column_roundtrip(values, false, Some(SMALL_SIZE / 2));
+        one_column_roundtrip(values, false);
     }
 
     #[test]
@@ -1702,9 +1757,7 @@ mod tests {
             .collect();
 
         // build a record batch
-        let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap();
-
-        roundtrip(expected_batch, Some(SMALL_SIZE / 2));
+        one_column_roundtrip_with_schema(Arc::new(d), schema);
     }
 
     #[test]
@@ -1728,10 +1781,7 @@ mod tests {
         builder.append(12345678).unwrap();
         let d = builder.finish();
 
-        // build a record batch
-        let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap();
-
-        roundtrip(expected_batch, Some(SMALL_SIZE / 2));
+        one_column_roundtrip_with_schema(Arc::new(d), schema);
     }
 
     #[test]
@@ -1751,16 +1801,13 @@ mod tests {
             .copied()
             .collect();
 
-        // build a record batch
-        let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap();
-
-        roundtrip(expected_batch, Some(SMALL_SIZE / 2));
+        one_column_roundtrip_with_schema(Arc::new(d), schema);
     }
 
     #[test]
     fn u32_min_max() {
         // check values roundtrip through parquet
-        let values = Arc::new(UInt32Array::from_iter_values(vec![
+        let src = vec![
             u32::MIN,
             u32::MIN + 1,
             (i32::MAX as u32) - 1,
@@ -1768,30 +1815,40 @@ mod tests {
             i32::MAX as u32,
             (i32::MAX as u32) + 1,
             u32::MAX - 1,
             u32::MAX,
-        ]));
-        let file = one_column_roundtrip(values, false, None);
-
-        // check statistics are valid
-        let reader = SerializedFileReader::new(file).unwrap();
-        let metadata = reader.metadata();
-        assert_eq!(metadata.num_row_groups(), 1);
-        let row_group = metadata.row_group(0);
-        assert_eq!(row_group.num_columns(), 1);
-        let column = row_group.column(0);
-        let stats = column.statistics().unwrap();
-        assert!(stats.has_min_max_set());
-        if let Statistics::Int32(stats) = stats {
-            assert_eq!(*stats.min() as u32, u32::MIN);
-            assert_eq!(*stats.max() as u32, u32::MAX);
-        } else {
-            panic!("Statistics::Int32 missing")
+        ];
+        let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
+        let files = one_column_roundtrip(values, false);
+
+        for file in files {
+            // check statistics are valid
+            let reader = SerializedFileReader::new(file).unwrap();
+            let metadata = reader.metadata();
+
+            let mut row_offset = 0;
+            for row_group in metadata.row_groups() {
+                assert_eq!(row_group.num_columns(), 1);
+                let column = row_group.column(0);
+
+                let num_values = column.num_values() as usize;
+                let src_slice = &src[row_offset..row_offset + num_values];
+                row_offset += column.num_values() as usize;
+
+                let stats = column.statistics().unwrap();
+                assert!(stats.has_min_max_set());
+                if let Statistics::Int32(stats) = stats {
+                    assert_eq!(*stats.min() as u32, *src_slice.iter().min().unwrap());
+                    assert_eq!(*stats.max() as u32, *src_slice.iter().max().unwrap());
+                } else {
+                    panic!("Statistics::Int32 missing")
+                }
+            }
         }
     }
 
     #[test]
     fn u64_min_max() {
         // check values roundtrip through parquet
-        let values = Arc::new(UInt64Array::from_iter_values(vec![
+        let src = vec![
            u64::MIN,
            u64::MIN + 1,
            (i64::MAX as u64) - 1,
@@ -1799,23 +1856,33 @@ mod tests {
            i64::MAX as u64,
            (i64::MAX as u64) + 1,
            u64::MAX - 1,
            u64::MAX,
-        ]));
-        let file = one_column_roundtrip(values, false, None);
-
-        // check statistics are valid
-        let reader = SerializedFileReader::new(file).unwrap();
-        let metadata = reader.metadata();
-        assert_eq!(metadata.num_row_groups(), 1);
-        let row_group = metadata.row_group(0);
-        assert_eq!(row_group.num_columns(), 1);
-        let column = row_group.column(0);
-        let stats = column.statistics().unwrap();
-        assert!(stats.has_min_max_set());
-        if let Statistics::Int64(stats) = stats {
-            assert_eq!(*stats.min() as u64, u64::MIN);
-            assert_eq!(*stats.max() as u64, u64::MAX);
-        } else {
-            panic!("Statistics::Int64 missing")
+        ];
+        let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
+        let files = one_column_roundtrip(values, false);
+
+        for file in files {
+            // check statistics are valid
+            let reader = SerializedFileReader::new(file).unwrap();
+            let metadata = reader.metadata();
+
+            let mut row_offset = 0;
+            for row_group in metadata.row_groups() {
+                assert_eq!(row_group.num_columns(), 1);
+                let column = row_group.column(0);
+
+                let num_values = column.num_values() as usize;
+                let src_slice = &src[row_offset..row_offset + num_values];
+                row_offset += column.num_values() as usize;
+
+                let stats = column.statistics().unwrap();
+                assert!(stats.has_min_max_set());
+                if let Statistics::Int64(stats) = stats {
+                    assert_eq!(*stats.min() as u64, *src_slice.iter().min().unwrap());
+                    assert_eq!(*stats.max() as u64, *src_slice.iter().max().unwrap());
+                } else {
+                    panic!("Statistics::Int64 missing")
+                }
+            }
         }
     }
 
@@ -1823,17 +1890,19 @@ mod tests {
     fn statistics_null_counts_only_nulls() {
         // check that null-count statistics for "only NULL"-columns are correct
         let values = Arc::new(UInt64Array::from(vec![None, None]));
-        let file = one_column_roundtrip(values, true, None);
-
-        // check statistics are valid
-        let reader = SerializedFileReader::new(file).unwrap();
-        let metadata = reader.metadata();
-        assert_eq!(metadata.num_row_groups(), 1);
-        let row_group = metadata.row_group(0);
-        assert_eq!(row_group.num_columns(), 1);
-        let column = row_group.column(0);
-        let stats = column.statistics().unwrap();
-        assert_eq!(stats.null_count(), 2);
+        let files = one_column_roundtrip(values, true);
+
+        for file in files {
+            // check statistics are valid
+            let reader = SerializedFileReader::new(file).unwrap();
+            let metadata = reader.metadata();
+            assert_eq!(metadata.num_row_groups(), 1);
+            let row_group = metadata.row_group(0);
+            assert_eq!(row_group.num_columns(), 1);
+            let column = row_group.column(0);
+            let stats = column.statistics().unwrap();
+            assert_eq!(stats.null_count(), 2);
+        }
     }
 
     #[test]
@@ -1923,7 +1992,7 @@ mod tests {
 
         let array = Arc::new(list_builder.finish());
 
-        one_column_roundtrip(array, true, Some(10));
+        one_column_roundtrip(array, true);
     }
 
     fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {
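
The new helpers all reduce to one pattern: build a WriterProperties permutation, write the batch through ArrowWriter, and inspect the result through SerializedFileReader. The standalone sketch below reproduces that pattern outside the test module, assuming the same-era parquet, arrow and tempfile crates (the ArrowWriter, WriterProperties and SerializedFileReader calls are the ones this patch exercises); the main wrapper, the example values, and the row-group assertions are illustrative additions, not code from the commit.

use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() {
    // One non-nullable Int32 column of 7 values, mirroring SMALL_SIZE above.
    let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int32, false)]));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7]));
    let batch = RecordBatch::try_new(schema, vec![values]).unwrap();

    // Write the same batch under both writer versions, as `roundtrip` now does.
    for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
        let props = WriterProperties::builder()
            .set_writer_version(version)
            .set_max_row_group_size(3) // split the 7 rows across row groups
            .build();

        let file = tempfile::tempfile().unwrap();
        let mut writer =
            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props))
                .expect("Unable to write file");
        writer.write(&batch).unwrap();
        writer.close().unwrap();

        // Read the footer back and check the row-group layout, in the spirit
        // of the `row_group_sizes` helper referenced at the end of the diff.
        let reader = SerializedFileReader::new(file).unwrap();
        let sizes: Vec<i64> = reader
            .metadata()
            .row_groups()
            .iter()
            .map(|rg| rg.num_rows())
            .collect();
        assert_eq!(sizes.iter().sum::<i64>(), 7);
        assert!(sizes.iter().all(|&n| n <= 3));
    }
}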
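
A subtler part of the rewritten u32_min_max/u64_min_max tests is that statistics are now validated per row group rather than once per file: with several row groups, each column chunk's min/max describes only its own slice of the input, located by a running offset advanced by column.num_values(). The dependency-free sketch below isolates that bookkeeping; expected_min_max is a hypothetical helper written for illustration, not a function from the patch.

/// For each row group's value count, return the (min, max) that the column
/// chunk's statistics should report for its slice of the source values.
fn expected_min_max(src: &[u32], group_value_counts: &[usize]) -> Vec<(u32, u32)> {
    let mut row_offset = 0;
    group_value_counts
        .iter()
        .map(|&num_values| {
            // Same arithmetic as the tests: take `num_values` entries
            // starting at the running offset, then advance the offset.
            let slice = &src[row_offset..row_offset + num_values];
            row_offset += num_values;
            (*slice.iter().min().unwrap(), *slice.iter().max().unwrap())
        })
        .collect()
}

fn main() {
    let src = [7u32, 1, 5, 3, 9, 2, 8];
    // A max row group size of 3 splits 7 values into groups of 3, 3 and 1.
    assert_eq!(
        expected_min_max(&src, &[3, 3, 1]),
        vec![(1, 7), (2, 9), (8, 8)]
    );
}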