Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
fix/perf writing nested/sliced arrays to parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 11, 2022
1 parent 1417f88 commit b5fe680
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 29 deletions.
11 changes: 8 additions & 3 deletions src/io/parquet/write/binary/nested.rs
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::Nested;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::{
array::{Array, BinaryArray},
error::Result,
Expand All @@ -26,10 +26,15 @@ where
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer);
// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);
let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
Some(build_statistics(&array, type_.clone()))
} else {
None
};
Expand Down
11 changes: 8 additions & 3 deletions src/io/parquet/write/boolean/nested.rs
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::Nested;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::{
array::{Array, BooleanArray},
error::Result,
Expand All @@ -22,10 +22,15 @@ pub fn array_to_page(
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer)?;
// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);
let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
Some(build_statistics(&array))
} else {
None
};
Expand Down
118 changes: 108 additions & 10 deletions src/io/parquet/write/mod.rs
Expand Up @@ -69,6 +69,30 @@ pub use sink::FileSink;
pub use pages::array_to_columns;
pub use pages::Nested;

/// Returns the `(offset, length)` pair with which the leaf values must be
/// sliced.
///
/// Dremel encoding only records lengths (not starting offsets), so the start
/// of the innermost list level has to be applied to the leaf values
/// explicitly before encoding them.
pub(self) fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
    // Walk from the innermost level outwards: the deepest list level is the
    // one that determines how many leaf values must be taken.
    let mut fallback = (0, 0);
    for level in nested.iter().rev() {
        match level {
            Nested::LargeList(list) => {
                let first = *list.offsets.first().unwrap() as usize;
                let last = *list.offsets.last().unwrap() as usize;
                return (first, last - first);
            }
            Nested::List(list) => {
                let first = *list.offsets.first().unwrap() as usize;
                let last = *list.offsets.last().unwrap() as usize;
                return (first, last - first);
            }
            // flat leaf: take everything; keep scanning in case an outer
            // list level still appears (it should not, as we iterate rev())
            Nested::Primitive(_, _, len) => fallback = (0, *len),
            _ => {}
        }
    }
    fallback
}

pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
// digits = floor(log_10(2^(8*n - 1) - 1))
// ceil(digits) = log10(2^(8*n - 1) - 1)
Expand Down Expand Up @@ -130,6 +154,58 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
)
}

/// Slices `array` and its `nested` levels to the row range
/// `[offset, offset + length)` of the outermost level.
///
/// For list levels only the offsets are sliced — the leaf values are left
/// untouched here, since the writer later slices them by these offsets
/// (see `slice_nested_leaf`). For flat arrays the array itself is sliced.
///
/// NOTE(review): every list level is sliced with the same `offset`/`length`;
/// for arrays with more than one list level the inner levels' row ranges
/// generally differ from the outer one — confirm callers only pass
/// single-level nesting or that this is intended.
fn slice_parquet_array<'a>(
    array: &'a dyn Array,
    nested: &'a [Nested<'a>],
    offset: usize,
    length: usize,
) -> (Box<dyn Array>, Vec<Nested<'a>>) {
    let mut nested = nested.to_vec();

    let mut is_nested = false;
    for nested in nested.iter_mut() {
        match nested {
            Nested::LargeList(l_nested) => {
                is_nested = true;
                // A slice of `length` rows needs `length + 1` offsets: the
                // trailing offset is required to compute the last length.
                // Clamp against the offsets *remaining after `offset`* so an
                // over-long `length` cannot index out of bounds.
                let take = std::cmp::min(length + 1, l_nested.offsets.len() - offset);
                l_nested.offsets = &l_nested.offsets[offset..offset + take];
            }
            Nested::List(l_nested) => {
                is_nested = true;
                let take = std::cmp::min(length + 1, l_nested.offsets.len() - offset);
                l_nested.offsets = &l_nested.offsets[offset..offset + take];
            }
            _ => {}
        }
    }
    if is_nested {
        // leaf values are sliced later from the (sliced) offsets
        (array.to_boxed(), nested)
    } else {
        (array.slice(offset, length), nested)
    }
}

/// Returns the number of rows to iterate when paginating `array`.
///
/// If `nested` contains list levels, the row count is derived from their
/// offsets (each level contributes `offsets.len() - 1`); otherwise the flat
/// array's length is used.
///
/// NOTE(review): with more than one list level this *sums* the row counts of
/// all levels, which looks larger than the outer level's row count — confirm
/// callers never pass multi-level nesting here.
fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize {
    let rows: usize = nested
        .iter()
        .map(|level| match level {
            Nested::LargeList(l_nested) => l_nested.offsets.len() - 1,
            Nested::List(l_nested) => l_nested.offsets.len() - 1,
            _ => 0,
        })
        .sum();
    if rows > 0 {
        rows
    } else {
        // flat (non-nested) array: one row per value
        array.len()
    }
}

/// Returns an iterator of [`Page`].
#[allow(clippy::needless_collect)]
pub fn array_to_pages(
Expand All @@ -147,13 +223,27 @@ pub fn array_to_pages(

let array_byte_size = estimated_bytes_size(array);
if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
let split_at = array.len() / 2;
let left = array.slice(0, split_at);
let right = array.slice(split_at, array.len() - split_at);
let length = get_max_length(array, nested);
let split_at = length / 2;
let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at);
let (sub_array_right, subnested_right) =
slice_parquet_array(array, nested, split_at, length - split_at);

Ok(DynIter::new(
array_to_pages(&*left, type_.clone(), nested, options, encoding)?
.chain(array_to_pages(&*right, type_, nested, options, encoding)?),
array_to_pages(
sub_array_left.as_ref(),
type_.clone(),
subnested_left.as_ref(),
options,
encoding,
)?
.chain(array_to_pages(
sub_array_right.as_ref(),
type_,
subnested_right.as_ref(),
options,
encoding,
)?),
))
} else {
match array.data_type() {
Expand All @@ -175,17 +265,25 @@ pub fn array_to_pages(
((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);

let vs: Vec<Result<Page>> = (0..array.len())
let length = get_max_length(array, nested);
let vs: Vec<Result<Page>> = (0..length)
.step_by(rows_per_page)
.map(|offset| {
let length = if offset + rows_per_page > array.len() {
array.len() - offset
let length = if offset + rows_per_page > length {
length - offset
} else {
rows_per_page
};

let sub_array = array.slice(offset, length);
array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding)
let (sub_array, subnested) =
slice_parquet_array(array, nested, offset, length);
array_to_page(
sub_array.as_ref(),
type_.clone(),
&subnested,
options,
encoding,
)
})
.collect();

Expand Down
16 changes: 9 additions & 7 deletions src/io/parquet/write/pages.rs
@@ -1,5 +1,6 @@
use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
use parquet2::{page::Page, write::DynIter};
use std::fmt::Debug;

use crate::array::{ListArray, StructArray};
use crate::bitmap::Bitmap;
Expand Down Expand Up @@ -34,6 +35,7 @@ impl<'a, O: Offset> ListNested<'a, O> {
#[derive(Debug, Clone, PartialEq)]
pub enum Nested<'a> {
/// a primitive (leaf or parquet column)
/// bitmap, _, length
Primitive(Option<&'a Bitmap>, bool, usize),
/// a list
List(ListNested<'a, i32>),
Expand Down Expand Up @@ -147,29 +149,29 @@ fn to_nested_recursive<'a>(
Ok(())
}

fn to_leafs(array: &dyn Array) -> Vec<&dyn Array> {
fn to_leaves(array: &dyn Array) -> Vec<&dyn Array> {
let mut leafs = vec![];
to_leafs_recursive(array, &mut leafs);
to_leaves_recursive(array, &mut leafs);
leafs
}

fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) {
fn to_leaves_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) {
use PhysicalType::*;
match array.data_type().to_physical_type() {
Struct => {
let array = array.as_any().downcast_ref::<StructArray>().unwrap();
array
.values()
.iter()
.for_each(|a| to_leafs_recursive(a.as_ref(), leafs));
.for_each(|a| to_leaves_recursive(a.as_ref(), leafs));
}
List => {
let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
to_leafs_recursive(array.values().as_ref(), leafs);
to_leaves_recursive(array.values().as_ref(), leafs);
}
LargeList => {
let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
to_leafs_recursive(array.values().as_ref(), leafs);
to_leaves_recursive(array.values().as_ref(), leafs);
}
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
| LargeUtf8 | Dictionary(_) => leafs.push(array),
Expand Down Expand Up @@ -206,7 +208,7 @@ pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(

let types = to_parquet_leafs(type_);

let values = to_leafs(array);
let values = to_leaves(array);

assert_eq!(encoding.len(), types.len());

Expand Down
11 changes: 8 additions & 3 deletions src/io/parquet/write/primitive/nested.rs
Expand Up @@ -7,7 +7,7 @@ use super::super::utils;
use super::super::WriteOptions;
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::Nested;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::{
array::{Array, PrimitiveArray},
error::Result,
Expand All @@ -31,11 +31,16 @@ where
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

let buffer = encode_plain(array, is_optional, buffer);
// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);
let array = array.slice(start, len);
let buffer = encode_plain(&array, is_optional, buffer);

let statistics = if options.write_statistics {
Some(serialize_statistics(&build_statistics(
array,
&array,
type_.clone(),
)))
} else {
Expand Down
11 changes: 8 additions & 3 deletions src/io/parquet/write/utf8/nested.rs
Expand Up @@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
use super::super::{nested, utils, WriteOptions};
use super::basic::{build_statistics, encode_plain};
use crate::io::parquet::read::schema::is_nullable;
use crate::io::parquet::write::Nested;
use crate::io::parquet::write::{slice_nested_leaf, Nested};
use crate::{
array::{Array, Utf8Array},
error::Result,
Expand All @@ -26,10 +26,15 @@ where
let (repetition_levels_byte_length, definition_levels_byte_length) =
nested::write_rep_and_def(options.version, nested, &mut buffer)?;

encode_plain(array, is_optional, &mut buffer);
// we slice the leaf by the offsets as dremel only computes lengths and thus
// does NOT take the starting offset into account.
// By slicing the leaf array we also don't write too many values.
let (start, len) = slice_nested_leaf(nested);
let array = array.slice(start, len);
encode_plain(&array, is_optional, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
Some(build_statistics(&array, type_.clone()))
} else {
None
};
Expand Down

0 comments on commit b5fe680

Please sign in to comment.