Improve speed of writing string dictionaries to parquet by skipping a copy (#1764) #2322

Merged 1 commit on Aug 5, 2022
13 changes: 12 additions & 1 deletion arrow/src/array/array_dictionary.rs
@@ -421,14 +421,25 @@ impl<T: ArrowPrimitiveType> fmt::Debug for DictionaryArray<T> {
/// assert_eq!(maybe_val.unwrap(), orig)
/// }
/// ```
#[derive(Copy, Clone)]
pub struct TypedDictionaryArray<'a, K: ArrowPrimitiveType, V> {
/// The dictionary array
dictionary: &'a DictionaryArray<K>,
/// The values of the dictionary
values: &'a V,
}

// Manually implement `Clone` to avoid `V: Clone` type constraint
Contributor:

It is strange that having a reference to `&V` would require `V: Clone` in order to `#[derive(Clone)]` 🤷

Contributor:

That RFC doesn't sound quite right (or at least it is overkill -- like a 🔨 for swatting a 🪰). All that is needed in this case is for the derive to recognize that the struct only uses `&V` rather than `V`, rather than a way to provide generic arguments to macros 😱

Also, I am firmly of the belief that adding more generics is not the answer to most of life's problems 🤣 Maybe because my feeble mind can't handle the extra level of indirection
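For readers following the derive discussion, here is a minimal standalone sketch (a hypothetical `Wrapper` type, not code from this PR) of why the manual impl below avoids the bound: `#[derive(Clone)]` adds a `Clone` bound to every type parameter, even when the struct only stores a reference.

```rust
// Illustrative only: a hypothetical `Wrapper` type, not code from this PR.
struct Wrapper<'a, V> {
    values: &'a V,
}

// What `#[derive(Clone)]` would roughly expand to -- note the `V: Clone` bound,
// added for every type parameter regardless of how the parameter is used:
//
// impl<'a, V: Clone> Clone for Wrapper<'a, V> {
//     fn clone(&self) -> Self {
//         Wrapper { values: self.values }
//     }
// }

// Manual impls: `&V` is always cheap to copy, so no `V: Clone` bound is needed.
impl<'a, V> Clone for Wrapper<'a, V> {
    fn clone(&self) -> Self {
        Wrapper { values: self.values }
    }
}

impl<'a, V> Copy for Wrapper<'a, V> {}

fn main() {
    struct NotClone; // deliberately does not implement Clone
    let inner = NotClone;
    let wrapper = Wrapper { values: &inner };
    let _copied = wrapper; // allowed even though `NotClone` is not `Clone`
    let _cloned = wrapper.clone();
}
```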

impl<'a, K: ArrowPrimitiveType, V> Clone for TypedDictionaryArray<'a, K, V> {
fn clone(&self) -> Self {
Self {
dictionary: self.dictionary,
values: self.values,
}
}
}

impl<'a, K: ArrowPrimitiveType, V> Copy for TypedDictionaryArray<'a, K, V> {}

impl<'a, K: ArrowPrimitiveType, V> fmt::Debug for TypedDictionaryArray<'a, K, V> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "TypedDictionaryArray({:?})", self.dictionary)
11 changes: 11 additions & 0 deletions arrow/src/util/data_gen.rs
@@ -143,6 +143,17 @@ pub fn create_random_array(
})
.collect::<Result<Vec<(&str, ArrayRef)>>>()?,
)?),
d @ Dictionary(_, value_type)
if crate::compute::can_cast_types(value_type, d) =>
Contributor:

using cast is a neat trick here 👍

{
let f = Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
);
let v = create_random_array(&f, size, null_density, true_density)?;
crate::compute::cast(&v, d)?
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Generating random arrays not yet implemented for {:?}",
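As a hedged aside (not part of the diff), the cast trick noted in the comment above can be shown with a tiny standalone program: generate a plain array of the dictionary's value type, check `can_cast_types`, and let arrow's cast kernel build the dictionary encoding.

```rust
// Standalone illustration of the data_gen.rs approach above; the literal values
// here stand in for the randomly generated array.
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::DataType;

fn main() -> arrow::error::Result<()> {
    let dict_type =
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Plain (non-dictionary) values.
    let values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));

    // The match guard in the diff performs this check before attempting the cast.
    assert!(can_cast_types(values.data_type(), &dict_type));

    // The cast kernel performs the dictionary encoding.
    let dict = cast(&values, &dict_type)?;
    assert_eq!(dict.data_type(), &dict_type);
    Ok(())
}
```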
31 changes: 31 additions & 0 deletions parquet/benches/arrow_writer.rs
@@ -92,6 +92,25 @@ fn create_string_bench_batch(
)?)
}

fn create_string_dictionary_bench_batch(
size: usize,
null_density: f32,
true_density: f32,
) -> Result<RecordBatch> {
let fields = vec![Field::new(
"_1",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
)];
let schema = Schema::new(fields);
Ok(create_random_batch(
Arc::new(schema),
size,
null_density,
true_density,
)?)
}

fn create_string_bench_batch_non_null(
size: usize,
null_density: f32,
@@ -346,6 +365,18 @@ fn bench_primitive_writer(c: &mut Criterion) {
b.iter(|| write_batch(&batch).unwrap())
});

let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
.columns()
.iter()
.map(|f| f.get_array_memory_size() as u64)
.sum(),
));
group.bench_function("4096 values string dictionary", |b| {
b.iter(|| write_batch(&batch).unwrap())
});

let batch = create_string_bench_batch_non_null(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
92 changes: 63 additions & 29 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -16,7 +16,6 @@
// under the License.

use crate::arrow::arrow_writer::levels::LevelInfo;
use crate::arrow::arrow_writer::ArrayWriter;
use crate::basic::Encoding;
use crate::column::page::PageWriter;
use crate::column::writer::encoder::{
@@ -33,11 +32,38 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow::array::{
Array, ArrayAccessor, ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray,
StringArray,
Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray,
LargeStringArray, StringArray,
};
use arrow::datatypes::DataType;

macro_rules! downcast_dict_impl {
($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
$op($array
.as_any()
.downcast_ref::<DictionaryArray<arrow::datatypes::$key>>()
Contributor:

When you say "could be made faster", the issue is that this is effectively creating something that will iterate over strings: to encode the column, the arrow array dictionary index is used to find a string, which is then used to find the parquet dictionary index, which is then written.

It could potentially be faster if we skipped the string step in the middle and simply computed an arrow dictionary index --> parquet dictionary index mapping up front, and applied that mapping during writing.

(I think you said this in the PR description, but I am restating it to confirm I understand what is happening.)
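A hedged sketch of that suggested follow-up, written against hypothetical stand-in types rather than the real parquet-rs encoder internals: intern each distinct arrow dictionary value once to build an arrow-index to parquet-index table, then translate the keys through that table so the per-row work is an integer lookup instead of a string lookup.

```rust
// Hypothetical types for illustration; null handling via definition levels is
// omitted (nulls are simply skipped here).
use std::collections::HashMap;

/// Stand-in for the parquet dictionary page being built.
#[derive(Default)]
struct ParquetDict {
    values: Vec<String>,
    lookup: HashMap<String, u32>,
}

impl ParquetDict {
    /// Returns the parquet dictionary id for `value`, adding it if new.
    fn intern(&mut self, value: &str) -> u32 {
        if let Some(&id) = self.lookup.get(value) {
            return id;
        }
        let id = self.values.len() as u32;
        self.values.push(value.to_string());
        self.lookup.insert(value.to_string(), id);
        id
    }
}

/// Encodes an arrow-style dictionary column (distinct `values` plus per-row `keys`)
/// by computing the arrow-index -> parquet-index mapping up front.
fn encode_dictionary(
    values: &[&str],
    keys: &[Option<usize>],
    dict: &mut ParquetDict,
) -> Vec<u32> {
    // One string intern per *distinct* value, not per row.
    let mapping: Vec<u32> = values.iter().map(|v| dict.intern(v)).collect();
    // Per-row work is then just an integer table lookup.
    keys.iter()
        .filter_map(|key| key.map(|k| mapping[k]))
        .collect()
}

fn main() {
    let values = ["foo", "bar"];
    let keys = [Some(0), Some(1), None, Some(0)];
    let mut dict = ParquetDict::default();
    let encoded = encode_dictionary(&values, &keys, &mut dict);
    assert_eq!(encoded, vec![0, 1, 0]);
}
```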

.unwrap()
.downcast_dict::<$val>()
.unwrap()$(, $arg)*)
}};
}

macro_rules! downcast_dict_op {
($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
match $key_type.as_ref() {
DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
_ => unreachable!(),
}
};
}

macro_rules! downcast_op {
($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
match $data_type {
@@ -51,36 +77,44 @@ macro_rules! downcast_op {
DataType::LargeBinary => {
$op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
}
d => unreachable!("cannot downcast {} to byte array", d)
DataType::Dictionary(key, value) => match value.as_ref() {
DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
DataType::LargeUtf8 => {
downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
}
DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
DataType::LargeBinary => {
downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
}
d => unreachable!("cannot downcast {} dictionary value to byte array", d),
},
d => unreachable!("cannot downcast {} to byte array", d),
}
};
}

/// Returns an [`ArrayWriter`] for byte or string arrays
pub(super) fn make_byte_array_writer<'a>(
descr: ColumnDescPtr,
data_type: DataType,
props: WriterPropertiesPtr,
page_writer: Box<dyn PageWriter + 'a>,
on_close: OnCloseColumnChunk<'a>,
) -> Box<dyn ArrayWriter + 'a> {
Box::new(ByteArrayWriter {
writer: Some(GenericColumnWriter::new(descr, props, page_writer)),
on_close: Some(on_close),
data_type,
})
}

/// An [`ArrayWriter`] for [`ByteArray`]
struct ByteArrayWriter<'a> {
writer: Option<GenericColumnWriter<'a, ByteArrayEncoder>>,
/// A writer for byte array types
pub(super) struct ByteArrayWriter<'a> {
writer: GenericColumnWriter<'a, ByteArrayEncoder>,
on_close: Option<OnCloseColumnChunk<'a>>,
data_type: DataType,
}

impl<'a> ArrayWriter for ByteArrayWriter<'a> {
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
self.writer.as_mut().unwrap().write_batch_internal(
impl<'a> ByteArrayWriter<'a> {
/// Returns a new [`ByteArrayWriter`]
pub fn new(
descr: ColumnDescPtr,
props: &'a WriterPropertiesPtr,
page_writer: Box<dyn PageWriter + 'a>,
on_close: OnCloseColumnChunk<'a>,
) -> Result<Self> {
Ok(Self {
writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
on_close: Some(on_close),
})
}

pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
self.writer.write_batch_internal(
array,
Some(levels.non_null_indices()),
levels.def_levels(),
@@ -92,11 +126,11 @@ impl<'a> ArrayWriter for ByteArrayWriter<'a> {
Ok(())
}

fn close(&mut self) -> Result<()> {
pub fn close(self) -> Result<()> {
let (bytes_written, rows_written, metadata, column_index, offset_index) =
self.writer.take().unwrap().close()?;
self.writer.close()?;

if let Some(on_close) = self.on_close.take() {
if let Some(on_close) = self.on_close {
on_close(
bytes_written,
rows_written,
107 changes: 30 additions & 77 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -33,70 +33,18 @@ use super::schema::{
decimal_length_from_precision,
};

use crate::column::writer::{get_column_writer, ColumnWriter, ColumnWriterImpl};
use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::file::writer::SerializedRowGroupWriter;
use crate::{data_type::*, file::writer::SerializedFileWriter};
use levels::{calculate_array_levels, LevelInfo};

mod byte_array;
mod levels;

/// An object-safe API for writing an [`ArrayRef`]
trait ArrayWriter {
Contributor Author:

I ended up implementing type erasure within the ByteArrayWriter, and so this indirection can be removed
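A rough illustration (hypothetical `ErasedWriter`, not code from this PR) of what "type erasure within the ByteArrayWriter" means here: the writer accepts the already-erased `ArrayRef` and dispatches on the runtime `DataType` internally, much like the `downcast_op!` macro above, so a separate object-safe trait and boxed writer are no longer needed.

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::datatypes::DataType;

/// Hypothetical writer that dispatches internally instead of behind `Box<dyn Trait>`.
struct ErasedWriter {
    rows_written: usize,
}

impl ErasedWriter {
    fn write(&mut self, array: &ArrayRef) {
        // Dispatch on the runtime type inside the writer.
        match array.data_type() {
            DataType::Utf8 => {
                let a = array.as_any().downcast_ref::<StringArray>().unwrap();
                self.rows_written += a.len();
            }
            DataType::Int32 => {
                let a = array.as_any().downcast_ref::<Int32Array>().unwrap();
                self.rows_written += a.len();
            }
            other => unimplemented!("unsupported type {}", other),
        }
    }
}

fn main() {
    let mut writer = ErasedWriter { rows_written: 0 };
    let strings: ArrayRef = Arc::new(StringArray::from(vec!["x", "y"]));
    let ints: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    writer.write(&strings);
    writer.write(&ints);
    assert_eq!(writer.rows_written, 5);
}
```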

fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()>;

fn close(&mut self) -> Result<()>;
}

/// Fallback implementation for writing an [`ArrayRef`] that uses [`SerializedColumnWriter`]
struct ColumnArrayWriter<'a>(Option<SerializedColumnWriter<'a>>);

impl<'a> ArrayWriter for ColumnArrayWriter<'a> {
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
write_leaf(self.0.as_mut().unwrap().untyped(), array, levels)?;
Ok(())
}

fn close(&mut self) -> Result<()> {
self.0.take().unwrap().close()
}
}

fn get_writer<'a, W: Write>(
row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
data_type: &ArrowDataType,
) -> Result<Box<dyn ArrayWriter + 'a>> {
let array_writer = row_group_writer
.next_column_with_factory(
|descr, props, page_writer, on_close| match data_type {
ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Binary
| ArrowDataType::LargeBinary => Ok(byte_array::make_byte_array_writer(
descr,
data_type.clone(),
props.clone(),
page_writer,
on_close,
)),
_ => {
let column_writer =
get_column_writer(descr, props.clone(), page_writer);

let serialized_writer =
SerializedColumnWriter::new(column_writer, Some(on_close));

Ok(Box::new(ColumnArrayWriter(Some(serialized_writer))))
}
},
)?
.expect("Unable to get column writer");
Ok(array_writer)
}

/// Arrow writer
///
/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order
@@ -314,22 +262,24 @@ fn write_leaves<W: Write>(
| ArrowDataType::Time64(_)
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::LargeBinary
| ArrowDataType::Binary
| ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Decimal128(_, _)
| ArrowDataType::Decimal256(_, _)
| ArrowDataType::FixedSizeBinary(_) => {
let mut writer = get_writer(row_group_writer, &data_type)?;
let mut col_writer = row_group_writer.next_column()?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
writer.write(
array,
levels.pop().expect("Levels exhausted"),
)?;
write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
}
writer.close()?;
Ok(())
col_writer.close()
}
ArrowDataType::LargeBinary
| ArrowDataType::Binary
| ArrowDataType::Utf8
| ArrowDataType::LargeUtf8 => {
let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
}
col_writer.close()
}
ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
let arrays: Vec<_> = arrays.iter().map(|array|{
@@ -380,18 +330,21 @@ fn write_leaves<W: Write>(
write_leaves(row_group_writer, &values, levels)?;
Ok(())
}
ArrowDataType::Dictionary(_, value_type) => {
let mut writer = get_writer(row_group_writer, value_type)?;
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
// cast dictionary to a primitive
let array = arrow::compute::cast(array, value_type)?;
writer.write(
&array,
levels.pop().expect("Levels exhausted"),
)?;
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
}
col_writer.close()
}
_ => {
let mut col_writer = row_group_writer.next_column()?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
}
col_writer.close()
}
writer.close()?;
Ok(())
}
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),