Skip to content

Commit

Permalink
prepare fastfield format for null index (#1691)
Browse files Browse the repository at this point in the history
* prepare fastfield format for null index
* add format version for fastfield
* Update fastfield_codecs/src/compact_space/mod.rs
* switch to variable size footer
* serialize delta of end
  • Loading branch information
PSeitz committed Nov 28, 2022
1 parent ee1f2c1 commit 1119e59
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 17 deletions.
14 changes: 14 additions & 0 deletions common/src/serialize.rs
Expand Up @@ -94,6 +94,20 @@ impl FixedSize for u32 {
const SIZE_IN_BYTES: usize = 4;
}

impl BinarySerializable for u16 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u16::<Endianness>(*self)
}

fn deserialize<R: Read>(reader: &mut R) -> io::Result<u16> {
reader.read_u16::<Endianness>()
}
}

impl FixedSize for u16 {
const SIZE_IN_BYTES: usize = 2;
}

impl BinarySerializable for u64 {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_u64::<Endianness>(*self)
Expand Down
6 changes: 6 additions & 0 deletions fastfield_codecs/src/compact_space/mod.rs
Expand Up @@ -456,6 +456,8 @@ impl CompactSpaceDecompressor {
mod tests {

use super::*;
use crate::format_version::read_format_version;
use crate::null_index_footer::read_null_index_footer;
use crate::serialize::U128Header;
use crate::{open_u128, serialize_u128};

Expand Down Expand Up @@ -541,7 +543,10 @@ mod tests {
.unwrap();

let data = OwnedBytes::new(out);
let (data, _format_version) = read_format_version(data).unwrap();
let (data, _null_index_footer) = read_null_index_footer(data).unwrap();
test_all(data.clone(), u128_vals);

data
}

Expand All @@ -559,6 +564,7 @@ mod tests {
333u128,
];
let mut data = test_aux_vals(vals);

let _header = U128Header::deserialize(&mut data);
let decomp = CompactSpaceDecompressor::open(data).unwrap();
let complete_range = 0..vals.len() as u32;
Expand Down
39 changes: 39 additions & 0 deletions fastfield_codecs/src/format_version.rs
@@ -0,0 +1,39 @@
use std::io;

use common::BinarySerializable;
use ownedbytes::OwnedBytes;

const MAGIC_NUMBER: u16 = 4335u16;
const FASTFIELD_FORMAT_VERSION: u8 = 1;

pub(crate) fn append_format_version(output: &mut impl io::Write) -> io::Result<()> {
FASTFIELD_FORMAT_VERSION.serialize(output)?;
MAGIC_NUMBER.serialize(output)?;

Ok(())
}

pub(crate) fn read_format_version(data: OwnedBytes) -> io::Result<(OwnedBytes, u8)> {
let (data, magic_number_bytes) = data.rsplit(2);

let magic_number = u16::deserialize(&mut magic_number_bytes.as_slice())?;
if magic_number != MAGIC_NUMBER {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("magic number mismatch {} != {}", magic_number, MAGIC_NUMBER),
));
}
let (data, format_version_bytes) = data.rsplit(1);
let format_version = u8::deserialize(&mut format_version_bytes.as_slice())?;
if format_version > FASTFIELD_FORMAT_VERSION {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Unsupported fastfield format version: {}. Max supported version: {}",
format_version, FASTFIELD_FORMAT_VERSION
),
));
}

Ok((data, format_version))
}
14 changes: 10 additions & 4 deletions fastfield_codecs/src/lib.rs
Expand Up @@ -20,20 +20,24 @@ use std::sync::Arc;

use common::BinarySerializable;
use compact_space::CompactSpaceDecompressor;
use format_version::read_format_version;
use monotonic_mapping::{
StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval,
};
use null_index_footer::read_null_index_footer;
use ownedbytes::OwnedBytes;
use serialize::{Header, U128Header};

mod bitpacked;
mod blockwise_linear;
mod compact_space;
mod format_version;
mod line;
mod linear;
mod monotonic_mapping;
mod monotonic_mapping_u128;
mod null_index_footer;

mod column;
mod gcd;
Expand Down Expand Up @@ -129,8 +133,10 @@ impl U128FastFieldCodecType {

/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open_u128<Item: MonotonicallyMappableToU128>(
mut bytes: OwnedBytes,
bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<Item>>> {
let (bytes, _format_version) = read_format_version(bytes)?;
let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
let header = U128Header::deserialize(&mut bytes)?;
assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace);
let reader = CompactSpaceDecompressor::open(bytes)?;
Expand All @@ -140,9 +146,9 @@ pub fn open_u128<Item: MonotonicallyMappableToU128>(
}

/// Returns the correct codec reader wrapped in the `Arc` for the data.
pub fn open<T: MonotonicallyMappableToU64>(
mut bytes: OwnedBytes,
) -> io::Result<Arc<dyn Column<T>>> {
pub fn open<T: MonotonicallyMappableToU64>(bytes: OwnedBytes) -> io::Result<Arc<dyn Column<T>>> {
let (bytes, _format_version) = read_format_version(bytes)?;
let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?;
let header = Header::deserialize(&mut bytes)?;
match header.codec_type {
FastFieldCodecType::Bitpacked => open_specific_codec::<BitpackedCodec, _>(bytes, &header),
Expand Down
144 changes: 144 additions & 0 deletions fastfield_codecs/src/null_index_footer.rs
@@ -0,0 +1,144 @@
use std::io::{self, Write};
use std::ops::Range;

use common::{BinarySerializable, CountingWriter, VInt};
use ownedbytes::OwnedBytes;

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) enum FastFieldCardinality {
Single = 1,
}

impl BinarySerializable for FastFieldCardinality {
fn serialize<W: Write>(&self, wrt: &mut W) -> io::Result<()> {
self.to_code().serialize(wrt)
}

fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let code = u8::deserialize(reader)?;
let codec_type: Self = Self::from_code(code)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Unknown code `{code}.`"))?;
Ok(codec_type)
}
}

impl FastFieldCardinality {
pub(crate) fn to_code(self) -> u8 {
self as u8
}

pub(crate) fn from_code(code: u8) -> Option<Self> {
match code {
1 => Some(Self::Single),
_ => None,
}
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum NullIndexCodec {
Full = 1,
}

impl BinarySerializable for NullIndexCodec {
fn serialize<W: Write>(&self, wrt: &mut W) -> io::Result<()> {
self.to_code().serialize(wrt)
}

fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let code = u8::deserialize(reader)?;
let codec_type: Self = Self::from_code(code)
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Unknown code `{code}.`"))?;
Ok(codec_type)
}
}

impl NullIndexCodec {
pub(crate) fn to_code(self) -> u8 {
self as u8
}

pub(crate) fn from_code(code: u8) -> Option<Self> {
match code {
1 => Some(Self::Full),
_ => None,
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct NullIndexFooter {
pub(crate) cardinality: FastFieldCardinality,
pub(crate) null_index_codec: NullIndexCodec,
// Unused for NullIndexCodec::Full
pub(crate) null_index_byte_range: Range<u64>,
}

impl BinarySerializable for NullIndexFooter {
fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
self.cardinality.serialize(writer)?;
self.null_index_codec.serialize(writer)?;
VInt(self.null_index_byte_range.start).serialize(writer)?;
VInt(self.null_index_byte_range.end - self.null_index_byte_range.start)
.serialize(writer)?;
Ok(())
}

fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let cardinality = FastFieldCardinality::deserialize(reader)?;
let null_index_codec = NullIndexCodec::deserialize(reader)?;
let null_index_byte_range_start = VInt::deserialize(reader)?.0;
let null_index_byte_range_end = VInt::deserialize(reader)?.0 + null_index_byte_range_start;
Ok(Self {
cardinality,
null_index_codec,
null_index_byte_range: null_index_byte_range_start..null_index_byte_range_end,
})
}
}

pub(crate) fn append_null_index_footer(
output: &mut impl io::Write,
null_index_footer: NullIndexFooter,
) -> io::Result<()> {
let mut counting_write = CountingWriter::wrap(output);
null_index_footer.serialize(&mut counting_write)?;
let footer_payload_len = counting_write.written_bytes();
BinarySerializable::serialize(&(footer_payload_len as u16), &mut counting_write)?;

Ok(())
}

pub(crate) fn read_null_index_footer(
data: OwnedBytes,
) -> io::Result<(OwnedBytes, NullIndexFooter)> {
let (data, null_footer_length_bytes) = data.rsplit(2);

let footer_length = u16::deserialize(&mut null_footer_length_bytes.as_slice())?;
let (data, null_index_footer_bytes) = data.rsplit(footer_length as usize);
let null_index_footer = NullIndexFooter::deserialize(&mut null_index_footer_bytes.as_ref())?;

Ok((data, null_index_footer))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn null_index_footer_deser_test() {
let null_index_footer = NullIndexFooter {
cardinality: FastFieldCardinality::Single,
null_index_codec: NullIndexCodec::Full,
null_index_byte_range: 100..120,
};

let mut out = vec![];
null_index_footer.serialize(&mut out).unwrap();

assert_eq!(
null_index_footer,
NullIndexFooter::deserialize(&mut &out[..]).unwrap()
);
}
}
27 changes: 24 additions & 3 deletions fastfield_codecs/src/serialize.rs
Expand Up @@ -28,11 +28,15 @@ use ownedbytes::OwnedBytes;
use crate::bitpacked::BitpackedCodec;
use crate::blockwise_linear::BlockwiseLinearCodec;
use crate::compact_space::CompactSpaceCompressor;
use crate::format_version::append_format_version;
use crate::linear::LinearCodec;
use crate::monotonic_mapping::{
StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal,
StrictlyMonotonicMappingToInternalGCDBaseval,
};
use crate::null_index_footer::{
append_null_index_footer, FastFieldCardinality, NullIndexCodec, NullIndexFooter,
};
use crate::{
monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64,
U128FastFieldCodecType, VecColumn, ALL_CODEC_TYPES,
Expand Down Expand Up @@ -198,6 +202,14 @@ pub fn serialize_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
compressor.compress_into(iter_gen(), output).unwrap();

let null_index_footer = NullIndexFooter {
cardinality: FastFieldCardinality::Single,
null_index_codec: NullIndexCodec::Full,
null_index_byte_range: 0..0,
};
append_null_index_footer(output, null_index_footer)?;
append_format_version(output)?;

Ok(())
}

Expand All @@ -221,6 +233,15 @@ pub fn serialize<T: MonotonicallyMappableToU64>(
let normalized_column = header.normalize_column(column);
assert_eq!(normalized_column.min_value(), 0u64);
serialize_given_codec(normalized_column, header.codec_type, output)?;

let null_index_footer = NullIndexFooter {
cardinality: FastFieldCardinality::Single,
null_index_codec: NullIndexCodec::Full,
null_index_byte_range: 0..0,
};
append_null_index_footer(output, null_index_footer)?;
append_format_version(output)?;

Ok(())
}

Expand Down Expand Up @@ -310,7 +331,7 @@ mod tests {
let col = VecColumn::from(&[false, true][..]);
serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
// 5 bytes of header, 1 byte of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5 + 8);
assert_eq!(buffer.len(), 3 + 5 + 8 + 4 + 2);
}

#[test]
Expand All @@ -319,7 +340,7 @@ mod tests {
let col = VecColumn::from(&[true][..]);
serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap();
// 5 bytes of header, 0 bytes of value, 7 bytes of padding.
assert_eq!(buffer.len(), 5 + 7);
assert_eq!(buffer.len(), 3 + 5 + 7 + 4 + 2);
}

#[test]
Expand All @@ -329,6 +350,6 @@ mod tests {
let col = VecColumn::from(&vals[..]);
serialize(col, &mut buffer, &[FastFieldCodecType::Bitpacked]).unwrap();
// Values are stored over 3 bits.
assert_eq!(buffer.len(), 7 + (3 * 80 / 8) + 7);
assert_eq!(buffer.len(), 3 + 7 + (3 * 80 / 8) + 7 + 4 + 2);
}
}
15 changes: 15 additions & 0 deletions ownedbytes/src/lib.rs
Expand Up @@ -80,6 +80,21 @@ impl OwnedBytes {
(left, right)
}

/// Splits the OwnedBytes into two OwnedBytes `(left, right)`.
///
/// Right will hold `split_len` bytes.
///
/// This operation is cheap and does not require to copy any memory.
/// On the other hand, both `left` and `right` retain a handle over
/// the entire slice of memory. In other words, the memory will only
/// be released when both left and right are dropped.
#[inline]
#[must_use]
pub fn rsplit(self, split_len: usize) -> (OwnedBytes, OwnedBytes) {
let data_len = self.data.len();
self.split(data_len - split_len)
}

/// Splits the right part of the `OwnedBytes` at the given offset.
///
/// `self` is truncated to `split_len`, left with the remaining bytes.
Expand Down

0 comments on commit 1119e59

Please sign in to comment.