Skip to content

Commit

Permalink
Improved bitpacking (#176)
Browse files Browse the repository at this point in the history
This PR ports apache/arrow-rs#2278 to parquet2. Credit to the design and implementation of the unpacking path go to @tustvold - it is 5-10% faster than the bitpacking crate 馃殌
Additionally, it adds the corresponding packing code path, thereby completely replacing the dependency on bitpacking.
It also adds some traits that allows code to be written via generics.
A curious observation is that, with this PR, parquet2 no longer executes unsafe code (bitpacking had some) 馃帀
Backward changes:

renamed parquet2::encoding::bitpacking to parquet2::encoding::bitpacked
parquet2::encoding::bitpacked::Decoder now has a generic parameter (output type)
parquet2::encoding::bitpacked::Decoder::new's second parameter is now a usize
  • Loading branch information
jorgecarleitao committed Aug 15, 2022
1 parent 0305d01 commit 35447ba
Show file tree
Hide file tree
Showing 17 changed files with 758 additions and 314 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ bench = false

[dependencies]
parquet-format-safe = "0.2"
bitpacking = { version = "0.8.2", default-features = false, features = ["bitpacker1x"] }
seq-macro = { version = "0.3", default-features = false }
streaming-decompression = "0.1"

async-stream = { version = "0.3.2", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions benches/decode_bitpacking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};

use parquet2::encoding::bitpacking::Decoder;
use parquet2::encoding::bitpacked::Decoder;

fn add_benchmark(c: &mut Criterion) {
(10..=20).step_by(2).for_each(|log2_size| {
Expand All @@ -11,7 +11,7 @@ fn add_benchmark(c: &mut Criterion) {
.collect::<Vec<_>>();

c.bench_function(&format!("bitpacking 2^{}", log2_size), |b| {
b.iter(|| Decoder::new(&bytes, 1, size).count())
b.iter(|| Decoder::<u32>::new(&bytes, 1, size).count())
});
})
}
Expand Down
173 changes: 173 additions & 0 deletions src/encoding/bitpacked/decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
use super::{Packed, Unpackable, Unpacked};

/// An [`Iterator`] of [`Unpackable`] unpacked from a bitpacked slice of bytes.
/// # Implementation
/// This iterator unpacks bytes in chunks and does not allocate.
#[derive(Debug, Clone)]
pub struct Decoder<'a, T: Unpackable> {
packed: std::slice::Chunks<'a, u8>,
num_bits: usize,
remaining: usize,
current_pack_index: usize, // invariant: < T::PACK_LENGTH
unpacked: T::Unpacked, // has the current unpacked values.
}

#[inline]
fn decode_pack<T: Unpackable>(packed: &[u8], num_bits: usize, unpacked: &mut T::Unpacked) {
if packed.len() < T::Unpacked::LENGTH * num_bits / 8 {
let mut buf = T::Packed::zero();
buf.as_mut()[..packed.len()].copy_from_slice(packed);
T::unpack(buf.as_ref(), num_bits, unpacked)
} else {
T::unpack(packed, num_bits, unpacked)
}
}

impl<'a, T: Unpackable> Decoder<'a, T> {
/// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`.
pub fn new(packed: &'a [u8], num_bits: usize, mut length: usize) -> Self {
let block_size = std::mem::size_of::<T>() * num_bits;

let mut packed = packed.chunks(block_size);
let mut unpacked = T::Unpacked::zero();
if let Some(chunk) = packed.next() {
decode_pack::<T>(chunk, num_bits, &mut unpacked);
} else {
length = 0
};

Self {
remaining: length,
packed,
num_bits,
unpacked,
current_pack_index: 0,
}
}
}

impl<'a, T: Unpackable> Iterator for Decoder<'a, T> {
type Item = T;

#[inline] // -71% improvement in bench
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == 0 {
return None;
}
let result = self.unpacked[self.current_pack_index];
self.current_pack_index += 1;
self.remaining -= 1;
if self.current_pack_index == T::Unpacked::LENGTH {
if let Some(packed) = self.packed.next() {
decode_pack::<T>(packed, self.num_bits, &mut self.unpacked);
self.current_pack_index = 0;
}
}
Some(result)
}

#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(self.remaining, Some(self.remaining))
}
}

#[cfg(test)]
mod tests {
use super::super::tests::case1;
use super::*;

#[test]
fn test_decode_rle() {
// Test data: 0-7 with bit width 3
// 0: 000
// 1: 001
// 2: 010
// 3: 011
// 4: 100
// 5: 101
// 6: 110
// 7: 111
let num_bits = 3;
let length = 8;
// encoded: 0b10001000u8, 0b11000110, 0b11111010
let data = vec![0b10001000u8, 0b11000110, 0b11111010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 2, 3, 4, 5, 6, 7]);
}

#[test]
fn decode_large() {
let (num_bits, expected, data) = case1();

let decoded = Decoder::<u32>::new(&data, num_bits, expected.len()).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn test_decode_bool() {
let num_bits = 1;
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

#[test]
fn test_decode_u64() {
let num_bits = 1;
let length = 8;
let data = vec![0b10101010];

let decoded = Decoder::<u64>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, vec![0, 1, 0, 1, 0, 1, 0, 1]);
}

#[test]
fn even_case() {
// [0, 1, 2, 3, 4, 5, 6, 0]x99
let data = &[0b10001000u8, 0b11000110, 0b00011010];
let num_bits = 3;
let copies = 99; // 8 * 99 % 32 != 0
let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0])
.take(copies)
.flatten()
.copied()
.collect::<Vec<_>>();
let data = std::iter::repeat(data)
.take(copies)
.flatten()
.copied()
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}

#[test]
fn odd_case() {
// [0, 1, 2, 3, 4, 5, 6, 0]x4 + [2]
let data = &[0b10001000u8, 0b11000110, 0b00011010];
let num_bits = 3;
let copies = 4;
let expected = std::iter::repeat(&[0u32, 1, 2, 3, 4, 5, 6, 0])
.take(copies)
.flatten()
.copied()
.chain(std::iter::once(2))
.collect::<Vec<_>>();
let data = std::iter::repeat(data)
.take(copies)
.flatten()
.copied()
.chain(std::iter::once(0b00000010u8))
.collect::<Vec<_>>();
let length = expected.len();

let decoded = Decoder::<u32>::new(&data, num_bits, length).collect::<Vec<_>>();
assert_eq!(decoded, expected);
}
}
54 changes: 54 additions & 0 deletions src/encoding/bitpacked/encode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::convert::TryInto;

use super::{Packed, Unpackable, Unpacked};

/// Encodes (packs) a slice of [`Unpackable`] into bitpacked bytes `packed`, using `num_bits` per value.
///
/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits
/// and saturates higher values.
///
/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated.
pub fn encode<T: Unpackable>(unpacked: &[T], num_bits: usize, packed: &mut [u8]) {
let chunks = unpacked.chunks_exact(T::Unpacked::LENGTH);

let remainder = chunks.remainder();

let packed_size = (T::Unpacked::LENGTH * num_bits + 7) / 8;
if !remainder.is_empty() {
let packed_chunks = packed.chunks_mut(packed_size);
let mut last_chunk = T::Unpacked::zero();
for i in 0..remainder.len() {
last_chunk[i] = remainder[i]
}

chunks
.chain(std::iter::once(last_chunk.as_ref()))
.zip(packed_chunks)
.for_each(|(unpacked, packed)| {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed);
});
} else {
let packed_chunks = packed.chunks_exact_mut(packed_size);
chunks.zip(packed_chunks).for_each(|(unpacked, packed)| {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed);
});
}
}

/// Encodes (packs) a potentially incomplete pack of [`Unpackable`] into bitpacked
/// bytes `packed`, using `num_bits` per value.
///
/// This function assumes that the maximum value in `unpacked` fits in `num_bits` bits
/// and saturates higher values.
///
/// Only the first `ceil8(unpacked.len() * num_bits)` of `packed` are populated.
#[inline]
pub fn encode_pack<T: Unpackable>(unpacked: &[T], num_bits: usize, packed: &mut [u8]) {
if unpacked.len() < T::Packed::LENGTH {
let mut complete_unpacked = T::Unpacked::zero();
complete_unpacked.as_mut()[..unpacked.len()].copy_from_slice(unpacked);
T::pack(&complete_unpacked, num_bits, packed)
} else {
T::pack(&unpacked.try_into().unwrap(), num_bits, packed)
}
}

0 comments on commit 35447ba

Please sign in to comment.