Don't hydrate string dictionaries when writing to parquet (apache#1764)
tustvold committed Aug 5, 2022
1 parent 4a3919b commit c1b8d26
Showing 5 changed files with 146 additions and 107 deletions.
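
For context, a sketch that is not part of the commit itself: the write path this change optimizes is a RecordBatch holding a dictionary-encoded string column written through ArrowWriter. Crate paths and signatures below follow the arrow and parquet crates around the time of this commit and should be read as assumptions rather than as code from the diff.

use std::fs::File;
use std::sync::Arc;

use arrow::array::{ArrayRef, DictionaryArray};
use arrow::datatypes::{DataType, Field, Int32Type, Schema};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_writer::ArrowWriter;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A dictionary-encoded string column: repeated strings share one dictionary entry.
    let column: DictionaryArray<Int32Type> =
        vec!["apache", "arrow", "apache", "parquet", "arrow"]
            .into_iter()
            .collect();

    let schema = Arc::new(Schema::new(vec![Field::new(
        "_1",
        DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
        false,
    )]));
    let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(column) as ArrayRef])?;

    // Before this commit the writer cast ("hydrated") such a column to a plain Utf8
    // array before encoding; the changes below let the byte array writer consume
    // the dictionary directly.
    let file = File::create("strings.parquet")?;
    let mut writer = ArrowWriter::try_new(file, schema, None)?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}
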
12 changes: 11 additions & 1 deletion arrow/src/array/array_dictionary.rs
@@ -421,14 +421,24 @@ impl<T: ArrowPrimitiveType> fmt::Debug for DictionaryArray<T> {
/// assert_eq!(maybe_val.unwrap(), orig)
/// }
/// ```
#[derive(Copy, Clone)]
pub struct TypedDictionaryArray<'a, K: ArrowPrimitiveType, V> {
/// The dictionary array
dictionary: &'a DictionaryArray<K>,
/// The values of the dictionary
values: &'a V,
}

impl<'a, K: ArrowPrimitiveType, V> Clone for TypedDictionaryArray<'a, K, V> {
fn clone(&self) -> Self {
Self {
dictionary: self.dictionary,
values: self.values,
}
}
}

impl<'a, K: ArrowPrimitiveType, V> Copy for TypedDictionaryArray<'a, K, V> {}

impl<'a, K: ArrowPrimitiveType, V> fmt::Debug for TypedDictionaryArray<'a, K, V> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "TypedDictionaryArray({:?})", self.dictionary)
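
Aside, reflecting my reading of the change above rather than text from the commit: derive(Clone, Copy) on a generic struct adds `V: Clone` / `V: Copy` bounds to the generated impls, even though TypedDictionaryArray only stores references and is therefore trivially copyable. The hand-written impls drop those bounds, presumably because the concrete value array types used by the new write path (e.g. StringArray) do not implement Copy. A minimal sketch of the difference:

// Sketch only: why hand-written Clone/Copy can be preferable for a struct of references.
#[derive(Clone, Copy)]
struct Derived<'a, V> {
    values: &'a V, // &V is always Copy, but the derive still demands V: Copy
}

struct Manual<'a, V> {
    values: &'a V,
}

impl<'a, V> Clone for Manual<'a, V> {
    fn clone(&self) -> Self {
        Self { values: self.values }
    }
}

impl<'a, V> Copy for Manual<'a, V> {}

fn assert_copy<T: Copy>() {}

struct NotCopy;

fn main() {
    // assert_copy::<Derived<'static, NotCopy>>(); // error: `NotCopy: Copy` is not satisfied
    assert_copy::<Manual<'static, NotCopy>>(); // fine: the manual impls carry no `V: Copy` bound
}
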
11 changes: 11 additions & 0 deletions arrow/src/util/data_gen.rs
@@ -143,6 +143,17 @@ pub fn create_random_array(
})
.collect::<Result<Vec<(&str, ArrayRef)>>>()?,
)?),
d @ Dictionary(_, value_type)
if crate::compute::can_cast_types(value_type, d) =>
{
let f = Field::new(
field.name(),
value_type.as_ref().clone(),
field.is_nullable(),
);
let v = create_random_array(&f, size, null_density, true_density)?;
crate::compute::cast(&v, d)?
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Generating random arrays not yet implemented for {:?}",
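
The new arm in create_random_array above generates values of the dictionary's inner value type and then casts the result into the dictionary type, guarded by can_cast_types. A standalone sketch of that generate-then-cast step, with illustrative types and values that are assumptions rather than code from the diff:

use std::sync::Arc;

use arrow::array::{Array, ArrayRef, StringArray};
use arrow::compute::{can_cast_types, cast};
use arrow::datatypes::DataType;
use arrow::error::Result;

fn main() -> Result<()> {
    let dict_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));

    // Values are produced in their plain form first...
    let values: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "a"]));

    // ...and then cast into the dictionary type, mirroring the new match arm.
    assert!(can_cast_types(values.data_type(), &dict_type));
    let dict = cast(&values, &dict_type)?;
    assert_eq!(dict.data_type(), &dict_type);
    Ok(())
}
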
31 changes: 31 additions & 0 deletions parquet/benches/arrow_writer.rs
@@ -92,6 +92,25 @@ fn create_string_bench_batch(
)?)
}

fn create_string_dictionary_bench_batch(
size: usize,
null_density: f32,
true_density: f32,
) -> Result<RecordBatch> {
let fields = vec![Field::new(
"_1",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
)];
let schema = Schema::new(fields);
Ok(create_random_batch(
Arc::new(schema),
size,
null_density,
true_density,
)?)
}

fn create_string_bench_batch_non_null(
size: usize,
null_density: f32,
@@ -346,6 +365,18 @@ fn bench_primitive_writer(c: &mut Criterion) {
b.iter(|| write_batch(&batch).unwrap())
});

let batch = create_string_dictionary_bench_batch(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
.columns()
.iter()
.map(|f| f.get_array_memory_size() as u64)
.sum(),
));
group.bench_function("4096 values string dictionary", |b| {
b.iter(|| write_batch(&batch).unwrap())
});

let batch = create_string_bench_batch_non_null(4096, 0.25, 0.75).unwrap();
group.throughput(Throughput::Bytes(
batch
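
A brief usage note, based on an assumption about the crate layout rather than anything stated in the diff: the new case should run alongside the existing writer benchmarks via `cargo bench -p parquet --bench arrow_writer`, with throughput reported against the in-memory size of the dictionary batch as computed above.
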
92 changes: 63 additions & 29 deletions parquet/src/arrow/arrow_writer/byte_array.rs
@@ -16,7 +16,6 @@
// under the License.

use crate::arrow::arrow_writer::levels::LevelInfo;
use crate::arrow::arrow_writer::ArrayWriter;
use crate::basic::Encoding;
use crate::column::page::PageWriter;
use crate::column::writer::encoder::{
@@ -33,11 +32,38 @@ use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow::array::{
Array, ArrayAccessor, ArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray,
StringArray,
Array, ArrayAccessor, ArrayRef, BinaryArray, DictionaryArray, LargeBinaryArray,
LargeStringArray, StringArray,
};
use arrow::datatypes::DataType;

macro_rules! downcast_dict_impl {
($array:ident, $key:ident, $val:ident, $op:expr $(, $arg:expr)*) => {{
$op($array
.as_any()
.downcast_ref::<DictionaryArray<arrow::datatypes::$key>>()
.unwrap()
.downcast_dict::<$val>()
.unwrap()$(, $arg)*)
}};
}

macro_rules! downcast_dict_op {
($key_type:expr, $val:ident, $array:ident, $op:expr $(, $arg:expr)*) => {
match $key_type.as_ref() {
DataType::UInt8 => downcast_dict_impl!($array, UInt8Type, $val, $op$(, $arg)*),
DataType::UInt16 => downcast_dict_impl!($array, UInt16Type, $val, $op$(, $arg)*),
DataType::UInt32 => downcast_dict_impl!($array, UInt32Type, $val, $op$(, $arg)*),
DataType::UInt64 => downcast_dict_impl!($array, UInt64Type, $val, $op$(, $arg)*),
DataType::Int8 => downcast_dict_impl!($array, Int8Type, $val, $op$(, $arg)*),
DataType::Int16 => downcast_dict_impl!($array, Int16Type, $val, $op$(, $arg)*),
DataType::Int32 => downcast_dict_impl!($array, Int32Type, $val, $op$(, $arg)*),
DataType::Int64 => downcast_dict_impl!($array, Int64Type, $val, $op$(, $arg)*),
_ => unreachable!(),
}
};
}

macro_rules! downcast_op {
($data_type:expr, $array:ident, $op:expr $(, $arg:expr)*) => {
match $data_type {
@@ -51,36 +77,44 @@ macro_rules! downcast_op {
DataType::LargeBinary => {
$op($array.as_any().downcast_ref::<LargeBinaryArray>().unwrap()$(, $arg)*)
}
d => unreachable!("cannot downcast {} to byte array", d)
DataType::Dictionary(key, value) => match value.as_ref() {
DataType::Utf8 => downcast_dict_op!(key, StringArray, $array, $op$(, $arg)*),
DataType::LargeUtf8 => {
downcast_dict_op!(key, LargeStringArray, $array, $op$(, $arg)*)
}
DataType::Binary => downcast_dict_op!(key, BinaryArray, $array, $op$(, $arg)*),
DataType::LargeBinary => {
downcast_dict_op!(key, LargeBinaryArray, $array, $op$(, $arg)*)
}
d => unreachable!("cannot downcast {} dictionary value to byte array", d),
},
d => unreachable!("cannot downcast {} to byte array", d),
}
};
}

/// Returns an [`ArrayWriter`] for byte or string arrays
pub(super) fn make_byte_array_writer<'a>(
descr: ColumnDescPtr,
data_type: DataType,
props: WriterPropertiesPtr,
page_writer: Box<dyn PageWriter + 'a>,
on_close: OnCloseColumnChunk<'a>,
) -> Box<dyn ArrayWriter + 'a> {
Box::new(ByteArrayWriter {
writer: Some(GenericColumnWriter::new(descr, props, page_writer)),
on_close: Some(on_close),
data_type,
})
}

/// An [`ArrayWriter`] for [`ByteArray`]
struct ByteArrayWriter<'a> {
writer: Option<GenericColumnWriter<'a, ByteArrayEncoder>>,
/// A writer for byte array types
pub(super) struct ByteArrayWriter<'a> {
writer: GenericColumnWriter<'a, ByteArrayEncoder>,
on_close: Option<OnCloseColumnChunk<'a>>,
data_type: DataType,
}

impl<'a> ArrayWriter for ByteArrayWriter<'a> {
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
self.writer.as_mut().unwrap().write_batch_internal(
impl<'a> ByteArrayWriter<'a> {
/// Returns a new [`ByteArrayWriter`]
pub fn new(
descr: ColumnDescPtr,
props: &'a WriterPropertiesPtr,
page_writer: Box<dyn PageWriter + 'a>,
on_close: OnCloseColumnChunk<'a>,
) -> Result<Self> {
Ok(Self {
writer: GenericColumnWriter::new(descr, props.clone(), page_writer),
on_close: Some(on_close),
})
}

pub fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
self.writer.write_batch_internal(
array,
Some(levels.non_null_indices()),
levels.def_levels(),
@@ -92,11 +126,11 @@ impl<'a> ArrayWriter for ByteArrayWriter<'a> {
Ok(())
}

fn close(&mut self) -> Result<()> {
pub fn close(self) -> Result<()> {
let (bytes_written, rows_written, metadata, column_index, offset_index) =
self.writer.take().unwrap().close()?;
self.writer.close()?;

if let Some(on_close) = self.on_close.take() {
if let Some(on_close) = self.on_close {
on_close(
bytes_written,
rows_written,
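
The macros added above dispatch on the dictionary key type and hand the resulting TypedDictionaryArray to the encoder, so values are read through the dictionary rather than materialized into a plain array. A rough sketch of what one such expansion amounts to for an Int32-keyed string dictionary; the helper name is hypothetical and the snippet is an illustration, not code from the commit:

use std::sync::Arc;

use arrow::array::{Array, ArrayAccessor, ArrayRef, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

/// Hypothetical helper: read one logical value via the typed dictionary view.
fn value_at(array: &ArrayRef, idx: usize) -> Option<String> {
    let dict = array
        .as_any()
        .downcast_ref::<DictionaryArray<Int32Type>>()?; // concrete key type
    if dict.is_null(idx) {
        return None;
    }
    // Typed view over the dictionary values; no plain StringArray is built.
    let typed = dict.downcast_dict::<StringArray>()?;
    Some(typed.value(idx).to_string())
}

fn main() {
    let array: ArrayRef = Arc::new(DictionaryArray::<Int32Type>::from_iter(vec!["x", "y", "x"]));
    assert_eq!(value_at(&array, 2).as_deref(), Some("x"));
}
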
107 changes: 30 additions & 77 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -33,70 +33,18 @@ use super::schema::{
decimal_length_from_precision,
};

use crate::column::writer::{get_column_writer, ColumnWriter, ColumnWriterImpl};
use crate::arrow::arrow_writer::byte_array::ByteArrayWriter;
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::RowGroupMetaDataPtr;
use crate::file::properties::WriterProperties;
use crate::file::writer::{SerializedColumnWriter, SerializedRowGroupWriter};
use crate::file::writer::SerializedRowGroupWriter;
use crate::{data_type::*, file::writer::SerializedFileWriter};
use levels::{calculate_array_levels, LevelInfo};

mod byte_array;
mod levels;

/// An object-safe API for writing an [`ArrayRef`]
trait ArrayWriter {
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()>;

fn close(&mut self) -> Result<()>;
}

/// Fallback implementation for writing an [`ArrayRef`] that uses [`SerializedColumnWriter`]
struct ColumnArrayWriter<'a>(Option<SerializedColumnWriter<'a>>);

impl<'a> ArrayWriter for ColumnArrayWriter<'a> {
fn write(&mut self, array: &ArrayRef, levels: LevelInfo) -> Result<()> {
write_leaf(self.0.as_mut().unwrap().untyped(), array, levels)?;
Ok(())
}

fn close(&mut self) -> Result<()> {
self.0.take().unwrap().close()
}
}

fn get_writer<'a, W: Write>(
row_group_writer: &'a mut SerializedRowGroupWriter<'_, W>,
data_type: &ArrowDataType,
) -> Result<Box<dyn ArrayWriter + 'a>> {
let array_writer = row_group_writer
.next_column_with_factory(
|descr, props, page_writer, on_close| match data_type {
ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Binary
| ArrowDataType::LargeBinary => Ok(byte_array::make_byte_array_writer(
descr,
data_type.clone(),
props.clone(),
page_writer,
on_close,
)),
_ => {
let column_writer =
get_column_writer(descr, props.clone(), page_writer);

let serialized_writer =
SerializedColumnWriter::new(column_writer, Some(on_close));

Ok(Box::new(ColumnArrayWriter(Some(serialized_writer))))
}
},
)?
.expect("Unable to get column writer");
Ok(array_writer)
}

/// Arrow writer
///
/// Writes Arrow `RecordBatch`es to a Parquet writer, buffering up `RecordBatch` in order
@@ -314,22 +262,24 @@ fn write_leaves<W: Write>(
| ArrowDataType::Time64(_)
| ArrowDataType::Duration(_)
| ArrowDataType::Interval(_)
| ArrowDataType::LargeBinary
| ArrowDataType::Binary
| ArrowDataType::Utf8
| ArrowDataType::LargeUtf8
| ArrowDataType::Decimal128(_, _)
| ArrowDataType::Decimal256(_, _)
| ArrowDataType::FixedSizeBinary(_) => {
let mut writer = get_writer(row_group_writer, &data_type)?;
let mut col_writer = row_group_writer.next_column()?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
writer.write(
array,
levels.pop().expect("Levels exhausted"),
)?;
write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
}
writer.close()?;
Ok(())
col_writer.close()
}
ArrowDataType::LargeBinary
| ArrowDataType::Binary
| ArrowDataType::Utf8
| ArrowDataType::LargeUtf8 => {
let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
}
col_writer.close()
}
ArrowDataType::List(_) | ArrowDataType::LargeList(_) => {
let arrays: Vec<_> = arrays.iter().map(|array|{
@@ -380,18 +330,21 @@ fn write_leaves<W: Write>(
write_leaves(row_group_writer, &values, levels)?;
Ok(())
}
ArrowDataType::Dictionary(_, value_type) => {
let mut writer = get_writer(row_group_writer, value_type)?;
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
// cast dictionary to a primitive
let array = arrow::compute::cast(array, value_type)?;
writer.write(
&array,
levels.pop().expect("Levels exhausted"),
)?;
ArrowDataType::Dictionary(_, value_type) => match value_type.as_ref() {
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 | ArrowDataType::Binary | ArrowDataType::LargeBinary => {
let mut col_writer = row_group_writer.next_column_with_factory(ByteArrayWriter::new)?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
col_writer.write(array, levels.pop().expect("Levels exhausted"))?;
}
col_writer.close()
}
_ => {
let mut col_writer = row_group_writer.next_column()?.unwrap();
for (array, levels) in arrays.iter().zip(levels.iter_mut()) {
write_leaf(col_writer.untyped(), array, levels.pop().expect("Levels exhausted"))?;
}
col_writer.close()
}
writer.close()?;
Ok(())
}
ArrowDataType::Float16 => Err(ParquetError::ArrowError(
"Float16 arrays not supported".to_string(),
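
Net effect in write_leaves, summarizing the diff above: string and binary columns, whether plain or dictionary-encoded, are routed to ByteArrayWriter via next_column_with_factory, while dictionaries with other value types fall back to the generic column writer; the unconditional cast to the value type is gone for the byte-array cases. A condensed, hypothetical predicate capturing that routing:

use arrow::datatypes::DataType;

/// Hypothetical helper mirroring the match arms added in write_leaves:
/// returns true when a column is handled by ByteArrayWriter.
fn uses_byte_array_writer(data_type: &DataType) -> bool {
    match data_type {
        DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary => true,
        DataType::Dictionary(_, value) => matches!(
            value.as_ref(),
            DataType::Utf8 | DataType::LargeUtf8 | DataType::Binary | DataType::LargeBinary
        ),
        _ => false,
    }
}

fn main() {
    let dict = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
    assert!(uses_byte_array_writer(&dict));
    assert!(uses_byte_array_writer(&DataType::Utf8));
    assert!(!uses_byte_array_writer(&DataType::Dictionary(
        Box::new(DataType::Int32),
        Box::new(DataType::Int64),
    )));
}
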
