diff --git a/common/src/serialize.rs b/common/src/serialize.rs index 7b96316ee4..cd94bc890e 100644 --- a/common/src/serialize.rs +++ b/common/src/serialize.rs @@ -94,6 +94,20 @@ impl FixedSize for u32 { const SIZE_IN_BYTES: usize = 4; } +impl BinarySerializable for u16 { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + writer.write_u16::(*self) + } + + fn deserialize(reader: &mut R) -> io::Result { + reader.read_u16::() + } +} + +impl FixedSize for u16 { + const SIZE_IN_BYTES: usize = 2; +} + impl BinarySerializable for u64 { fn serialize(&self, writer: &mut W) -> io::Result<()> { writer.write_u64::(*self) diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index 3e744562ca..9120d2413c 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -456,6 +456,8 @@ impl CompactSpaceDecompressor { mod tests { use super::*; + use crate::format_version::read_format_version; + use crate::null_index_footer::read_null_index_footer; use crate::serialize::U128Header; use crate::{open_u128, serialize_u128}; @@ -541,7 +543,10 @@ mod tests { .unwrap(); let data = OwnedBytes::new(out); + let (data, _format_version) = read_format_version(data).unwrap(); + let (data, _null_index_footer) = read_null_index_footer(data).unwrap(); test_all(data.clone(), u128_vals); + data } @@ -559,6 +564,7 @@ mod tests { 333u128, ]; let mut data = test_aux_vals(vals); + let _header = U128Header::deserialize(&mut data); let decomp = CompactSpaceDecompressor::open(data).unwrap(); let complete_range = 0..vals.len() as u32; diff --git a/fastfield_codecs/src/format_version.rs b/fastfield_codecs/src/format_version.rs new file mode 100644 index 0000000000..7eaa342a36 --- /dev/null +++ b/fastfield_codecs/src/format_version.rs @@ -0,0 +1,39 @@ +use std::io; + +use common::BinarySerializable; +use ownedbytes::OwnedBytes; + +const MAGIC_NUMBER: u16 = 4335u16; +const FASTFIELD_FORMAT_VERSION: u8 = 1; + 
+pub(crate) fn append_format_version(output: &mut impl io::Write) -> io::Result<()> { + FASTFIELD_FORMAT_VERSION.serialize(output)?; + MAGIC_NUMBER.serialize(output)?; + + Ok(()) +} + +pub(crate) fn read_format_version(data: OwnedBytes) -> io::Result<(OwnedBytes, u8)> { + let (data, magic_number_bytes) = data.rsplit(2); + + let magic_number = u16::deserialize(&mut magic_number_bytes.as_slice())?; + if magic_number != MAGIC_NUMBER { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!("magic number mismatch {} != {}", magic_number, MAGIC_NUMBER), + )); + } + let (data, format_version_bytes) = data.rsplit(1); + let format_version = u8::deserialize(&mut format_version_bytes.as_slice())?; + if format_version > FASTFIELD_FORMAT_VERSION { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "Unsupported fastfield format version: {}. Max supported version: {}", + format_version, FASTFIELD_FORMAT_VERSION + ), + )); + } + + Ok((data, format_version)) +} diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index f8a22732f3..7a12af8bcd 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -20,20 +20,24 @@ use std::sync::Arc; use common::BinarySerializable; use compact_space::CompactSpaceDecompressor; +use format_version::read_format_version; use monotonic_mapping::{ StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval, }; +use null_index_footer::read_null_index_footer; use ownedbytes::OwnedBytes; use serialize::{Header, U128Header}; mod bitpacked; mod blockwise_linear; mod compact_space; +mod format_version; mod line; mod linear; mod monotonic_mapping; mod monotonic_mapping_u128; +mod null_index_footer; mod column; mod gcd; @@ -129,8 +133,10 @@ impl U128FastFieldCodecType { /// Returns the correct codec reader wrapped in the `Arc` for the data. 
pub fn open_u128( - mut bytes: OwnedBytes, + bytes: OwnedBytes, ) -> io::Result>> { + let (bytes, _format_version) = read_format_version(bytes)?; + let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?; let header = U128Header::deserialize(&mut bytes)?; assert_eq!(header.codec_type, U128FastFieldCodecType::CompactSpace); let reader = CompactSpaceDecompressor::open(bytes)?; @@ -140,9 +146,9 @@ pub fn open_u128( } /// Returns the correct codec reader wrapped in the `Arc` for the data. -pub fn open( - mut bytes: OwnedBytes, -) -> io::Result>> { +pub fn open(bytes: OwnedBytes) -> io::Result>> { + let (bytes, _format_version) = read_format_version(bytes)?; + let (mut bytes, _null_index_footer) = read_null_index_footer(bytes)?; let header = Header::deserialize(&mut bytes)?; match header.codec_type { FastFieldCodecType::Bitpacked => open_specific_codec::(bytes, &header), diff --git a/fastfield_codecs/src/null_index_footer.rs b/fastfield_codecs/src/null_index_footer.rs new file mode 100644 index 0000000000..1ce3cfcda6 --- /dev/null +++ b/fastfield_codecs/src/null_index_footer.rs @@ -0,0 +1,144 @@ +use std::io::{self, Write}; +use std::ops::Range; + +use common::{BinarySerializable, CountingWriter, VInt}; +use ownedbytes::OwnedBytes; + +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum FastFieldCardinality { + Single = 1, +} + +impl BinarySerializable for FastFieldCardinality { + fn serialize(&self, wrt: &mut W) -> io::Result<()> { + self.to_code().serialize(wrt) + } + + fn deserialize(reader: &mut R) -> io::Result { + let code = u8::deserialize(reader)?; + let codec_type: Self = Self::from_code(code) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("Unknown code `{}`.", code)))?; + Ok(codec_type) + } +} + +impl FastFieldCardinality { + pub(crate) fn to_code(self) -> u8 { + self as u8 + } + + pub(crate) fn from_code(code: u8) -> Option { + match code { + 1 => Some(Self::Single), + _ => None, + } + } +} + +#[derive(Debug, Clone, Copy, 
PartialEq, Eq)] +pub(crate) enum NullIndexCodec { + Full = 1, +} + +impl BinarySerializable for NullIndexCodec { + fn serialize(&self, wrt: &mut W) -> io::Result<()> { + self.to_code().serialize(wrt) + } + + fn deserialize(reader: &mut R) -> io::Result { + let code = u8::deserialize(reader)?; + let codec_type: Self = Self::from_code(code) + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, format!("Unknown code `{}`.", code)))?; + Ok(codec_type) + } +} + +impl NullIndexCodec { + pub(crate) fn to_code(self) -> u8 { + self as u8 + } + + pub(crate) fn from_code(code: u8) -> Option { + match code { + 1 => Some(Self::Full), + _ => None, + } + } +} + +#[derive(Debug, Clone, Eq, PartialEq)] +pub(crate) struct NullIndexFooter { + pub(crate) cardinality: FastFieldCardinality, + pub(crate) null_index_codec: NullIndexCodec, + // Unused for NullIndexCodec::Full + pub(crate) null_index_byte_range: Range, +} + +impl BinarySerializable for NullIndexFooter { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + self.cardinality.serialize(writer)?; + self.null_index_codec.serialize(writer)?; + VInt(self.null_index_byte_range.start).serialize(writer)?; + VInt(self.null_index_byte_range.end - self.null_index_byte_range.start) + .serialize(writer)?; + Ok(()) + } + + fn deserialize(reader: &mut R) -> io::Result { + let cardinality = FastFieldCardinality::deserialize(reader)?; + let null_index_codec = NullIndexCodec::deserialize(reader)?; + let null_index_byte_range_start = VInt::deserialize(reader)?.0; + let null_index_byte_range_end = VInt::deserialize(reader)?.0 + null_index_byte_range_start; + Ok(Self { + cardinality, + null_index_codec, + null_index_byte_range: null_index_byte_range_start..null_index_byte_range_end, + }) + } +} + +pub(crate) fn append_null_index_footer( + output: &mut impl io::Write, + null_index_footer: NullIndexFooter, +) -> io::Result<()> { + let mut counting_write = CountingWriter::wrap(output); + null_index_footer.serialize(&mut counting_write)?; + let 
footer_payload_len = counting_write.written_bytes(); + BinarySerializable::serialize(&(footer_payload_len as u16), &mut counting_write)?; + + Ok(()) +} + +pub(crate) fn read_null_index_footer( + data: OwnedBytes, +) -> io::Result<(OwnedBytes, NullIndexFooter)> { + let (data, null_footer_length_bytes) = data.rsplit(2); + + let footer_length = u16::deserialize(&mut null_footer_length_bytes.as_slice())?; + let (data, null_index_footer_bytes) = data.rsplit(footer_length as usize); + let null_index_footer = NullIndexFooter::deserialize(&mut null_index_footer_bytes.as_ref())?; + + Ok((data, null_index_footer)) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn null_index_footer_deser_test() { + let null_index_footer = NullIndexFooter { + cardinality: FastFieldCardinality::Single, + null_index_codec: NullIndexCodec::Full, + null_index_byte_range: 100..120, + }; + + let mut out = vec![]; + null_index_footer.serialize(&mut out).unwrap(); + + assert_eq!( + null_index_footer, + NullIndexFooter::deserialize(&mut &out[..]).unwrap() + ); + } +} diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs index b0f9e15daf..33f18bada3 100644 --- a/fastfield_codecs/src/serialize.rs +++ b/fastfield_codecs/src/serialize.rs @@ -28,11 +28,15 @@ use ownedbytes::OwnedBytes; use crate::bitpacked::BitpackedCodec; use crate::blockwise_linear::BlockwiseLinearCodec; use crate::compact_space::CompactSpaceCompressor; +use crate::format_version::append_format_version; use crate::linear::LinearCodec; use crate::monotonic_mapping::{ StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal, StrictlyMonotonicMappingToInternalGCDBaseval, }; +use crate::null_index_footer::{ + append_null_index_footer, FastFieldCardinality, NullIndexCodec, NullIndexFooter, +}; use crate::{ monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64, U128FastFieldCodecType, VecColumn, ALL_CODEC_TYPES, @@ -198,6 +202,14 @@ pub fn serialize_u128 I, I: 
Iterator>( let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals); compressor.compress_into(iter_gen(), output).unwrap(); + let null_index_footer = NullIndexFooter { + cardinality: FastFieldCardinality::Single, + null_index_codec: NullIndexCodec::Full, + null_index_byte_range: 0..0, + }; + append_null_index_footer(output, null_index_footer)?; + append_format_version(output)?; + Ok(()) } @@ -221,6 +233,15 @@ pub fn serialize( let normalized_column = header.normalize_column(column); assert_eq!(normalized_column.min_value(), 0u64); serialize_given_codec(normalized_column, header.codec_type, output)?; + + let null_index_footer = NullIndexFooter { + cardinality: FastFieldCardinality::Single, + null_index_codec: NullIndexCodec::Full, + null_index_byte_range: 0..0, + }; + append_null_index_footer(output, null_index_footer)?; + append_format_version(output)?; + Ok(()) } @@ -310,7 +331,7 @@ mod tests { let col = VecColumn::from(&[false, true][..]); serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap(); // 5 bytes of header, 1 byte of value, 7 bytes of padding. - assert_eq!(buffer.len(), 5 + 8); + assert_eq!(buffer.len(), 3 + 5 + 8 + 4 + 2); } #[test] @@ -319,7 +340,7 @@ mod tests { let col = VecColumn::from(&[true][..]); serialize(col, &mut buffer, &ALL_CODEC_TYPES).unwrap(); // 5 bytes of header, 0 bytes of value, 7 bytes of padding. - assert_eq!(buffer.len(), 5 + 7); + assert_eq!(buffer.len(), 3 + 5 + 7 + 4 + 2); } #[test] @@ -329,6 +350,6 @@ mod tests { let col = VecColumn::from(&vals[..]); serialize(col, &mut buffer, &[FastFieldCodecType::Bitpacked]).unwrap(); // Values are stored over 3 bits. 
- assert_eq!(buffer.len(), 7 + (3 * 80 / 8) + 7); + assert_eq!(buffer.len(), 3 + 7 + (3 * 80 / 8) + 7 + 4 + 2); } } diff --git a/ownedbytes/src/lib.rs b/ownedbytes/src/lib.rs index 6c32ebba8f..622f9e66e8 100644 --- a/ownedbytes/src/lib.rs +++ b/ownedbytes/src/lib.rs @@ -80,6 +80,21 @@ impl OwnedBytes { (left, right) } + /// Splits the OwnedBytes into two OwnedBytes `(left, right)`. + /// + /// Right will hold `split_len` bytes. + /// + /// This operation is cheap and does not require to copy any memory. + /// On the other hand, both `left` and `right` retain a handle over + /// the entire slice of memory. In other words, the memory will only + /// be released when both left and right are dropped. + #[inline] + #[must_use] + pub fn rsplit(self, split_len: usize) -> (OwnedBytes, OwnedBytes) { + let data_len = self.data.len(); + self.split(data_len - split_len) + } + /// Splits the right part of the `OwnedBytes` at the given offset. /// /// `self` is truncated to `split_len`, left with the remaining bytes. 
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index a29ee238ce..bb710b4498 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -207,7 +207,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 25); + assert_eq!(file.len(), 34); let composite_file = CompositeFile::open(&file)?; let fast_field_bytes = composite_file.open_read(*FIELD).unwrap().read_bytes()?; let fast_field_reader = open::(fast_field_bytes)?; @@ -256,7 +256,7 @@ mod tests { serializer.close()?; } let file = directory.open_read(path)?; - assert_eq!(file.len(), 53); + assert_eq!(file.len(), 62); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite @@ -297,7 +297,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 26); + assert_eq!(file.len(), 35); { let fast_fields_composite = CompositeFile::open(&file).unwrap(); let data = fast_fields_composite @@ -336,7 +336,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 80040); + assert_eq!(file.len(), 80049); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite @@ -378,7 +378,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 40_usize); + assert_eq!(file.len(), 49_usize); { let fast_fields_composite = CompositeFile::open(&file)?; @@ -822,7 +822,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 24); + assert_eq!(file.len(), 33); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; let fast_field_reader = open::(data)?; @@ -860,7 +860,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 36); + 
assert_eq!(file.len(), 45); let composite_file = CompositeFile::open(&file)?; let data = composite_file.open_read(field).unwrap().read_bytes()?; let fast_field_reader = open::(data)?; @@ -892,7 +892,7 @@ mod tests { } let file = directory.open_read(path).unwrap(); let composite_file = CompositeFile::open(&file)?; - assert_eq!(file.len(), 23); + assert_eq!(file.len(), 32); let data = composite_file.open_read(field).unwrap().read_bytes()?; let fast_field_reader = open::(data)?; assert_eq!(fast_field_reader.get_val(0), false); @@ -926,10 +926,10 @@ mod tests { pub fn test_gcd_date() -> crate::Result<()> { let size_prec_sec = test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Seconds)?; - assert_eq!(size_prec_sec, 28 + (1_000 * 13) / 8); // 13 bits per val = ceil(log_2(number of seconds in 2hours); + assert_eq!(size_prec_sec, 5 + 4 + 28 + (1_000 * 13) / 8); // 13 bits per val = ceil(log_2(number of seconds in 2hours); let size_prec_micro = test_gcd_date_with_codec(FastFieldCodecType::Bitpacked, DatePrecision::Microseconds)?; - assert_eq!(size_prec_micro, 26 + (1_000 * 33) / 8); // 33 bits per val = ceil(log_2(number of microsecsseconds in 2hours); + assert_eq!(size_prec_micro, 5 + 4 + 26 + (1_000 * 33) / 8); // 33 bits per val = ceil(log_2(number of microsecsseconds in 2hours); Ok(()) }