From fe3e5372d072f7e2e4101bb79346fa6170d2a76e Mon Sep 17 00:00:00 2001
From: Raphael Taylor-Davies <r.taylordavies@googlemail.com>
Date: Thu, 4 Aug 2022 15:17:42 +0100
Subject: [PATCH] Don't hydrate string dictionaries when writing to parquet (#1764)
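
Previously the Arrow writer cast dictionary arrays to their value type
("hydrating" them) before writing, materialising every value on each
write. String and binary dictionaries are now routed straight to
ByteArrayWriter, which downcasts them with TypedDictionaryArray and
writes from the dictionary representation; dictionaries with other value
types still go through write_leaf. For example, a column of type

    DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8))

no longer round-trips through arrow::compute::cast on write.

TypedDictionaryArray gains manual Copy/Clone impls (avoiding a V: Clone
bound), the random data generator learns to produce dictionary arrays,
and a string dictionary case is added to the arrow_writer benchmarks.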
---
 arrow/src/array/array_dictionary.rs          |  13 ++-
 arrow/src/util/data_gen.rs                   |  11 ++
 parquet/benches/arrow_writer.rs              |  31 ++++++
 parquet/src/arrow/arrow_writer/byte_array.rs |  92 +++++++++++-----
 parquet/src/arrow/arrow_writer/mod.rs        | 107 ++++++-------
 5 files changed, 147 insertions(+), 107 deletions(-)

diff --git a/arrow/src/array/array_dictionary.rs b/arrow/src/array/array_dictionary.rs
index 2afc7a69e0b..2acb51750d1 100644
--- a/arrow/src/array/array_dictionary.rs
+++ b/arrow/src/array/array_dictionary.rs
@@ -421,7 +421,6 @@ impl<T: ArrowPrimitiveType> fmt::Debug for DictionaryArray<T> {
 /// assert_eq!(maybe_val.unwrap(), orig)
 /// }
 /// ```
-#[derive(Copy, Clone)]
 pub struct TypedDictionaryArray<'a, K: ArrowPrimitiveType, V> {
     /// The dictionary array
     dictionary: &'a DictionaryArray<K>,
@@ -429,6 +428,18 @@ pub struct TypedDictionaryArray<'a, K: ArrowPrimitiveType, V> {
     values: &'a V,
 }
 
+// Manually implement `Clone` to avoid `V: Clone` type constraint
+impl<'a, K: ArrowPrimitiveType, V> Clone for TypedDictionaryArray<'a, K, V> {
+    fn clone(&self) -> Self {
+        Self {
+            dictionary: self.dictionary,
+            values: self.values,
+        }
+    }
+}
+
+impl<'a, K: ArrowPrimitiveType, V> Copy for TypedDictionaryArray<'a, K, V> {}
+
 impl<'a, K: ArrowPrimitiveType, V> fmt::Debug for TypedDictionaryArray<'a, K, V> {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
         writeln!(f, "TypedDictionaryArray({:?})", self.dictionary)
diff --git a/arrow/src/util/data_gen.rs b/arrow/src/util/data_gen.rs
index 21b8ee8c9fd..4d974409a0e 100644
--- a/arrow/src/util/data_gen.rs
+++ b/arrow/src/util/data_gen.rs
@@ -143,6 +143,17 @@ pub fn create_random_array(
             })
             .collect::<Result<Vec<(&str, ArrayRef)>>>()?,
         )?),
+        d @ Dictionary(_, value_type)
+            if crate::compute::can_cast_types(value_type, d) =>
+        {
+            let f = Field::new(
+                field.name(),
+                value_type.as_ref().clone(),
+                field.is_nullable(),
+            );
+            let v = create_random_array(&f, size, null_density, true_density)?;
+            crate::compute::cast(&v, d)?
+        }
         other => {
             return Err(ArrowError::NotYetImplemented(format!(
                 "Generating random arrays not yet implemented for {:?}",
diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs
index 25ff1ca90dc..ddca1e53c6d 100644
--- a/parquet/benches/arrow_writer.rs
+++ b/parquet/benches/arrow_writer.rs
@@ -92,6 +92,25 @@ fn create_string_bench_batch(
     )?)
 }
 
+fn create_string_dictionary_bench_batch(
+    size: usize,
+    null_density: f32,
+    true_density: f32,
+) -> Result<RecordBatch> {
+    let fields = vec![Field::new(
+        "_1",
+        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+        true,
+    )];
+    let schema = Schema::new(fields);
+    Ok(create_random_batch(
+        Arc::new(schema),
+        size,
+        null_density,
+        true_density,
+    )?)
+}
+
 fn create_string_bench_batch_non_null(
     size: usize,
     null_density: f32,
     true_density: f32,
@@ -346,6 +365,18 @@ fn bench_primitive_writer(c: &mut Criterion) {
         b.iter(|| write_batch(&batch).unwrap())
     });
 
+    let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
+    group.throughput(Throughput::Bytes(
+        batch
+            .columns()
+            .iter()
+            .map(|f| f.get_array_memory_size() as u64)
+            .sum(),
+    ));
+    group.bench_function("4096 values string dictionary", |b| {
+        b.iter(|| write_batch(&batch).unwrap())
+    });
+
     let batch = create_string_bench_batch_non_null(4096, 0.25, 0.75).unwrap();
     group.throughput(Throughput::Bytes(
         batch
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index d1a0da5b391..a7b6ccc3fc8 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::arrow::arrow_writer::levels::LevelInfo;
-use crate::arrow::arrow_writer::ArrayWriter;
 use crate::basic::Encoding;
 use crate::column::page::PageWriter;
 use crate::column::writer::encoder::{
@@ -33,11 +32,38 @@ use crate::schema::types::ColumnDescPtr;
 use crate::util::bit_util::num_required_bits;
 use crate::util::interner::{Interner, Storage};
 use arrow::array::{
-    Array, ArrayAccessor, ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray,
-    StringArray,
+    Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray,
+    LargeStringArray, StringArray,
 };
 use arrow::datatypes::DataType;
 
+macro_rules! downcast_dict_impl {
+    ($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
+        $op($array
+            .as_any()
+            .downcast_ref::<DictionaryArray<$key>>()
+            .unwrap()
+            .downcast_dict::<$val>()
+            .unwrap()$(, $arg)*)
+    }};
+}
+
+macro_rules! downcast_dict_op {
+    ($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
+        match $key_type.as_ref() {
+            DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
+            DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
+            DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
+            DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
+            DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
+            DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
+            DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
+            DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
+            _ => unreachable!(),
+        }
+    };
+}
+
 macro_rules! downcast_op {
     ($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
         match $data_type {
@@ -51,36 +77,44 @@ macro_rules! downcast_op {
             DataType::LargeBinary => {
                 $op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
             }
-            d => unreachable!("cannot downcast {} to byte array", d)
+            DataType::Dictionary(key, value) => match value.as_ref() {
+                DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
+                DataType::LargeUtf8 => {
+                    downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
+                }
+                DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
+                DataType::LargeBinary => {
+                    downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
+                }
+                d => unreachable!("cannot downcast {} dictionary value to byte array", d),
+            },
+            d => unreachable!("cannot downcast {} to byte array", d),
         }
     };
 }
 
-/// Returns an [`ArrayWriter`] for byte or string arrays
-pub(super) fn make_byte_array_writer<'a>(
-    descr: ColumnDescPtr,
-    data_type: DataType,
-    props: WriterPropertiesPtr,
-    page_writer: Box<dyn PageWriter + 'a>,
-    on_close: OnCloseColumnChunk<'a>,
-) -> Box<dyn ArrayWriter + 'a> {
-    Box::new(ByteArrayWriter {
-        writer: Some(GenericColumnWriter::new(descr, props, page_writer)),
-        on_close: Some(on_close),
-        data_type,
-    })
-}
-
-/// An [`ArrayWriter`] for [`ByteArray`]
-struct ByteArrayWriter<'a> {
-    writer: Option<GenericColumnWriter<'a, ByteArrayEncoder>>,
+/// A writer for byte array types
+pub(super) struct ByteArrayWriter<'a> {
+    writer: GenericColumnWriter<'a, ByteArrayEncoder>,
     on_close: Option<OnCloseColumnChunk<'a>>,
-    data_type: DataType,
 }
 
-impl<'a> ArrayWriter for ByteArrayWriter<'a> {
-    fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
-        self.writer.as_mut().unwrap().write_batch_internal(
+impl<'a> ByteArrayWriter<'a> {
+    /// Returns a new [`ByteArrayWriter`]
+    pub fn new(
+        descr: ColumnDescPtr,
+        props: &'a WriterPropertiesPtr,
+        page_writer: Box<dyn PageWriter + 'a>,
+        on_close: OnCloseColumnChunk<'a>,
+    ) -> Result<Self> {
+        Ok(Self {
+            writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
+            on_close: Some(on_close),
+        })
+    }
+
+    pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
+        self.writer.write_batch_internal(
             array,
             Some(levels.non_null_indices()),
             levels.def_levels(),
@@ -92,11 +126,11 @@ impl<'a> ArrayWriter for ByteArrayWriter<'a> {
         Ok(())
     }
 
-    fn close(&mut self) -> Result<()> {
+    pub fn close(self) -> Result<()> {
         let (bytes_written, rows_written, metadata, column_index, offset_index) =
-            self.writer.take().unwrap().close()?;
+            self.writer.close()?;
 
-        if let Some(on_close) = self.on_close.take() {
+        if let Some(on_close) = self.on_close {
             on_close(
                 bytes_written,
                 rows_written,
diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 49531d9724a..800aff98a6f 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -33,70 +33,18 @@ use super::schema::{
     decimal_length_from_precision,
 };
 
-use crate::column::writer::{get_column_writer, ColumnWriter, ColumnWriterImpl};
+use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
+use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::RowGroupMetaDataPtr;
 use crate::file::properties::WriterProperties;
-use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
+use crate::file::writer::SerializedRowGroupWriter;
 use crate::{data_type::*, file::writer::SerializedFileWriter};
 use levels::{calculate_array_levels, LevelInfo};
 
 mod byte_array;
 mod levels;
 
-/// An object-safe API for writing an [`ArrayRef`]
-trait ArrayWriter {
-    fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()>;
-
-    fn close(&mut self) -> Result<()>;
-}
-
-/// Fallback implementation for writing an [`ArrayRef`] that uses [`SerializedColumnWriter`]
-struct ColumnArrayWriter<'a>(Option<SerializedColumnWriter<'a>>);
-
-impl<'a> ArrayWriter for ColumnArrayWriter<'a> {
-    fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
-        write_leaf(self.0.as_mut().unwrap().untyped(), array, levels)?;
-        Ok(())
-    }
-
-    fn close(&mut self) -> Result<()> {
-        self.0.take().unwrap().close()
-    }
-}
-
-fn get_writer<'a, W: Write>(
-    row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
-    data_type: &ArrowDataType,
-) -> Result<Box<dyn ArrayWriter + 'a>> {
-    let array_writer = row_group_writer
-        .next_column_with_factory(
-            |descr, props, page_writer, on_close| match data_type {
-                ArrowDataType::Utf8
-                | ArrowDataType::LargeUtf8
-                | ArrowDataType::Binary
-                | ArrowDataType::LargeBinary => Ok(byte_array::make_byte_array_writer(
-                    descr,
-                    data_type.clone(),
-                    props.clone(),
-                    page_writer,
-                    on_close,
-                )),
-                _ => {
-                    let column_writer =
-                        get_column_writer(descr, props.clone(), page_writer);
-
-                    let serialized_writer =
-                        SerializedColumnWriter::new(column_writer, Some(on_close));
-
-                    Ok(Box::new(ColumnArrayWriter(Some(serialized_writer))))
-                }
-            },
-        )?
-        .expect("Unable to get column writer");
-    Ok(array_writer)
-}
-
 /// Arrow writer
 ///
 /// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order
@@ -314,22 +262,24 @@ fn write_leaves<W: Write>(
         | ArrowDataType::Time64(_)
         | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
-        | ArrowDataType::LargeBinary
-        | ArrowDataType::Binary
-        | ArrowDataType::Utf8
-        | ArrowDataType::LargeUtf8
         | ArrowDataType::Decimal128(_, _)
         | ArrowDataType::Decimal256(_, _)
         | ArrowDataType::FixedSizeBinary(_) => {
-            let mut writer = get_writer(row_group_writer, &data_type)?;
+            let mut col_writer = row_group_writer.next_column()?.unwrap();
             for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                writer.write(
-                    array,
-                    levels.pop().expect("Levels exhausted"),
-                )?;
+                write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
             }
-            writer.close()?;
-            Ok(())
+            col_writer.close()
+        }
+        ArrowDataType::LargeBinary
+        | ArrowDataType::Binary
+        | ArrowDataType::Utf8
+        | ArrowDataType::LargeUtf8 => {
+            let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
+            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+                col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
+            }
+            col_writer.close()
         }
         ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
             let arrays: Vec<_> = arrays.iter().map(|array|{
@@ -380,18 +330,21 @@ fn write_leaves<W: Write>(
             write_leaves(row_group_writer, &values, levels)?;
             Ok(())
         }
-        ArrowDataType::Dictionary(_, value_type) => {
-            let mut writer = get_writer(row_group_writer, value_type)?;
-            for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
-                // cast dictionary to a primitive
-                let array = arrow::compute::cast(array, value_type)?;
-                writer.write(
-                    &array,
-                    levels.pop().expect("Levels exhausted"),
-                )?;
+        ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
+            ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
+                let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
+                for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+                    col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
+                }
+                col_writer.close()
+            }
+            _ => {
+                let mut col_writer = row_group_writer.next_column()?.unwrap();
+                for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
+                    write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
+                }
+                col_writer.close()
+            }
-            writer.close()?;
-            Ok(())
         }
         ArrowDataType::Float16 => Err(ParquetError::ArrowError(
             "Float16 arrays not supported".to_string(),
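
Usage sketch (illustrative, not part of the commit): a minimal program
exercising the write path this patch changes, assuming the arrow and
parquet crates of this era; the column name and in-memory buffer are
arbitrary choices for the example.

    use std::sync::Arc;

    use arrow::array::{ArrayRef, DictionaryArray};
    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
    use arrow::record_batch::RecordBatch;
    use parquet::arrow::arrow_writer::ArrowWriter;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        // A string dictionary column: Int32 keys into a Utf8 value set.
        let array: DictionaryArray<Int32Type> =
            vec!["a", "b", "a", "c", "a"].into_iter().collect();
        let array = Arc::new(array) as ArrayRef;

        let schema = Arc::new(Schema::new(vec![Field::new(
            "strings",
            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
            false,
        )]));
        let batch = RecordBatch::try_new(schema.clone(), vec![array])?;

        // With this patch the writer consumes the dictionary directly,
        // rather than first casting the column to a plain StringArray.
        let mut buffer = Vec::new();
        let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)?;
        writer.write(&batch)?;
        writer.close()?;
        Ok(())
    }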