From b5fe680a4ec4c2d03e73b7f706815bfbd1cee266 Mon Sep 17 00:00:00 2001
From: Ritchie Vink
Date: Sun, 11 Dec 2022 16:39:38 +0100
Subject: [PATCH] fix/perf writing nested/sliced arrays to parquet

---
 src/io/parquet/write/binary/nested.rs    |  11 ++-
 src/io/parquet/write/boolean/nested.rs   |  11 ++-
 src/io/parquet/write/mod.rs              | 118 +++++++++++++++++++++--
 src/io/parquet/write/pages.rs            |  16 +--
 src/io/parquet/write/primitive/nested.rs |  11 ++-
 src/io/parquet/write/utf8/nested.rs      |  11 ++-
 6 files changed, 149 insertions(+), 29 deletions(-)

diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs
index 950ea4190ca..ccb47775290 100644
--- a/src/io/parquet/write/binary/nested.rs
+++ b/src/io/parquet/write/binary/nested.rs
@@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
 use super::super::{nested, utils, WriteOptions};
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
-use crate::io::parquet::write::Nested;
+use crate::io::parquet::write::{slice_nested_leaf, Nested};
 use crate::{
     array::{Array, BinaryArray},
     error::Result,
@@ -26,10 +26,15 @@ where
     let (repetition_levels_byte_length, definition_levels_byte_length) =
         nested::write_rep_and_def(options.version, nested, &mut buffer)?;
 
-    encode_plain(array, is_optional, &mut buffer);
+    // we slice the leaf by the offsets as dremel only computes lengths and thus
+    // does NOT take the starting offset into account.
+    // By slicing the leaf array we also don't write too many values.
+    let (start, len) = slice_nested_leaf(nested);
+    let array = array.slice(start, len);
+    encode_plain(&array, is_optional, &mut buffer);
 
     let statistics = if options.write_statistics {
-        Some(build_statistics(array, type_.clone()))
+        Some(build_statistics(&array, type_.clone()))
     } else {
         None
     };
diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs
index 9d9e49100f6..a65ebf8534b 100644
--- a/src/io/parquet/write/boolean/nested.rs
+++ b/src/io/parquet/write/boolean/nested.rs
@@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
 use super::super::{nested, utils, WriteOptions};
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
-use crate::io::parquet::write::Nested;
+use crate::io::parquet::write::{slice_nested_leaf, Nested};
 use crate::{
     array::{Array, BooleanArray},
     error::Result,
@@ -22,10 +22,15 @@ pub fn array_to_page(
     let (repetition_levels_byte_length, definition_levels_byte_length) =
         nested::write_rep_and_def(options.version, nested, &mut buffer)?;
 
-    encode_plain(array, is_optional, &mut buffer)?;
+    // we slice the leaf by the offsets as dremel only computes lengths and thus
+    // does NOT take the starting offset into account.
+    // By slicing the leaf array we also don't write too many values.
+    let (start, len) = slice_nested_leaf(nested);
+    let array = array.slice(start, len);
+    encode_plain(&array, is_optional, &mut buffer)?;
 
     let statistics = if options.write_statistics {
-        Some(build_statistics(array))
+        Some(build_statistics(&array))
     } else {
         None
     };
diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs
index 84b4f1cab58..681886c374c 100644
--- a/src/io/parquet/write/mod.rs
+++ b/src/io/parquet/write/mod.rs
@@ -69,6 +69,30 @@ pub use sink::FileSink;
 pub use pages::array_to_columns;
 pub use pages::Nested;
 
+/// returns offset and length to slice the leaf values
+pub(self) fn slice_nested_leaf(nested: &[Nested]) -> (usize, usize) {
+    // find the deepest recursive dremel structure as that one determines how many values we must
+    // take
+    let mut out = (0, 0);
+    for nested in nested.iter().rev() {
+        match nested {
+            Nested::LargeList(l_nested) => {
+                let start = *l_nested.offsets.first().unwrap();
+                let end = *l_nested.offsets.last().unwrap();
+                return (start as usize, (end - start) as usize);
+            }
+            Nested::List(l_nested) => {
+                let start = *l_nested.offsets.first().unwrap();
+                let end = *l_nested.offsets.last().unwrap();
+                return (start as usize, (end - start) as usize);
+            }
+            Nested::Primitive(_, _, len) => out = (0, *len),
+            _ => {}
+        }
+    }
+    out
+}
+
 pub(self) fn decimal_length_from_precision(precision: usize) -> usize {
     // digits = floor(log_10(2^(8*n - 1) - 1))
     // ceil(digits) = log10(2^(8*n - 1) - 1)
@@ -130,6 +154,58 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool {
     )
 }
 
+fn slice_parquet_array<'a>(
+    array: &'a dyn Array,
+    nested: &'a [Nested<'a>],
+    offset: usize,
+    length: usize,
+) -> (Box<dyn Array>, Vec<Nested<'a>>) {
+    let mut nested = nested.to_vec();
+
+    let mut is_nested = false;
+    for nested in nested.iter_mut() {
+        match nested {
+            Nested::LargeList(l_nested) => {
+                is_nested = true;
+                // the slice is a bit awkward because we always want the latest value to compute the next length;
+                l_nested.offsets = &l_nested.offsets
+                    [offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())];
+            }
+            Nested::List(l_nested) => {
+                is_nested = true;
+                l_nested.offsets = &l_nested.offsets
+                    [offset..offset + std::cmp::min(length + 1, l_nested.offsets.len())];
+            }
+            _ => {}
+        }
+    }
+    if is_nested {
+        (array.to_boxed(), nested)
+    } else {
+        (array.slice(offset, length), nested)
+    }
+}
+
+fn get_max_length(array: &dyn Array, nested: &[Nested]) -> usize {
+    let mut sum = 0;
+    for nested in nested.iter() {
+        match nested {
+            Nested::LargeList(l_nested) => {
+                sum += l_nested.offsets.len() - 1;
+            }
+            Nested::List(l_nested) => {
+                sum += l_nested.offsets.len() - 1;
+            }
+            _ => {}
+        }
+    }
+    if sum > 0 {
+        sum
+    } else {
+        array.len()
+    }
+}
+
 /// Returns an iterator of [`Page`].
 #[allow(clippy::needless_collect)]
 pub fn array_to_pages(
@@ -147,13 +223,27 @@
     let array_byte_size = estimated_bytes_size(array);
     if array_byte_size >= (2u32.pow(31) - 2u32.pow(25)) as usize && array.len() > 3 {
-        let split_at = array.len() / 2;
-        let left = array.slice(0, split_at);
-        let right = array.slice(split_at, array.len() - split_at);
+        let length = get_max_length(array, nested);
+        let split_at = length / 2;
+        let (sub_array_left, subnested_left) = slice_parquet_array(array, nested, 0, split_at);
+        let (sub_array_right, subnested_right) =
+            slice_parquet_array(array, nested, split_at, length - split_at);
 
         Ok(DynIter::new(
-            array_to_pages(&*left, type_.clone(), nested, options, encoding)?
-                .chain(array_to_pages(&*right, type_, nested, options, encoding)?),
+            array_to_pages(
+                sub_array_left.as_ref(),
+                type_.clone(),
+                subnested_left.as_ref(),
+                options,
+                encoding,
+            )?
+            .chain(array_to_pages(
+                sub_array_right.as_ref(),
+                type_,
+                subnested_right.as_ref(),
+                options,
+                encoding,
+            )?),
         ))
     } else {
         match array.data_type() {
@@ -175,17 +265,25 @@
                     ((array_byte_size as f64) / ((array.len() + 1) as f64)) as usize;
                 let rows_per_page = (page_size / (bytes_per_row + 1)).max(1);
 
-                let vs: Vec<Result<Page>> = (0..array.len())
+                let length = get_max_length(array, nested);
+                let vs: Vec<Result<Page>> = (0..length)
                     .step_by(rows_per_page)
                     .map(|offset| {
-                        let length = if offset + rows_per_page > array.len() {
-                            array.len() - offset
+                        let length = if offset + rows_per_page > length {
+                            length - offset
                         } else {
                             rows_per_page
                         };
 
-                        let sub_array = array.slice(offset, length);
-                        array_to_page(sub_array.as_ref(), type_.clone(), nested, options, encoding)
+                        let (sub_array, subnested) =
+                            slice_parquet_array(array, nested, offset, length);
+                        array_to_page(
+                            sub_array.as_ref(),
+                            type_.clone(),
+                            &subnested,
+                            options,
+                            encoding,
+                        )
                     })
                     .collect();
diff --git a/src/io/parquet/write/pages.rs b/src/io/parquet/write/pages.rs
index e3e1eec410f..ba292ae82f1 100644
--- a/src/io/parquet/write/pages.rs
+++ b/src/io/parquet/write/pages.rs
@@ -1,5 +1,6 @@
 use parquet2::schema::types::{ParquetType, PrimitiveType as ParquetPrimitiveType};
 use parquet2::{page::Page, write::DynIter};
+use std::fmt::Debug;
 
 use crate::array::{ListArray, StructArray};
 use crate::bitmap::Bitmap;
@@ -34,6 +35,7 @@ impl<'a, O: Offset> ListNested<'a, O> {
 #[derive(Debug, Clone, PartialEq)]
 pub enum Nested<'a> {
     /// a primitive (leaf or parquet column)
+    /// bitmap, _, length
     Primitive(Option<&'a Bitmap>, bool, usize),
     /// a list
     List(ListNested<'a, i32>),
@@ -147,13 +149,13 @@ fn to_nested_recursive<'a>(
     Ok(())
 }
 
-fn to_leafs(array: &dyn Array) -> Vec<&dyn Array> {
+fn to_leaves(array: &dyn Array) -> Vec<&dyn Array> {
     let mut leafs = vec![];
-    to_leafs_recursive(array, &mut leafs);
+    to_leaves_recursive(array, &mut leafs);
     leafs
 }
 
-fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) {
+fn to_leaves_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>) {
     use PhysicalType::*;
     match array.data_type().to_physical_type() {
         Struct => {
@@ -161,15 +163,15 @@ fn to_leafs_recursive<'a>(array: &'a dyn Array, leafs: &mut Vec<&'a dyn Array>)
             array
                 .values()
                 .iter()
-                .for_each(|a| to_leafs_recursive(a.as_ref(), leafs));
+                .for_each(|a| to_leaves_recursive(a.as_ref(), leafs));
         }
         List => {
             let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
-            to_leafs_recursive(array.values().as_ref(), leafs);
+            to_leaves_recursive(array.values().as_ref(), leafs);
         }
         LargeList => {
            let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
-            to_leafs_recursive(array.values().as_ref(), leafs);
+            to_leaves_recursive(array.values().as_ref(), leafs);
         }
         Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
         | LargeUtf8 | Dictionary(_) => leafs.push(array),
@@ -206,7 +208,7 @@ pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
     let types = to_parquet_leafs(type_);
 
-    let values = to_leafs(array);
+    let values = to_leaves(array);
 
     assert_eq!(encoding.len(), types.len());
diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs
index e2909b02ceb..74248851787 100644
--- a/src/io/parquet/write/primitive/nested.rs
+++ b/src/io/parquet/write/primitive/nested.rs
@@ -7,7 +7,7 @@ use super::super::utils;
 use super::super::WriteOptions;
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
-use crate::io::parquet::write::Nested;
+use crate::io::parquet::write::{slice_nested_leaf, Nested};
 use crate::{
     array::{Array, PrimitiveArray},
     error::Result,
@@ -31,11 +31,16 @@ where
     let (repetition_levels_byte_length, definition_levels_byte_length) =
         nested::write_rep_and_def(options.version, nested, &mut buffer)?;
 
-    let buffer = encode_plain(array, is_optional, buffer);
+    // we slice the leaf by the offsets as dremel only computes lengths and thus
+    // does NOT take the starting offset into account.
+    // By slicing the leaf array we also don't write too many values.
+    let (start, len) = slice_nested_leaf(nested);
+    let array = array.slice(start, len);
+    let buffer = encode_plain(&array, is_optional, buffer);
 
     let statistics = if options.write_statistics {
         Some(serialize_statistics(&build_statistics(
-            array,
+            &array,
             type_.clone(),
         )))
     } else {
diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs
index 2792ef35712..244dd148691 100644
--- a/src/io/parquet/write/utf8/nested.rs
+++ b/src/io/parquet/write/utf8/nested.rs
@@ -4,7 +4,7 @@ use parquet2::{encoding::Encoding, page::DataPage};
 use super::super::{nested, utils, WriteOptions};
 use super::basic::{build_statistics, encode_plain};
 use crate::io::parquet::read::schema::is_nullable;
-use crate::io::parquet::write::Nested;
+use crate::io::parquet::write::{slice_nested_leaf, Nested};
 use crate::{
     array::{Array, Utf8Array},
     error::Result,
@@ -26,10 +26,15 @@ where
     let (repetition_levels_byte_length, definition_levels_byte_length) =
         nested::write_rep_and_def(options.version, nested, &mut buffer)?;
 
-    encode_plain(array, is_optional, &mut buffer);
+    // we slice the leaf by the offsets as dremel only computes lengths and thus
+    // does NOT take the starting offset into account.
+    // By slicing the leaf array we also don't write too many values.
+    let (start, len) = slice_nested_leaf(nested);
+    let array = array.slice(start, len);
+    encode_plain(&array, is_optional, &mut buffer);
 
     let statistics = if options.write_statistics {
-        Some(build_statistics(array, type_.clone()))
+        Some(build_statistics(&array, type_.clone()))
     } else {
         None
     };
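
The offset arithmetic that slice_nested_leaf relies on can be shown in a small standalone Rust sketch; leaf_slice_from_offsets below is an illustrative stand-in, not part of arrow2. Dremel repetition/definition levels only encode lengths, not the starting offset, so for a sliced list the leaf values to encode run from the first list offset to the last one.

// Illustrative sketch only: a hypothetical helper mirroring the offset logic of
// slice_nested_leaf for a single (Large)List level.
fn leaf_slice_from_offsets(offsets: &[i64]) -> (usize, usize) {
    // rep/def levels carry lengths only, so the leaf array must be sliced to
    // start at the first offset and span (last - first) values.
    let start = *offsets.first().unwrap();
    let end = *offsets.last().unwrap();
    (start as usize, (end - start) as usize)
}

fn main() {
    // e.g. a list array sliced past its first rows may carry offsets [3, 5, 9]:
    // only values[3..9] (6 leaf values) belong to the remaining rows.
    assert_eq!(leaf_slice_from_offsets(&[3, 5, 9]), (3, 6));
}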