Skip to content

Commit

Permalink
Added reserve to pushable containers in parquet extend_from_decoder (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 29, 2023
1 parent f41def4 commit 0d8320c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 6 deletions.
9 changes: 9 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
Expand Up @@ -25,6 +25,9 @@ impl<O: Offset> Offsets<O> {
}

impl<O: Offset> Pushable<O> for Offsets<O> {
fn reserve(&mut self, additional: usize) {
self.0.reserve(additional)
}
#[inline]
fn len(&self) -> usize {
self.0.len() - 1
Expand Down Expand Up @@ -88,6 +91,12 @@ impl<O: Offset> Binary<O> {
}

impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
#[inline]
fn reserve(&mut self, additional: usize) {
let avg_len = self.values.len() / std::cmp::max(self.last_offset.to_usize(), 1);
self.values.reserve(additional * avg_len);
self.offsets.reserve(additional);
}
#[inline]
fn len(&self) -> usize {
self.len()
Expand Down
4 changes: 4 additions & 0 deletions src/io/parquet/read/deserialize/fixed_size_binary/utils.rs
Expand Up @@ -30,6 +30,10 @@ impl FixedSizeBinary {
}

impl<'a> Pushable<&'a [u8]> for FixedSizeBinary {
#[inline]
fn reserve(&mut self, additional: usize) {
self.values.reserve(additional * self.size);
}
#[inline]
fn push(&mut self, value: &[u8]) {
debug_assert_eq!(value.len(), self.size);
Expand Down
40 changes: 34 additions & 6 deletions src/io/parquet/read/deserialize/utils.rs
Expand Up @@ -30,14 +30,18 @@ pub fn not_implemented(page: &DataPage) -> Error {

/// A private trait representing structs that can receive elements.
pub(super) trait Pushable<T>: Sized {
//fn reserve(&mut self, additional: usize);
fn reserve(&mut self, additional: usize);
fn push(&mut self, value: T);
fn len(&self) -> usize;
fn push_null(&mut self);
fn extend_constant(&mut self, additional: usize, value: T);
}

impl Pushable<bool> for MutableBitmap {
#[inline]
fn reserve(&mut self, additional: usize) {
MutableBitmap::reserve(self, additional)
}
#[inline]
fn len(&self) -> usize {
self.len()
Expand All @@ -60,6 +64,10 @@ impl Pushable<bool> for MutableBitmap {
}

impl<A: Copy + Default> Pushable<A> for Vec<A> {
#[inline]
fn reserve(&mut self, additional: usize) {
Vec::reserve(self, additional)
}
#[inline]
fn len(&self) -> usize {
self.len()
Expand Down Expand Up @@ -290,11 +298,33 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
) {
let limit = limit.unwrap_or(usize::MAX);

let mut runs = vec![];
let mut remaining = limit;
let mut reserve_pushable = 0;

// first do a scan so that we know how much to reserve up front
while remaining > 0 {
let run = page_validity.next_limited(remaining);
let run = if let Some(run) = run { run } else { break };

match run {
FilteredHybridEncoded::Bitmap { length, .. } => {
reserve_pushable += length;
remaining -= length;
}
FilteredHybridEncoded::Repeated { length, .. } => {
reserve_pushable += length;
remaining -= length;
}
_ => {}
};
runs.push(run)
}
pushable.reserve(reserve_pushable);
validity.reserve(reserve_pushable);

// then a second loop to really fill the buffers
for run in runs {
match run {
FilteredHybridEncoded::Bitmap {
values,
Expand All @@ -313,18 +343,16 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<It
}
}
validity.extend_from_slice(values, offset, length);

remaining -= length;
}
FilteredHybridEncoded::Repeated { is_set, length } => {
validity.extend_constant(length, is_set);
if is_set {
(0..length).for_each(|_| pushable.push(values_iter.next().unwrap()));
for v in (&mut values_iter).take(length) {
pushable.push(v)
}
} else {
pushable.extend_constant(length, T::default());
}

remaining -= length;
}
FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {},
};
Expand Down

0 comments on commit 0d8320c

Please sign in to comment.