From 0d8320c4ee180a6326b8ff378b41690552fdade2 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Wed, 23 Nov 2022 06:41:14 +0100 Subject: [PATCH] Added `reserve` to pushable containers in parquet extend_from_decoder (#1301) --- .../parquet/read/deserialize/binary/utils.rs | 9 +++++ .../deserialize/fixed_size_binary/utils.rs | 4 ++ src/io/parquet/read/deserialize/utils.rs | 40 ++++++++++++++++--- 3 files changed, 47 insertions(+), 6 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/utils.rs b/src/io/parquet/read/deserialize/binary/utils.rs index 2d0b5f38788..a47cb967a55 100644 --- a/src/io/parquet/read/deserialize/binary/utils.rs +++ b/src/io/parquet/read/deserialize/binary/utils.rs @@ -25,6 +25,9 @@ impl Offsets { } impl Pushable for Offsets { + fn reserve(&mut self, additional: usize) { + self.0.reserve(additional) + } #[inline] fn len(&self) -> usize { self.0.len() - 1 @@ -88,6 +91,12 @@ impl Binary { } impl<'a, O: Offset> Pushable<&'a [u8]> for Binary { + #[inline] + fn reserve(&mut self, additional: usize) { + let avg_len = self.values.len() / std::cmp::max(self.last_offset.to_usize(), 1); + self.values.reserve(additional * avg_len); + self.offsets.reserve(additional); + } #[inline] fn len(&self) -> usize { self.len() diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs index a4b7d047606..f718ce1bdc2 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs @@ -30,6 +30,10 @@ impl FixedSizeBinary { } impl<'a> Pushable<&'a [u8]> for FixedSizeBinary { + #[inline] + fn reserve(&mut self, additional: usize) { + self.values.reserve(additional * self.size); + } #[inline] fn push(&mut self, value: &[u8]) { debug_assert_eq!(value.len(), self.size); diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 2c137119b43..71bce24c586 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -30,7 +30,7 @@ pub fn not_implemented(page: &DataPage) -> Error { /// A private trait representing structs that can receive elements. pub(super) trait Pushable: Sized { - //fn reserve(&mut self, additional: usize); + fn reserve(&mut self, additional: usize); fn push(&mut self, value: T); fn len(&self) -> usize; fn push_null(&mut self); @@ -38,6 +38,10 @@ pub(super) trait Pushable: Sized { } impl Pushable for MutableBitmap { + #[inline] + fn reserve(&mut self, additional: usize) { + MutableBitmap::reserve(self, additional) + } #[inline] fn len(&self) -> usize { self.len() @@ -60,6 +64,10 @@ impl Pushable for MutableBitmap { } impl Pushable for Vec { + #[inline] + fn reserve(&mut self, additional: usize) { + Vec::reserve(self, additional) + } #[inline] fn len(&self) -> usize { self.len() @@ -290,11 +298,33 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator 0 { let run = page_validity.next_limited(remaining); let run = if let Some(run) = run { run } else { break }; + match run { + FilteredHybridEncoded::Bitmap { length, .. } => { + reserve_pushable += length; + remaining -= length; + } + FilteredHybridEncoded::Repeated { length, .. } => { + reserve_pushable += length; + remaining -= length; + } + _ => {} + }; + runs.push(run) + } + pushable.reserve(reserve_pushable); + validity.reserve(reserve_pushable); + + // then a second loop to really fill the buffers + for run in runs { match run { FilteredHybridEncoded::Bitmap { values, @@ -313,18 +343,16 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator { validity.extend_constant(length, is_set); if is_set { - (0..length).for_each(|_| pushable.push(values_iter.next().unwrap())); + for v in (&mut values_iter).take(length) { + pushable.push(v) + } } else { pushable.extend_constant(length, T::default()); } - - remaining -= length; } FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {}, };