diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs
index 8a79a116f54..1ebd7ee3e11 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -33,7 +33,7 @@ use super::schema::{
     decimal_length_from_precision,
 };
 
-use crate::column::writer::{get_column_writer, ColumnWriter};
+use crate::column::writer::{get_column_writer, ColumnWriter, ColumnWriterImpl};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::RowGroupMetaDataPtr;
 use crate::file::properties::WriterProperties;
@@ -399,33 +399,25 @@ fn write_leaf(
     let indices = levels.non_null_indices();
     let written = match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
-            let values = match column.data_type() {
+            match column.data_type() {
                 ArrowDataType::Date64 => {
                     // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
-                    let array = if let ArrowDataType::Date64 = column.data_type() {
-                        let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
-                        arrow::compute::cast(&array, &ArrowDataType::Int32)?
-                    } else {
-                        arrow::compute::cast(column, &ArrowDataType::Int32)?
-                    };
+                    let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+                    let array = arrow::compute::cast(&array, &ArrowDataType::Int32)?;
+
                     let array = array
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .expect("Unable to get int32 array");
-                    get_numeric_array_slice::<Int32Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
                 ArrowDataType::UInt32 => {
+                    let data = column.data();
+                    let offset = data.offset();
                     // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
                     // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<UInt32Array>()
-                        .expect("Unable to get u32 array");
-                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
-                        array,
-                        |x| x as i32,
-                    );
-                    get_numeric_array_slice::<Int32Type, _>(&array, indices)
+                    let array: &[i32] = data.buffers()[0].typed_data();
+                    write_primitive(typed, &array[offset..offset + data.len()], levels)?
                 }
                 _ => {
                     let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
@@ -433,14 +425,9 @@ fn write_leaf(
                         .as_any()
                         .downcast_ref::<Int32Array>()
                         .expect("Unable to get i32 array");
-                    get_numeric_array_slice::<Int32Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
-            };
-            typed.write_batch(
-                values.as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            }
         }
         ColumnWriter::BoolColumnWriter(ref mut typed) => {
             let array = column
@@ -454,26 +441,21 @@ fn write_leaf(
             )?
         }
         ColumnWriter::Int64ColumnWriter(ref mut typed) => {
-            let values = match column.data_type() {
+            match column.data_type() {
                 ArrowDataType::Int64 => {
                     let array = column
                         .as_any()
                         .downcast_ref::<Int64Array>()
                         .expect("Unable to get i64 array");
-                    get_numeric_array_slice::<Int64Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
                 ArrowDataType::UInt64 => {
                     // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
                     // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<UInt64Array>()
-                        .expect("Unable to get u64 array");
-                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
-                        array,
-                        |x| x as i64,
-                    );
-                    get_numeric_array_slice::<Int64Type, _>(&array, indices)
+                    let data = column.data();
+                    let offset = data.offset();
+                    let array: &[i64] = data.buffers()[0].typed_data();
+                    write_primitive(typed, &array[offset..offset + data.len()], levels)?
                 }
                 _ => {
                     let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
@@ -481,14 +463,9 @@ fn write_leaf(
                         .as_any()
                         .downcast_ref::<Int64Array>()
                         .expect("Unable to get i64 array");
-                    get_numeric_array_slice::<Int64Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
-            };
-            typed.write_batch(
-                values.as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            }
         }
         ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
             unreachable!("Currently unreachable because data type not supported")
@@ -498,22 +475,14 @@ fn write_leaf(
                 .as_any()
                 .downcast_ref::<Float32Array>()
                 .expect("Unable to get Float32 array");
-            typed.write_batch(
-                get_numeric_array_slice::<FloatType, _>(array, indices).as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            write_primitive(typed, array.values(), levels)?
         }
         ColumnWriter::DoubleColumnWriter(ref mut typed) => {
             let array = column
                 .as_any()
                 .downcast_ref::<Float64Array>()
                 .expect("Unable to get Float64 array");
-            typed.write_batch(
-                get_numeric_array_slice::<DoubleType, _>(array, indices).as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            write_primitive(typed, array.values(), levels)?
         }
         ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
             ArrowDataType::Binary => {
@@ -645,31 +614,28 @@ macro_rules! def_get_binary_array_fn {
     };
 }
 
+fn write_primitive<'a, T: DataType>(
+    writer: &mut ColumnWriterImpl<'a, T>,
+    values: &[T::T],
+    levels: LevelInfo,
+) -> Result<usize> {
+    writer.write_batch_internal(
+        values,
+        Some(levels.non_null_indices()),
+        levels.def_levels(),
+        levels.rep_levels(),
+        None,
+        None,
+        None,
+    )
+}
+
 // TODO: These methods don't handle non null indices correctly (#1753)
 def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
 def_get_binary_array_fn!(get_string_array, arrow_array::StringArray);
 def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
 def_get_binary_array_fn!(get_large_string_array, arrow_array::LargeStringArray);
 
-/// Get the underlying numeric array slice, skipping any null values.
-/// If there are no null values, it might be quicker to get the slice directly instead of
-/// calling this function.
-fn get_numeric_array_slice<T, A>(
-    array: &arrow_array::PrimitiveArray<A>,
-    indices: &[usize],
-) -> Vec<T::T>
-where
-    T: DataType,
-    A: arrow::datatypes::ArrowNumericType,
-    T::T: From<A::Native>,
-{
-    let mut values = Vec::with_capacity(indices.len());
-    for i in indices {
-        values.push(array.value(*i).into())
-    }
-    values
-}
-
 fn get_bool_array_slice(
     array: &arrow_array::BooleanArray,
     indices: &[usize],
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index bc31aedf4e5..965a9faa274 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -39,7 +39,11 @@ pub trait ColumnValues {
     /// Returns the min and max values in this collection, skipping any NaN values
     ///
    /// Returns `None` if no values found
-    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&Self::T, &Self::T)>;
+    fn min_max<'a>(
+        &'a self,
+        descr: &ColumnDescriptor,
+        value_indices: Option<&[usize]>,
+    ) -> Option<(&'a Self::T, &'a Self::T)>;
 }
 
 /// The encoded data for a dictionary page
@@ -77,6 +81,9 @@ pub trait ColumnValueEncoder {
     /// Write the corresponding values to this [`ColumnValueEncoder`]
     fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
 
+    /// Write the corresponding values to this [`ColumnValueEncoder`]
+    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;
+
     /// Returns the number of buffered values
     fn num_values(&self) -> usize;
 
@@ -148,7 +155,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             )
         })?;
 
-        if let Some((min, max)) = slice.min_max(&self.descr) {
+        if let Some((min, max)) = slice.min_max(&self.descr, None) {
             update_min(&self.descr, min, &mut self.min_value);
             update_max(&self.descr, max, &mut self.max_value);
         }
@@ -159,6 +166,20 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
         }
     }
 
+    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
+        self.num_values += indices.len();
+
+        if let Some((min, max)) = values.min_max(&self.descr, Some(indices)) {
+            update_min(&self.descr, min, &mut self.min_value);
+            update_max(&self.descr, max, &mut self.max_value);
+        }
+
+        match &mut self.dict_encoder {
+            Some(encoder) => encoder.put_gather(values, indices),
+            _ => self.encoder.put_gather(values, indices),
+        }
+    }
+
     fn num_values(&self) -> usize {
         self.num_values
     }
@@ -222,29 +243,42 @@ impl<T: ParquetValueType> ColumnValues for [T] {
         self.len()
     }
 
-    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&T, &T)> {
-        let mut iter = self.iter();
+    fn min_max<'a>(
+        &'a self,
+        descr: &ColumnDescriptor,
+        value_indices: Option<&[usize]>,
+    ) -> Option<(&'a T, &'a T)> {
+        match value_indices {
+            Some(indices) => get_min_max(descr, indices.iter().map(|x| &self[*x])),
+            None => get_min_max(descr, self.iter()),
+        }
+    }
+}
 
-        let first = loop {
-            let next = iter.next()?;
-            if !is_nan(next) {
-                break next;
-            }
-        };
+fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(&'a T, &'a T)>
+where
+    T: ParquetValueType,
+    I: Iterator<Item = &'a T>,
+{
+    let first = loop {
+        let next = iter.next()?;
+        if !is_nan(next) {
+            break next;
+        }
+    };
 
-        let mut min = first;
-        let mut max = first;
-        for val in iter {
-            if is_nan(val) {
-                continue;
-            }
-            if compare_greater(descr, min, val) {
-                min = val;
-            }
-            if compare_greater(descr, val, max) {
-                max = val;
-            }
+    let mut min = first;
+    let mut max = first;
+    for val in iter {
+        if is_nan(val) {
+            continue;
+        }
+        if compare_greater(descr, min, val) {
+            min = val;
+        }
+        if compare_greater(descr, val, max) {
+            max = val;
         }
-        Some((min, max))
     }
+    Some((min, max))
 }
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index ff6c098980a..6f487b6094a 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -249,9 +249,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         }
     }
 
-    fn write_batch_internal(
+    pub(crate) fn write_batch_internal(
         &mut self,
         values: &E::Values,
+        value_indices: Option<&[usize]>,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
         min: Option<&E::T>,
@@ -290,7 +291,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 panic!("min/max should be both set or both None")
             }
             (None, None) => {
-                if let Some((min, max)) = values.min_max(&self.descr) {
+                if let Some((min, max)) = values.min_max(&self.descr, value_indices) {
                     update_min(&self.descr, min, &mut self.min_column_value);
                     update_max(&self.descr, max, &mut self.max_column_value);
                 }
@@ -311,6 +312,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
            values_offset += self.write_mini_batch(
                 values,
                 values_offset,
+                value_indices,
                 write_batch_size,
                 def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
                 rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
@@ -321,6 +323,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         values_offset += self.write_mini_batch(
             values,
             values_offset,
+            value_indices,
             num_levels - levels_offset,
             def_levels.map(|lv| &lv[levels_offset..]),
             rep_levels.map(|lv| &lv[levels_offset..]),
@@ -348,7 +351,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
-        self.write_batch_internal(values, def_levels, rep_levels, None, None, None)
+        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
     }
 
     /// Writer may optionally provide pre-calculated statistics for use when computing
@@ -369,6 +372,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     ) -> Result<usize> {
         self.write_batch_internal(
             values,
+            None,
             def_levels,
             rep_levels,
             min,
@@ -427,6 +431,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         &mut self,
         values: &E::Values,
         values_offset: usize,
+        value_indices: Option<&[usize]>,
         num_levels: usize,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
@@ -489,7 +494,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             self.num_buffered_rows += num_levels as u32;
         }
 
-        self.encoder.write(values, values_offset, values_to_write)?;
+        match value_indices {
+            Some(indices) => {
+                let indices = &indices[values_offset..values_offset + values_to_write];
+                self.encoder.write_gather(values, indices)?;
+            }
+            None => self.encoder.write(values, values_offset, values_to_write)?,
+        }
+
         self.num_buffered_values += num_levels as u32;
 
         if self.should_add_data_page() {
diff --git a/parquet/src/encodings/encoding.rs b/parquet/src/encodings/encoding.rs
index 651635af59c..b982156442d 100644
--- a/parquet/src/encodings/encoding.rs
+++ b/parquet/src/encodings/encoding.rs
@@ -42,6 +42,9 @@ pub trait Encoder<T: DataType> {
     /// Encodes data from `values`.
     fn put(&mut self, values: &[T::T]) -> Result<()>;
 
+    /// Encodes data from `values` with the corresponding indexes
+    fn put_gather(&mut self, values: &[T::T], value_indices: &[usize]) -> Result<()>;
+
     /// Encodes data from `values`, which contains spaces for null values, that is
     /// identified by `valid_bits`.
     ///
@@ -152,6 +155,17 @@ impl<T: DataType> Encoder<T> for PlainEncoder<T> {
         T::T::encode(values, &mut self.buffer, &mut self.bit_writer)?;
         Ok(())
     }
+
+    fn put_gather(&mut self, values: &[T::T], value_indices: &[usize]) -> Result<()> {
+        for idx in value_indices {
+            T::T::encode(
+                std::slice::from_ref(&values[*idx]),
+                &mut self.buffer,
+                &mut self.bit_writer,
+            )?;
+        }
+        Ok(())
+    }
 }
 
 // ----------------------------------------------------------------------
@@ -352,6 +366,13 @@ impl<T: DataType> Encoder<T> for DictEncoder<T> {
         Ok(())
     }
 
+    fn put_gather(&mut self, values: &[T::T], value_indices: &[usize]) -> Result<()> {
+        for idx in value_indices {
+            self.put_one(&values[*idx])?
+        }
+        Ok(())
+    }
+
     // Performance Note:
     // As far as can be seen these functions are rarely called and as such we can hint to the
     // compiler that they dont need to be folded into hot locations in the final output.
@@ -415,6 +436,12 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
         Ok(())
     }
 
+    fn put_gather(&mut self, _values: &[T::T], _value_indices: &[usize]) -> Result<()> {
+        Err(nyi_err!(
+            "put_gather not yet implemented for RleValueEncoder"
+        ))
+    }
+
     // Performance Note:
     // As far as can be seen these functions are rarely called and as such we can hint to the
     // compiler that they dont need to be folded into hot locations in the final output.
@@ -587,7 +614,8 @@ impl<T: DataType> DeltaBitPackEncoder<T> {
         }
 
             // Compute bit width to store (max_delta - min_delta)
-            let bit_width = num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
+            let bit_width =
+                num_required_bits(self.subtract_u64(max_delta, min_delta)) as usize;
             self.bit_writer.write_at(offset + i, bit_width as u8);
 
             // Encode values in current mini block using min_delta and bit_width
@@ -647,6 +675,12 @@ impl<T: DataType> Encoder<T> for DeltaBitPackEncoder<T> {
         Ok(())
     }
 
+    fn put_gather(&mut self, _values: &[T::T], _value_indices: &[usize]) -> Result<()> {
+        Err(nyi_err!(
+            "put_gather not yet implemented for DeltaBitPackEncoder"
+        ))
+    }
+
     // Performance Note:
     // As far as can be seen these functions are rarely called and as such we can hint to the
     // compiler that they dont need to be folded into hot locations in the final output.
@@ -782,6 +816,12 @@ impl<T: DataType> Encoder<T> for DeltaLengthByteArrayEncoder<T> {
         Ok(())
     }
 
+    fn put_gather(&mut self, _values: &[T::T], _value_indices: &[usize]) -> Result<()> {
+        Err(nyi_err!(
+            "put_gather not yet implemented for DeltaLengthByteArrayEncoder"
+        ))
+    }
+
     // Performance Note:
     // As far as can be seen these functions are rarely called and as such we can hint to the
     // compiler that they dont need to be folded into hot locations in the final output.
@@ -874,6 +914,12 @@ impl<T: DataType> Encoder<T> for DeltaByteArrayEncoder<T> {
         Ok(())
     }
 
+    fn put_gather(&mut self, _values: &[T::T], _value_indices: &[usize]) -> Result<()> {
+        Err(nyi_err!(
+            "put_gather not yet implemented for DeltaByteArrayEncoder"
+        ))
+    }
+
     // Performance Note:
     // As far as can be seen these functions are rarely called and as such we can hint to the
     // compiler that they dont need to be folded into hot locations in the final output.