From 59f6495cc5c1265bb3fb1de3ad27abaf69c62802 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Thu, 17 Nov 2022 16:29:36 +0100 Subject: [PATCH 1/2] reserve pushable containers in parquet extend_from_decoder --- src/io/parquet/read/deserialize/binary/utils.rs | 8 ++++++++ .../read/deserialize/fixed_size_binary/utils.rs | 3 +++ src/io/parquet/read/deserialize/utils.rs | 10 +++++++++- 3 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/io/parquet/read/deserialize/binary/utils.rs b/src/io/parquet/read/deserialize/binary/utils.rs index 2d0b5f38788..1f30d045a37 100644 --- a/src/io/parquet/read/deserialize/binary/utils.rs +++ b/src/io/parquet/read/deserialize/binary/utils.rs @@ -25,6 +25,9 @@ impl Offsets { } impl Pushable for Offsets { + fn reserve(&mut self, additional: usize) { + self.0.reserve(additional) + } #[inline] fn len(&self) -> usize { self.0.len() - 1 @@ -88,6 +91,11 @@ impl Binary { } impl<'a, O: Offset> Pushable<&'a [u8]> for Binary { + fn reserve(&mut self, additional: usize) { + let avg_len = self.values.len() / std::cmp::max(self.last_offset.to_usize(), 1); + self.values.reserve(additional * avg_len); + self.offsets.reserve(additional); + } #[inline] fn len(&self) -> usize { self.len() diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs index a4b7d047606..88fd06ee641 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs @@ -30,6 +30,9 @@ impl FixedSizeBinary { } impl<'a> Pushable<&'a [u8]> for FixedSizeBinary { + fn reserve(&mut self, additional: usize) { + self.values.reserve(additional * self.size); + } #[inline] fn push(&mut self, value: &[u8]) { debug_assert_eq!(value.len(), self.size); diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 2c137119b43..d3b1e8c351f 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -30,7 +30,7 @@ pub fn not_implemented(page: &DataPage) -> Error { /// A private trait representing structs that can receive elements. pub(super) trait Pushable: Sized { - //fn reserve(&mut self, additional: usize); + fn reserve(&mut self, additional: usize); fn push(&mut self, value: T); fn len(&self) -> usize; fn push_null(&mut self); @@ -38,6 +38,9 @@ pub(super) trait Pushable: Sized { } impl Pushable for MutableBitmap { + fn reserve(&mut self, additional: usize) { + MutableBitmap::reserve(self, additional) + } #[inline] fn len(&self) -> usize { self.len() @@ -60,6 +63,9 @@ impl Pushable for MutableBitmap { } impl Pushable for Vec { + fn reserve(&mut self, additional: usize) { + Vec::reserve(self, additional) + } #[inline] fn len(&self) -> usize { self.len() @@ -304,6 +310,7 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator, I: Iterator { validity.extend_constant(length, is_set); if is_set { + pushable.reserve(length); (0..length).for_each(|_| pushable.push(values_iter.next().unwrap())); } else { pushable.extend_constant(length, T::default()); From a40f6891c8dc2efdf52da7be052e1921a7570e0c Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 19 Nov 2022 12:45:47 +0100 Subject: [PATCH 2/2] compute length to reserve up front --- .../parquet/read/deserialize/binary/utils.rs | 1 + .../deserialize/fixed_size_binary/utils.rs | 1 + src/io/parquet/read/deserialize/utils.rs | 34 +++++++++++++++---- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/utils.rs b/src/io/parquet/read/deserialize/binary/utils.rs index 1f30d045a37..a47cb967a55 100644 --- a/src/io/parquet/read/deserialize/binary/utils.rs +++ b/src/io/parquet/read/deserialize/binary/utils.rs @@ -91,6 +91,7 @@ impl Binary { } impl<'a, O: Offset> Pushable<&'a [u8]> for Binary { + #[inline] fn reserve(&mut self, additional: usize) { let avg_len = self.values.len() / std::cmp::max(self.last_offset.to_usize(), 1); self.values.reserve(additional * avg_len); diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs index 88fd06ee641..f718ce1bdc2 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/utils.rs @@ -30,6 +30,7 @@ impl FixedSizeBinary { } impl<'a> Pushable<&'a [u8]> for FixedSizeBinary { + #[inline] fn reserve(&mut self, additional: usize) { self.values.reserve(additional * self.size); } diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index d3b1e8c351f..71bce24c586 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -38,6 +38,7 @@ pub(super) trait Pushable: Sized { } impl Pushable for MutableBitmap { + #[inline] fn reserve(&mut self, additional: usize) { MutableBitmap::reserve(self, additional) } @@ -63,6 +64,7 @@ impl Pushable for MutableBitmap { } impl Pushable for Vec { + #[inline] fn reserve(&mut self, additional: usize) { Vec::reserve(self, additional) } @@ -296,11 +298,33 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator 0 { let run = page_validity.next_limited(remaining); let run = if let Some(run) = run { run } else { break }; + match run { + FilteredHybridEncoded::Bitmap { length, .. } => { + reserve_pushable += length; + remaining -= length; + } + FilteredHybridEncoded::Repeated { length, .. } => { + reserve_pushable += length; + remaining -= length; + } + _ => {} + }; + runs.push(run) + } + pushable.reserve(reserve_pushable); + validity.reserve(reserve_pushable); + + // then a second loop to really fill the buffers + for run in runs { match run { FilteredHybridEncoded::Bitmap { values, @@ -310,7 +334,6 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator, I: Iterator { validity.extend_constant(length, is_set); if is_set { - pushable.reserve(length); - (0..length).for_each(|_| pushable.push(values_iter.next().unwrap())); + for v in (&mut values_iter).take(length) { + pushable.push(v) + } } else { pushable.extend_constant(length, T::default()); } - - remaining -= length; } FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {}, };