Push gather down to Parquet Encoder #2109

Closed · wants to merge 3 commits
110 changes: 38 additions & 72 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -33,7 +33,7 @@ use super::schema::{
     decimal_length_from_precision,
 };
 
-use crate::column::writer::{get_column_writer, ColumnWriter};
+use crate::column::writer::{get_column_writer, ColumnWriter, ColumnWriterImpl};
 use crate::errors::{ParquetError, Result};
 use crate::file::metadata::RowGroupMetaDataPtr;
 use crate::file::properties::WriterProperties;
@@ -399,48 +399,35 @@ fn write_leaf(
     let indices = levels.non_null_indices();
     let written = match writer {
         ColumnWriter::Int32ColumnWriter(ref mut typed) => {
-            let values = match column.data_type() {
+            match column.data_type() {
                 ArrowDataType::Date64 => {
                     // If the column is a Date64, we cast it to a Date32, and then interpret that as Int32
-                    let array = if let ArrowDataType::Date64 = column.data_type() {
-                        let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
-                        arrow::compute::cast(&array, &ArrowDataType::Int32)?
-                    } else {
-                        arrow::compute::cast(column, &ArrowDataType::Int32)?
-                    };
+                    let array = arrow::compute::cast(column, &ArrowDataType::Date32)?;
+                    let array = arrow::compute::cast(&array, &ArrowDataType::Int32)?;
+
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int32Array>()
                         .expect("Unable to get int32 array");
-                    get_numeric_array_slice::<Int32Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
                 ArrowDataType::UInt32 => {
+                    let data = column.data();
+                    let offset = data.offset();
                     // follow C++ implementation and use overflow/reinterpret cast from u32 to i32 which will map
                     // `(i32::MAX as u32)..u32::MAX` to `i32::MIN..0`
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<arrow_array::UInt32Array>()
-                        .expect("Unable to get u32 array");
-                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int32Type>(
-                        array,
-                        |x| x as i32,
-                    );
-                    get_numeric_array_slice::<Int32Type, _>(&array, indices)
+                    let array: &[i32] = data.buffers()[0].typed_data();
+                    write_primitive(typed, &array[offset..offset + data.len()], levels)?
                 }
                 _ => {
                     let array = arrow::compute::cast(column, &ArrowDataType::Int32)?;
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int32Array>()
                         .expect("Unable to get i32 array");
-                    get_numeric_array_slice::<Int32Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
-            };
-            typed.write_batch(
-                values.as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            }
         }
         ColumnWriter::BoolColumnWriter(ref mut typed) => {
             let array = column

Contributor Author (on the removed `if let ArrowDataType::Date64` block): This `if` statement was somewhat redundant; I suspect it dates from a refactor at some point.
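As context for the comment in the `UInt32` arm above, the overflow/reinterpret cast is easy to verify in isolation. A minimal standalone sketch, not part of the diff:

```rust
// Reinterpreting u32 as i32 preserves the bit pattern: values in
// `(i32::MAX as u32)..=u32::MAX` wrap into `i32::MIN..0`, and the cast
// round-trips losslessly, which is why the writer can store u32 as i32.
fn main() {
    assert_eq!(u32::MAX as i32, -1);
    assert_eq!((i32::MAX as u32 + 1) as i32, i32::MIN);
    let x: u32 = 0xDEAD_BEEF;
    assert_eq!(x as i32 as u32, x); // the reader reverses the cast
}
```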
@@ -454,41 +441,31 @@ fn write_leaf(
                 )?
         }
         ColumnWriter::Int64ColumnWriter(ref mut typed) => {
-            let values = match column.data_type() {
+            match column.data_type() {
                 ArrowDataType::Int64 => {
                     let array = column
                         .as_any()
                         .downcast_ref::<arrow_array::Int64Array>()
                         .expect("Unable to get i64 array");
-                    get_numeric_array_slice::<Int64Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
                 ArrowDataType::UInt64 => {
                     // follow C++ implementation and use overflow/reinterpret cast from u64 to i64 which will map
                     // `(i64::MAX as u64)..u64::MAX` to `i64::MIN..0`
-                    let array = column
-                        .as_any()
-                        .downcast_ref::<arrow_array::UInt64Array>()
-                        .expect("Unable to get u64 array");
-                    let array = arrow::compute::unary::<_, _, arrow::datatypes::Int64Type>(
-                        array,
-                        |x| x as i64,
-                    );
-                    get_numeric_array_slice::<Int64Type, _>(&array, indices)
+                    let data = column.data();
+                    let offset = data.offset();
+                    let array: &[i64] = data.buffers()[0].typed_data();
+                    write_primitive(typed, &array[offset..offset + data.len()], levels)?
                 }
                 _ => {
                     let array = arrow::compute::cast(column, &ArrowDataType::Int64)?;
                     let array = array
                         .as_any()
                         .downcast_ref::<arrow_array::Int64Array>()
                         .expect("Unable to get i64 array");
-                    get_numeric_array_slice::<Int64Type, _>(array, indices)
+                    write_primitive(typed, array.values(), levels)?
                 }
-            };
-            typed.write_batch(
-                values.as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            }
         }
         ColumnWriter::Int96ColumnWriter(ref mut _typed) => {
             unreachable!("Currently unreachable because data type not supported")
@@ -498,22 +475,14 @@ fn write_leaf(
                 .as_any()
                 .downcast_ref::<arrow_array::Float32Array>()
                 .expect("Unable to get Float32 array");
-            typed.write_batch(
-                get_numeric_array_slice::<FloatType, _>(array, indices).as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            write_primitive(typed, array.values(), levels)?
         }
         ColumnWriter::DoubleColumnWriter(ref mut typed) => {
             let array = column
                 .as_any()
                 .downcast_ref::<arrow_array::Float64Array>()
                 .expect("Unable to get Float64 array");
-            typed.write_batch(
-                get_numeric_array_slice::<DoubleType, _>(array, indices).as_slice(),
-                levels.def_levels(),
-                levels.rep_levels(),
-            )?
+            write_primitive(typed, array.values(), levels)?
         }
         ColumnWriter::ByteArrayColumnWriter(ref mut typed) => match column.data_type() {
             ArrowDataType::Binary => {
@@ -645,31 +614,28 @@ macro_rules! def_get_binary_array_fn {
     };
 }
 
+fn write_primitive<'a, T: DataType>(
+    writer: &mut ColumnWriterImpl<'a, T>,
+    values: &[T::T],
+    levels: LevelInfo,
+) -> Result<usize> {
+    writer.write_batch_internal(
+        values,
+        Some(levels.non_null_indices()),
+        levels.def_levels(),
+        levels.rep_levels(),
+        None,
+        None,
+        None,
+    )
+}
+
+// TODO: These methods don't handle non null indices correctly (#1753)
 def_get_binary_array_fn!(get_binary_array, arrow_array::BinaryArray);
 def_get_binary_array_fn!(get_string_array, arrow_array::StringArray);
 def_get_binary_array_fn!(get_large_binary_array, arrow_array::LargeBinaryArray);
 def_get_binary_array_fn!(get_large_string_array, arrow_array::LargeStringArray);
 
-/// Get the underlying numeric array slice, skipping any null values.
-/// If there are no null values, it might be quicker to get the slice directly instead of
-/// calling this function.
-fn get_numeric_array_slice<T, A>(
-    array: &arrow_array::PrimitiveArray<A>,
-    indices: &[usize],
-) -> Vec<T::T>
-where
-    T: DataType,
-    A: arrow::datatypes::ArrowNumericType,
-    T::T: From<A::Native>,
-{
-    let mut values = Vec::with_capacity(indices.len());
-    for i in indices {
-        values.push(array.value(*i).into())
-    }
-    values
-}
-
 fn get_bool_array_slice(
     array: &arrow_array::BooleanArray,
     indices: &[usize],
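The net effect of this file's changes: `write_leaf` no longer materializes a gathered `Vec` of non-null values before writing; it hands the full values slice plus the non-null indices to the column writer, which gathers during encoding. This is the pushdown the PR title refers to. A simplified sketch of the two data flows, with toy function names rather than the crate's API:

```rust
// Before: gather up front, allocating an intermediate Vec per leaf column.
fn write_old(values: &[i32], non_null_indices: &[usize]) -> Vec<i32> {
    non_null_indices.iter().map(|&i| values[i]).collect()
}

// After: pass values + indices through, letting the encoder pull each value
// directly from the source buffer with no intermediate allocation.
fn write_new(values: &[i32], non_null_indices: &[usize], mut encode: impl FnMut(i32)) {
    for &i in non_null_indices {
        encode(values[i]);
    }
}

fn main() {
    let values = [10, 20, 30, 40];
    let idx = [0usize, 2];
    assert_eq!(write_old(&values, &idx), vec![10, 30]);
    let mut out = Vec::new();
    write_new(&values, &idx, |v| out.push(v));
    assert_eq!(out, vec![10, 30]);
}
```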
80 changes: 57 additions & 23 deletions parquet/src/column/writer/encoder.rs
@@ -39,7 +39,11 @@ pub trait ColumnValues {
     /// Returns the min and max values in this collection, skipping any NaN values
     ///
     /// Returns `None` if no values found
-    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&Self::T, &Self::T)>;
+    fn min_max<'a>(
+        &'a self,
+        descr: &ColumnDescriptor,
+        value_indices: Option<&[usize]>,
+    ) -> Option<(&'a Self::T, &'a Self::T)>;
 }
 
 /// The encoded data for a dictionary page
@@ -77,6 +81,9 @@ pub trait ColumnValueEncoder {
     /// Write the corresponding values to this [`ColumnValueEncoder`]
     fn write(&mut self, values: &Self::Values, offset: usize, len: usize) -> Result<()>;
 
+    /// Write the values at the given `indices` to this [`ColumnValueEncoder`]
+    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>;
+
     /// Returns the number of buffered values
     fn num_values(&self) -> usize;
 
Contributor Author (on `write_gather`): I'm not totally sold on this name, suggestions welcome.
@@ -148,7 +155,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             )
         })?;
 
-        if let Some((min, max)) = slice.min_max(&self.descr) {
+        if let Some((min, max)) = slice.min_max(&self.descr, None) {
             update_min(&self.descr, min, &mut self.min_value);
             update_max(&self.descr, max, &mut self.max_value);
         }
@@ -159,6 +166,20 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
         }
     }
 
+    fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()> {
+        self.num_values += indices.len();
+
+        if let Some((min, max)) = values.min_max(&self.descr, Some(indices)) {
+            update_min(&self.descr, min, &mut self.min_value);
+            update_max(&self.descr, max, &mut self.max_value);
+        }
+
+        match &mut self.dict_encoder {
+            Some(encoder) => encoder.put_gather(values, indices),
+            _ => self.encoder.put_gather(values, indices),
+        }
+    }
+
     fn num_values(&self) -> usize {
         self.num_values
     }
@@ -222,29 +243,42 @@ impl<T: ParquetValueType> ColumnValues for [T] {
         self.len()
     }
 
-    fn min_max(&self, descr: &ColumnDescriptor) -> Option<(&T, &T)> {
-        let mut iter = self.iter();
+    fn min_max<'a>(
+        &'a self,
+        descr: &ColumnDescriptor,
+        value_indices: Option<&[usize]>,
+    ) -> Option<(&'a T, &'a T)> {
+        match value_indices {
+            Some(indices) => get_min_max(descr, indices.iter().map(|x| &self[*x])),
+            None => get_min_max(descr, self.iter()),
+        }
+    }
+}
 
-        let first = loop {
-            let next = iter.next()?;
-            if !is_nan(next) {
-                break next;
-            }
-        };
+fn get_min_max<'a, T, I>(descr: &ColumnDescriptor, mut iter: I) -> Option<(&'a T, &'a T)>
+where
+    T: ParquetValueType,
+    I: Iterator<Item = &'a T>,
+{
+    let first = loop {
+        let next = iter.next()?;
+        if !is_nan(next) {
+            break next;
+        }
+    };
 
-        let mut min = first;
-        let mut max = first;
-        for val in iter {
-            if is_nan(val) {
-                continue;
-            }
-            if compare_greater(descr, min, val) {
-                min = val;
-            }
-            if compare_greater(descr, val, max) {
-                max = val;
-            }
+    let mut min = first;
+    let mut max = first;
+    for val in iter {
+        if is_nan(val) {
+            continue;
+        }
+        if compare_greater(descr, min, val) {
+            min = val;
+        }
+        if compare_greater(descr, val, max) {
+            max = val;
+        }
+    }
+    Some((min, max))
 }
-        Some((min, max))
-    }
 }
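The refactored statistics path computes min/max either over all values or only over the gathered indices, skipping NaNs in both cases. A self-contained sketch of the same logic over `f64`, using plain `<`/`>` in place of the crate's `compare_greater`:

```rust
// NaN-skipping min/max over an optional index gather, mirroring get_min_max.
fn min_max(values: &[f64], indices: Option<&[usize]>) -> Option<(f64, f64)> {
    let iter: Box<dyn Iterator<Item = f64> + '_> = match indices {
        Some(idx) => Box::new(idx.iter().map(|&i| values[i])),
        None => Box::new(values.iter().copied()),
    };
    let mut result: Option<(f64, f64)> = None;
    for v in iter {
        if v.is_nan() {
            continue; // NaN never participates in statistics
        }
        result = match result {
            None => Some((v, v)),
            Some((min, max)) => Some((min.min(v), max.max(v))),
        };
    }
    result
}

fn main() {
    let values = [f64::NAN, 3.0, 1.0, 7.0];
    assert_eq!(min_max(&values, None), Some((1.0, 7.0)));
    assert_eq!(min_max(&values, Some(&[0, 1])), Some((3.0, 3.0)));
}
```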
20 changes: 16 additions & 4 deletions parquet/src/column/writer/mod.rs
@@ -249,9 +249,10 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         }
     }
 
-    fn write_batch_internal(
+    pub(crate) fn write_batch_internal(
         &mut self,
         values: &E::Values,
+        value_indices: Option<&[usize]>,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
         min: Option<&E::T>,
@@ -290,7 +291,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 panic!("min/max should be both set or both None")
             }
             (None, None) => {
-                if let Some((min, max)) = values.min_max(&self.descr) {
+                if let Some((min, max)) = values.min_max(&self.descr, value_indices) {
                     update_min(&self.descr, min, &mut self.min_column_value);
                     update_max(&self.descr, max, &mut self.max_column_value);
                 }
@@ -311,6 +312,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             values_offset += self.write_mini_batch(
                 values,
                 values_offset,
+                value_indices,
                 write_batch_size,
                 def_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
                 rep_levels.map(|lv| &lv[levels_offset..levels_offset + write_batch_size]),
@@ -321,6 +323,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         values_offset += self.write_mini_batch(
             values,
             values_offset,
+            value_indices,
             num_levels - levels_offset,
             def_levels.map(|lv| &lv[levels_offset..]),
             rep_levels.map(|lv| &lv[levels_offset..]),
@@ -348,7 +351,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
     ) -> Result<usize> {
-        self.write_batch_internal(values, def_levels, rep_levels, None, None, None)
+        self.write_batch_internal(values, None, def_levels, rep_levels, None, None, None)
     }
 
     /// Writer may optionally provide pre-calculated statistics for use when computing
/// Writer may optionally provide pre-calculated statistics for use when computing
Expand All @@ -369,6 +372,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
) -> Result<usize> {
self.write_batch_internal(
values,
None,
def_levels,
rep_levels,
min,
@@ -427,6 +431,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         &mut self,
         values: &E::Values,
         values_offset: usize,
+        value_indices: Option<&[usize]>,
         num_levels: usize,
         def_levels: Option<&[i16]>,
         rep_levels: Option<&[i16]>,
@@ -489,7 +494,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             self.num_buffered_rows += num_levels as u32;
         }
 
-        self.encoder.write(values, values_offset, values_to_write)?;
+        match value_indices {
+            Some(indices) => {
+                let indices = &indices[values_offset..values_offset + values_to_write];
+                self.encoder.write_gather(values, indices)?;
+            }
+            None => self.encoder.write(values, values_offset, values_to_write)?,
+        }
+
         self.num_buffered_values += num_levels as u32;
 
         if self.should_add_data_page() {
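Note that for gathered writes, `values_offset` advances through the index list rather than the raw values, so each mini-batch hands the encoder the next chunk of indices. A toy model of that chunking, simplified to ignore the def/rep level bookkeeping that sizes the real batches:

```rust
// Toy model of how write_batch_internal chunks a gathered write: the offset
// walks the non-null indices, and each chunk goes to the encoder's gather path.
fn main() {
    let values = [1.0f64, 2.0, 3.0, 4.0, 5.0, 6.0];
    let non_null_indices = [0usize, 2, 3, 5]; // positions of non-null values
    let write_batch_size = 3; // hypothetical mini-batch size

    let mut offset = 0;
    while offset < non_null_indices.len() {
        let end = (offset + write_batch_size).min(non_null_indices.len());
        let chunk = &non_null_indices[offset..end];
        // stand-in for encoder.write_gather(values, chunk)
        let gathered: Vec<f64> = chunk.iter().map(|&i| values[i]).collect();
        println!("encode {:?}", gathered);
        offset = end;
    }
}
```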