Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Skip for DeltaBitPackDecoder #2393

Merged
merged 4 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
53 changes: 53 additions & 0 deletions parquet/benches/arrow_reader.rs
Expand Up @@ -300,6 +300,26 @@ fn bench_array_reader(mut array_reader: Box<dyn ArrayReader>) -> usize {
total_count
}

/// Drains `array_reader` by alternating between decoding and skipping.
///
/// The first step decodes up to `BATCH_SIZE` records with `next_batch`, the
/// second skips up to `BATCH_SIZE` records with `skip_records`, and so on.
/// The loop stops after the first step that handles fewer than `BATCH_SIZE`
/// records.
///
/// Returns the total number of records decoded plus skipped.
///
/// # Panics
///
/// Panics if either `next_batch` or `skip_records` returns an error.
fn bench_array_reader_skip(mut array_reader: Box<dyn ArrayReader>) -> usize {
    let mut total_count = 0;
    let mut should_skip = false;
    loop {
        // Each step handles at most one batch, either by skipping or decoding it.
        let processed = if should_skip {
            array_reader.skip_records(BATCH_SIZE).unwrap()
        } else {
            array_reader.next_batch(BATCH_SIZE).unwrap().len()
        };
        total_count += processed;
        should_skip = !should_skip;

        // A short step means the reader has run out of data.
        if processed < BATCH_SIZE {
            return total_count;
        }
    }
}
fn create_primitive_array_reader(
page_iterator: impl PageIterator + 'static,
column_desc: ColumnDescPtr,
Expand Down Expand Up @@ -445,6 +465,39 @@ fn bench_primitive<T>(
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed skip, no NULLs
let data = build_encoded_primitive_page_iterator::<T>(
schema.clone(),
mandatory_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
);
group.bench_function("binary packed skip, mandatory, no NULLs", |b| {
b.iter(|| {
let array_reader = create_primitive_array_reader(
data.clone(),
mandatory_column_desc.clone(),
);
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

let data = build_encoded_primitive_page_iterator::<T>(
schema.clone(),
optional_column_desc.clone(),
0.0,
Encoding::DELTA_BINARY_PACKED,
);
group.bench_function("binary packed skip, optional, no NULLs", |b| {
b.iter(|| {
let array_reader =
create_primitive_array_reader(data.clone(), optional_column_desc.clone());
count = bench_array_reader_skip(array_reader);
});
assert_eq!(count, EXPECTED_VALUE_COUNT);
});

// binary packed, half NULLs
let data = build_encoded_primitive_page_iterator::<T>(
schema.clone(),
Expand Down
57 changes: 55 additions & 2 deletions parquet/src/encodings/decoding.rs
Expand Up @@ -736,8 +736,61 @@ where
}

/// Skips up to `num_values` values without returning them to the caller.
///
/// At most `self.values_left` values are skipped. The skipped deltas are
/// still decoded into a small scratch buffer, because `self.last_value` must
/// be kept up to date for the values that follow.
///
/// Returns the number of values skipped.
///
/// # Errors
///
/// Returns an error if loading the next mini block fails, or if the bit
/// reader yields fewer values than were requested from the current mini block.
///
/// # Panics
///
/// Panics if the physical type is neither `INT32` nor `INT64`.
fn skip(&mut self, num_values: usize) -> Result<usize> {
    let to_skip = num_values.min(self.values_left);
    if to_skip == 0 {
        return Ok(0);
    }

    let mut skip = 0;

    // try to consume first value in header.
    if let Some(value) = self.first_value.take() {
        self.last_value = value;
        skip += 1;
        self.values_left -= 1;
    }

    // Size of the scratch buffer used to decode skipped deltas.
    let mini_block_batch_size = match T::T::PHYSICAL_TYPE {
        Type::INT32 => 32,
        Type::INT64 => 64,
        _ => unreachable!(),
    };

    let mut skip_buffer = vec![T::T::default(); mini_block_batch_size];
    while skip < to_skip {
        if self.mini_block_remaining == 0 {
            self.next_mini_block()?;
        }

        let bit_width = self.mini_block_bit_widths[self.mini_block_idx] as usize;
        // Never request more than the scratch buffer can hold: a mini block
        // with more remaining values is drained over several iterations
        // instead of slicing past the end of `skip_buffer`.
        let mini_block_to_skip = self
            .mini_block_remaining
            .min(to_skip - skip)
            .min(skip_buffer.len());

        let skip_count = self
            .bit_reader
            .get_batch(&mut skip_buffer[..mini_block_to_skip], bit_width);

        if skip_count != mini_block_to_skip {
            return Err(general_err!(
                "Expected to skip {} values from mini block got {}.",
                mini_block_to_skip,
                skip_count
            ));
        }

        // Apply the deltas so `last_value` ends up at the last skipped value.
        for v in &mut skip_buffer[..skip_count] {
            *v = v
                .wrapping_add(&self.min_delta)
                .wrapping_add(&self.last_value);

            self.last_value = *v;
        }

        skip += mini_block_to_skip;
        self.mini_block_remaining -= mini_block_to_skip;
        self.values_left -= mini_block_to_skip;
    }

    Ok(to_skip)
}
}

Expand Down