diff --git a/Cargo.toml b/Cargo.toml index 5cf1696345..e9ee6bd3ed 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ pretty_assertions = "1.2.1" serde_cbor = { version = "0.11.2", optional = true } async-trait = "0.1.53" arc-swap = "1.5.0" +gcd = "2.1.0" [target.'cfg(windows)'.dependencies] winapi = "0.3.9" diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs index 5509a78a7a..088c39efe9 100644 --- a/fastfield_codecs/src/bitpacked.rs +++ b/fastfield_codecs/src/bitpacked.rs @@ -107,7 +107,7 @@ impl FastFieldCodecSerializer for BitpackedFastFieldSerializer { /// values. fn serialize( write: &mut impl Write, - _fastfield_accessor: &impl FastFieldDataAccess, + _fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, _data_iter1: impl Iterator, diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 9285321ea9..b75b76b306 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -42,7 +42,7 @@ pub trait FastFieldCodecSerializer { /// The iterators should be preferred over using fastfield_accessor for performance reasons. fn serialize( write: &mut impl Write, - fastfield_accessor: &impl FastFieldDataAccess, + fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, data_iter1: impl Iterator, diff --git a/fastfield_codecs/src/linearinterpol.rs b/fastfield_codecs/src/linearinterpol.rs index b4d0bb4801..4f10df2262 100644 --- a/fastfield_codecs/src/linearinterpol.rs +++ b/fastfield_codecs/src/linearinterpol.rs @@ -111,7 +111,7 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { /// Creates a new fast field serializer. fn serialize( write: &mut impl Write, - fastfield_accessor: &impl FastFieldDataAccess, + fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, data_iter1: impl Iterator, diff --git a/fastfield_codecs/src/multilinearinterpol.rs b/fastfield_codecs/src/multilinearinterpol.rs index d1f122bf04..b45d307e94 100644 --- a/fastfield_codecs/src/multilinearinterpol.rs +++ b/fastfield_codecs/src/multilinearinterpol.rs @@ -195,7 +195,7 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { /// Creates a new fast field serializer. fn serialize( write: &mut impl Write, - fastfield_accessor: &impl FastFieldDataAccess, + fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, _data_iter1: impl Iterator, diff --git a/src/fastfield/gcd.rs b/src/fastfield/gcd.rs new file mode 100644 index 0000000000..ac4fb9ec47 --- /dev/null +++ b/src/fastfield/gcd.rs @@ -0,0 +1,224 @@ +use std::io::{self, Write}; + +use common::BinarySerializable; +use fastdivide::DividerU64; +use fastfield_codecs::FastFieldCodecReader; +use gcd::Gcd; + +pub const GCD_DEFAULT: u64 = 1; +pub const GCD_CODEC_ID: u8 = 4; + +/// Wrapper for accessing a fastfield. +/// +/// Holds the data and the codec to the read the data. +#[derive(Clone)] +pub struct GCDFastFieldCodec { + gcd: u64, + min_value: u64, + reader: CodecReader, +} +impl FastFieldCodecReader for GCDFastFieldCodec { + /// Opens a fast field given the bytes. + fn open_from_bytes(bytes: &[u8]) -> std::io::Result { + let (header, mut footer) = bytes.split_at(bytes.len() - 16); + let gcd = u64::deserialize(&mut footer)?; + let min_value = u64::deserialize(&mut footer)?; + let reader = C::open_from_bytes(header)?; + + Ok(GCDFastFieldCodec { + gcd, + min_value, + reader, + }) + } + + #[inline] + fn get_u64(&self, doc: u64, data: &[u8]) -> u64 { + let mut data = self.reader.get_u64(doc, data); + data *= self.gcd; + data += self.min_value; + data + } + + fn min_value(&self) -> u64 { + self.min_value + self.reader.min_value() * self.gcd + } + + fn max_value(&self) -> u64 { + self.min_value + self.reader.max_value() * self.gcd + } +} + +pub fn write_gcd_header(field_write: &mut W, min_value: u64, gcd: u64) -> io::Result<()> { + gcd.serialize(field_write)?; + min_value.serialize(field_write)?; + Ok(()) +} + +// Find GCD for iterator of numbers +pub fn find_gcd(numbers: impl Iterator) -> Option { + let mut numbers = numbers.filter(|n| *n != 0); + let mut gcd = numbers.next()?; + if gcd == 1 { + return Some(1); + } + + let mut gcd_divider = DividerU64::divide_by(gcd); + for val in numbers { + let remainder = val - (gcd_divider.divide(val)) * gcd; + if remainder == 0 { + continue; + } + gcd = gcd.gcd(val); + if gcd == 1 { + return Some(1); + } + + gcd_divider = DividerU64::divide_by(gcd); + } + Some(gcd) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::path::Path; + + use common::HasLen; + + use crate::directory::{CompositeFile, RamDirectory, WritePtr}; + use crate::fastfield::serializer::FastFieldCodecEnableCheck; + use crate::fastfield::tests::{FIELD, FIELDI64, SCHEMA, SCHEMAI64}; + use crate::fastfield::{ + find_gcd, CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldCodecName, + FastFieldReader, FastFieldsWriter, ALL_CODECS, + }; + use crate::schema::Schema; + use crate::Directory; + + fn get_index( + docs: &[crate::Document], + schema: &Schema, + codec_enable_checker: FastFieldCodecEnableCheck, + ) -> crate::Result { + let directory: RamDirectory = RamDirectory::create(); + { + let write: WritePtr = directory.open_write(Path::new("test")).unwrap(); + let mut serializer = + CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker) + .unwrap(); + let mut fast_field_writers = FastFieldsWriter::from_schema(schema); + for doc in docs { + fast_field_writers.add_document(doc); + } + fast_field_writers + .serialize(&mut serializer, &HashMap::new(), None) + .unwrap(); + serializer.close().unwrap(); + } + Ok(directory) + } + + fn test_fastfield_gcd_i64_with_codec( + codec_name: FastFieldCodecName, + num_vals: usize, + ) -> crate::Result<()> { + let path = Path::new("test"); + let mut docs = vec![]; + for i in 1..=num_vals { + let val = i as i64 * 1000i64; + docs.push(doc!(*FIELDI64=>val)); + } + let directory = get_index(&docs, &SCHEMAI64, codec_name.clone().into())?; + let file = directory.open_read(path).unwrap(); + // assert_eq!(file.len(), 118); + let composite_file = CompositeFile::open(&file)?; + let file = composite_file.open_read(*FIELD).unwrap(); + let fast_field_reader = DynamicFastFieldReader::::open(file)?; + assert_eq!(fast_field_reader.get(0), 1000i64); + assert_eq!(fast_field_reader.get(1), 2000i64); + assert_eq!(fast_field_reader.get(2), 3000i64); + assert_eq!(fast_field_reader.max_value(), num_vals as i64 * 1000); + assert_eq!(fast_field_reader.min_value(), 1000i64); + let file = directory.open_read(path).unwrap(); + + // Can't apply gcd + let path = Path::new("test"); + docs.pop(); + docs.push(doc!(*FIELDI64=>2001i64)); + let directory = get_index(&docs, &SCHEMAI64, codec_name.into())?; + let file2 = directory.open_read(path).unwrap(); + assert!(file2.len() > file.len()); + + Ok(()) + } + + #[test] + fn test_fastfield_gcd_i64() -> crate::Result<()> { + for codec_name in ALL_CODECS { + test_fastfield_gcd_i64_with_codec(codec_name.clone(), 5005)?; + } + Ok(()) + } + + fn test_fastfield_gcd_u64_with_codec( + codec_name: FastFieldCodecName, + num_vals: usize, + ) -> crate::Result<()> { + let path = Path::new("test"); + let mut docs = vec![]; + for i in 1..=num_vals { + let val = i as u64 * 1000u64; + docs.push(doc!(*FIELD=>val)); + } + let directory = get_index(&docs, &SCHEMA, codec_name.clone().into())?; + let file = directory.open_read(path).unwrap(); + // assert_eq!(file.len(), 118); + let composite_file = CompositeFile::open(&file)?; + let file = composite_file.open_read(*FIELD).unwrap(); + let fast_field_reader = DynamicFastFieldReader::::open(file)?; + assert_eq!(fast_field_reader.get(0), 1000u64); + assert_eq!(fast_field_reader.get(1), 2000u64); + assert_eq!(fast_field_reader.get(2), 3000u64); + assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000); + assert_eq!(fast_field_reader.min_value(), 1000u64); + let file = directory.open_read(path).unwrap(); + + // Can't apply gcd + let path = Path::new("test"); + docs.pop(); + docs.push(doc!(*FIELDI64=>2001u64)); + let directory = get_index(&docs, &SCHEMA, codec_name.into())?; + let file2 = directory.open_read(path).unwrap(); + assert!(file2.len() > file.len()); + + Ok(()) + } + + #[test] + fn test_fastfield_gcd_u64() -> crate::Result<()> { + for codec_name in ALL_CODECS { + test_fastfield_gcd_u64_with_codec(codec_name.clone(), 5005)?; + } + Ok(()) + } + + #[test] + pub fn test_fastfield2() { + let test_fastfield = DynamicFastFieldReader::::from(vec![100, 200, 300]); + assert_eq!(test_fastfield.get(0), 100); + assert_eq!(test_fastfield.get(1), 200); + assert_eq!(test_fastfield.get(2), 300); + } + + #[test] + fn find_gcd_test() { + assert_eq!(find_gcd([0].into_iter()), None); + assert_eq!(find_gcd([0, 10].into_iter()), Some(10)); + assert_eq!(find_gcd([10, 0].into_iter()), Some(10)); + assert_eq!(find_gcd([].into_iter()), None); + assert_eq!(find_gcd([15, 30, 5, 10].into_iter()), Some(5)); + assert_eq!(find_gcd([15, 16, 10].into_iter()), Some(1)); + assert_eq!(find_gcd([0, 5, 5, 5].into_iter()), Some(5)); + } +} diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 004a5328e6..c6277a255d 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -24,6 +24,7 @@ pub use self::alive_bitset::{intersect_alive_bitsets, write_alive_bitset, AliveB pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; +pub(crate) use self::gcd::{find_gcd, GCDFastFieldCodec, GCD_CODEC_ID, GCD_DEFAULT}; pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; pub use self::reader::{DynamicFastFieldReader, FastFieldReader}; pub use self::readers::FastFieldReaders; @@ -37,12 +38,25 @@ mod alive_bitset; mod bytes; mod error; mod facet_reader; +mod gcd; mod multivalued; mod reader; mod readers; mod serializer; mod writer; +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)] +pub(crate) enum FastFieldCodecName { + Bitpacked, + LinearInterpol, + BlockwiseLinearInterpol, +} +pub(crate) const ALL_CODECS: &[FastFieldCodecName; 3] = &[ + FastFieldCodecName::Bitpacked, + FastFieldCodecName::LinearInterpol, + FastFieldCodecName::BlockwiseLinearInterpol, +]; + /// Trait for `BytesFastFieldReader` and `MultiValuedFastFieldReader` to return the length of data /// for a doc_id pub trait MultiValueLength { @@ -276,7 +290,14 @@ mod tests { schema_builder.build() }); + pub static SCHEMAI64: Lazy = Lazy::new(|| { + let mut schema_builder = Schema::builder(); + schema_builder.add_i64_field("field", FAST); + schema_builder.build() + }); + pub static FIELD: Lazy = Lazy::new(|| SCHEMA.get_field("field").unwrap()); + pub static FIELDI64: Lazy = Lazy::new(|| SCHEMAI64.get_field("field").unwrap()); #[test] pub fn test_fastfield() { @@ -425,7 +446,7 @@ mod tests { } #[test] - fn test_signed_intfastfield() -> crate::Result<()> { + fn test_signed_intfastfield_normal() -> crate::Result<()> { let path = Path::new("test"); let directory: RamDirectory = RamDirectory::create(); let mut schema_builder = Schema::builder(); @@ -505,10 +526,15 @@ mod tests { permutation } - #[test] - fn test_intfastfield_permutation() -> crate::Result<()> { + // Warning: this generates the same permutation at each call + pub fn generate_permutation_gcd() -> Vec { + let mut permutation: Vec = (1u64..100_000u64).map(|el| el * 1000).collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + fn test_intfastfield_permutation_with_data(permutation: Vec) -> crate::Result<()> { let path = Path::new("test"); - let permutation = generate_permutation(); let n = permutation.len(); let directory = RamDirectory::create(); { @@ -527,15 +553,27 @@ mod tests { let data = fast_fields_composite.open_read(*FIELD).unwrap(); let fast_field_reader = DynamicFastFieldReader::::open(data)?; - let mut a = 0u64; - for _ in 0..n { + for a in 0..n { assert_eq!(fast_field_reader.get(a as u32), permutation[a as usize]); - a = fast_field_reader.get(a as u32); } } Ok(()) } + #[test] + fn test_intfastfield_permutation_gcd() -> crate::Result<()> { + let permutation = generate_permutation_gcd(); + test_intfastfield_permutation_with_data(permutation)?; + Ok(()) + } + + #[test] + fn test_intfastfield_permutation() -> crate::Result<()> { + let permutation = generate_permutation(); + test_intfastfield_permutation_with_data(permutation)?; + Ok(()) + } + #[test] fn test_merge_missing_date_fast_field() -> crate::Result<()> { let mut schema_builder = Schema::builder(); @@ -951,6 +989,7 @@ mod bench { use super::tests::{generate_permutation, FIELD, SCHEMA}; use super::*; use crate::directory::{CompositeFile, Directory, RamDirectory, WritePtr}; + use crate::fastfield::tests::generate_permutation_gcd; use crate::fastfield::FastFieldReader; #[bench] @@ -1037,10 +1076,42 @@ mod bench { let fast_field_reader = DynamicFastFieldReader::::open(data).unwrap(); b.iter(|| { - let n = test::black_box(1000u32); let mut a = 0u32; - for _ in 0u32..n { - a = fast_field_reader.get(a) as u32; + for i in 0u32..permutation.len() as u32 { + a = fast_field_reader.get(i) as u32; + } + a + }); + } + } + + #[bench] + fn bench_intfastfield_fflookup_gcd(b: &mut Bencher) { + let path = Path::new("test"); + let permutation = generate_permutation_gcd(); + let directory: RamDirectory = RamDirectory::create(); + { + let write: WritePtr = directory.open_write(Path::new("test")).unwrap(); + let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap(); + let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA); + for &x in &permutation { + fast_field_writers.add_document(&doc!(*FIELD=>x)); + } + fast_field_writers + .serialize(&mut serializer, &HashMap::new(), None) + .unwrap(); + serializer.close().unwrap(); + } + let file = directory.open_read(&path).unwrap(); + { + let fast_fields_composite = CompositeFile::open(&file).unwrap(); + let data = fast_fields_composite.open_read(*FIELD).unwrap(); + let fast_field_reader = DynamicFastFieldReader::::open(data).unwrap(); + + b.iter(|| { + let mut a = 0u32; + for i in 0u32..permutation.len() as u32 { + a = fast_field_reader.get(i) as u32; } a }); diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 69870d0324..c7ba3313a5 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -346,6 +346,13 @@ mod tests { assert!(test_multivalued_no_panic(&ops[..]).is_ok()); } } + #[test] + fn test_multivalued_proptest_gcd() { + use IndexingOp::*; + let ops = [AddDoc { id: 9 }, AddDoc { id: 9 }, Merge]; + + assert!(test_multivalued_no_panic(&ops[..]).is_ok()); + } #[test] fn test_multivalued_proptest_off_by_one_bug_1151() { diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index eeb6b3d9bc..79f342c9b1 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::marker::PhantomData; use std::path::Path; -use common::BinarySerializable; use fastfield_codecs::bitpacked::{ BitpackedFastFieldReader as BitpackedReader, BitpackedFastFieldSerializer, }; @@ -14,7 +13,7 @@ use fastfield_codecs::multilinearinterpol::{ }; use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer}; -use super::FastValue; +use super::{FastValue, GCDFastFieldCodec, GCD_CODEC_ID}; use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr}; use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter}; use crate::schema::{Schema, FAST}; @@ -71,15 +70,26 @@ pub enum DynamicFastFieldReader { LinearInterpol(FastFieldReaderCodecWrapper), /// Blockwise linear interpolated values + bitpacked MultiLinearInterpol(FastFieldReaderCodecWrapper), + + /// GCD and Bitpacked compressed fastfield data. + BitpackedGCD(FastFieldReaderCodecWrapper>), + /// GCD and Linear interpolated values + bitpacked + LinearInterpolGCD( + FastFieldReaderCodecWrapper>, + ), + /// GCD and Blockwise linear interpolated values + bitpacked + MultiLinearInterpolGCD( + FastFieldReaderCodecWrapper>, + ), } impl DynamicFastFieldReader { /// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. - pub fn open(file: FileSlice) -> crate::Result> { - let mut bytes = file.read_bytes()?; - let id = bytes.read_u8(); - - let reader = match id { + pub fn open_from_id( + mut bytes: OwnedBytes, + codec_id: u8, + ) -> crate::Result> { + let reader = match codec_id { BitpackedFastFieldSerializer::ID => { DynamicFastFieldReader::Bitpacked(FastFieldReaderCodecWrapper::< Item, @@ -100,15 +110,59 @@ impl DynamicFastFieldReader { bytes )?) } + _ if codec_id == GCD_CODEC_ID => { + let codec_id = bytes.read_u8(); + + match codec_id { + BitpackedFastFieldSerializer::ID => { + DynamicFastFieldReader::BitpackedGCD(FastFieldReaderCodecWrapper::< + Item, + GCDFastFieldCodec, + >::open_from_bytes( + bytes + )?) + } + LinearInterpolFastFieldSerializer::ID => { + DynamicFastFieldReader::LinearInterpolGCD(FastFieldReaderCodecWrapper::< + Item, + GCDFastFieldCodec, + >::open_from_bytes( + bytes + )?) + } + MultiLinearInterpolFastFieldSerializer::ID => { + DynamicFastFieldReader::MultiLinearInterpolGCD( + FastFieldReaderCodecWrapper::< + Item, + GCDFastFieldCodec, + >::open_from_bytes(bytes)?, + ) + } + _ => { + panic!( + "unknown fastfield codec id {:?}. Data corrupted or using old tantivy \ + version.", + codec_id + ) + } + } + } _ => { panic!( - "unknown fastfield id {:?}. Data corrupted or using old tantivy version.", - id + "unknown fastfield codec id {:?}. Data corrupted or using old tantivy version.", + codec_id ) } }; Ok(reader) } + /// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. + pub fn open(file: FileSlice) -> crate::Result> { + let mut bytes = file.read_bytes()?; + let codec_id = bytes.read_u8(); + + Self::open_from_id(bytes, codec_id) + } } impl FastFieldReader for DynamicFastFieldReader { @@ -118,6 +172,9 @@ impl FastFieldReader for DynamicFastFieldReader { Self::Bitpacked(reader) => reader.get(doc), Self::LinearInterpol(reader) => reader.get(doc), Self::MultiLinearInterpol(reader) => reader.get(doc), + Self::BitpackedGCD(reader) => reader.get(doc), + Self::LinearInterpolGCD(reader) => reader.get(doc), + Self::MultiLinearInterpolGCD(reader) => reader.get(doc), } } #[inline] @@ -126,6 +183,9 @@ impl FastFieldReader for DynamicFastFieldReader { Self::Bitpacked(reader) => reader.get_range(start, output), Self::LinearInterpol(reader) => reader.get_range(start, output), Self::MultiLinearInterpol(reader) => reader.get_range(start, output), + Self::BitpackedGCD(reader) => reader.get_range(start, output), + Self::LinearInterpolGCD(reader) => reader.get_range(start, output), + Self::MultiLinearInterpolGCD(reader) => reader.get_range(start, output), } } fn min_value(&self) -> Item { @@ -133,6 +193,9 @@ impl FastFieldReader for DynamicFastFieldReader { Self::Bitpacked(reader) => reader.min_value(), Self::LinearInterpol(reader) => reader.min_value(), Self::MultiLinearInterpol(reader) => reader.min_value(), + Self::BitpackedGCD(reader) => reader.min_value(), + Self::LinearInterpolGCD(reader) => reader.min_value(), + Self::MultiLinearInterpolGCD(reader) => reader.min_value(), } } fn max_value(&self) -> Item { @@ -140,6 +203,9 @@ impl FastFieldReader for DynamicFastFieldReader { Self::Bitpacked(reader) => reader.max_value(), Self::LinearInterpol(reader) => reader.max_value(), Self::MultiLinearInterpol(reader) => reader.max_value(), + Self::BitpackedGCD(reader) => reader.max_value(), + Self::LinearInterpolGCD(reader) => reader.max_value(), + Self::MultiLinearInterpolGCD(reader) => reader.max_value(), } } } @@ -158,10 +224,10 @@ impl FastFieldReaderCodecWrapper crate::Result { let mut bytes = file.read_bytes()?; - let id = u8::deserialize(&mut bytes)?; + let codec_id = bytes.read_u8(); assert_eq!( BitpackedFastFieldSerializer::ID, - id, + codec_id, "Tried to open fast field as bitpacked encoded (id=1), but got serializer with \ different id" ); @@ -178,7 +244,8 @@ impl FastFieldReaderCodecWrapper Item { - Item::from_u64(self.reader.get_u64(doc, self.bytes.as_slice())) + let data = self.reader.get_u64(doc, self.bytes.as_slice()); + Item::from_u64(data) } /// Internally `multivalued` also use SingleValue Fast fields. diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index cc53297301..2ab4f22e66 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -8,7 +8,10 @@ use fastfield_codecs::linearinterpol::LinearInterpolFastFieldSerializer; use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerializer; pub use fastfield_codecs::{FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats}; +use super::{find_gcd, FastFieldCodecName, ALL_CODECS, GCD_DEFAULT}; use crate::directory::{CompositeWrite, WritePtr}; +use crate::fastfield::gcd::write_gcd_header; +use crate::fastfield::GCD_CODEC_ID; use crate::schema::Field; /// `CompositeFastFieldSerializer` is in charge of serializing @@ -33,6 +36,30 @@ use crate::schema::Field; /// * `close()` pub struct CompositeFastFieldSerializer { composite_write: CompositeWrite, + codec_enable_checker: FastFieldCodecEnableCheck, +} + +#[derive(Debug, Clone)] +pub struct FastFieldCodecEnableCheck { + enabled_codecs: Vec, +} +impl FastFieldCodecEnableCheck { + fn allow_all() -> Self { + FastFieldCodecEnableCheck { + enabled_codecs: ALL_CODECS.to_vec(), + } + } + fn is_enabled(&self, codec_name: FastFieldCodecName) -> bool { + self.enabled_codecs.contains(&codec_name) + } +} + +impl From for FastFieldCodecEnableCheck { + fn from(codec_name: FastFieldCodecName) -> Self { + FastFieldCodecEnableCheck { + enabled_codecs: vec![codec_name], + } + } } // use this, when this is merged and stabilized explicit_generic_args_with_impl_trait @@ -52,60 +79,154 @@ fn codec_estimation( impl CompositeFastFieldSerializer { /// Constructor pub fn from_write(write: WritePtr) -> io::Result { + Self::from_write_with_codec(write, FastFieldCodecEnableCheck::allow_all()) + } + + /// Constructor + pub fn from_write_with_codec( + write: WritePtr, + codec_enable_checker: FastFieldCodecEnableCheck, + ) -> io::Result { // just making room for the pointer to header. let composite_write = CompositeWrite::wrap(write); - Ok(CompositeFastFieldSerializer { composite_write }) + Ok(CompositeFastFieldSerializer { + composite_write, + codec_enable_checker, + }) } /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field( + pub fn create_auto_detect_u64_fast_field( &mut self, field: Field, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - data_iter_1: impl Iterator, - data_iter_2: impl Iterator, - ) -> io::Result<()> { + iter_gen: F, + ) -> io::Result<()> + where + F: Fn() -> I, + I: Iterator, + { self.create_auto_detect_u64_fast_field_with_idx( field, stats, fastfield_accessor, - data_iter_1, - data_iter_2, + iter_gen, 0, ) } + + /// Serialize data into a new u64 fast field. The best compression codec will be chosen + /// automatically. + pub fn write_header(field_write: &mut W, codec_id: u8) -> io::Result<()> { + codec_id.serialize(field_write)?; + + Ok(()) + } + /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field_with_idx( + pub fn create_auto_detect_u64_fast_field_with_idx( &mut self, field: Field, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - data_iter_1: impl Iterator, - data_iter_2: impl Iterator, + iter_gen: F, idx: usize, - ) -> io::Result<()> { + ) -> io::Result<()> + where + F: Fn() -> I, + I: Iterator, + { let field_write = self.composite_write.for_field_with_idx(field, idx); + let gcd = find_gcd(iter_gen().map(|val| val - stats.min_value)).unwrap_or(GCD_DEFAULT); + + if gcd == 1 { + return Self::create_auto_detect_u64_fast_field_with_idx_gcd( + self.codec_enable_checker.clone(), + field, + field_write, + stats, + fastfield_accessor, + iter_gen(), + iter_gen(), + ); + } + + Self::write_header(field_write, GCD_CODEC_ID)?; + struct GCDWrappedFFAccess { + fastfield_accessor: T, + min_value: u64, + gcd: u64, + } + impl FastFieldDataAccess for GCDWrappedFFAccess { + fn get_val(&self, position: u64) -> u64 { + (self.fastfield_accessor.get_val(position) - self.min_value) / self.gcd + } + } + + let fastfield_accessor = GCDWrappedFFAccess { + fastfield_accessor, + min_value: stats.min_value, + gcd, + }; + let min_value = stats.min_value; + let stats = FastFieldStats { + min_value: 0, + max_value: (stats.max_value - stats.min_value) / gcd, + num_vals: stats.num_vals, + }; + let iter1 = iter_gen().map(|val| (val - min_value) / gcd); + let iter2 = iter_gen().map(|val| (val - min_value) / gcd); + Self::create_auto_detect_u64_fast_field_with_idx_gcd( + self.codec_enable_checker.clone(), + field, + field_write, + stats, + fastfield_accessor, + iter1, + iter2, + )?; + write_gcd_header(field_write, min_value, gcd)?; + Ok(()) + } + + /// Serialize data into a new u64 fast field. The best compression codec will be chosen + /// automatically. + pub fn create_auto_detect_u64_fast_field_with_idx_gcd( + codec_enable_checker: FastFieldCodecEnableCheck, + field: Field, + field_write: &mut CountingWriter, + stats: FastFieldStats, + fastfield_accessor: impl FastFieldDataAccess, + iter1: impl Iterator, + iter2: impl Iterator, + ) -> io::Result<()> { let mut estimations = vec![]; - codec_estimation::( - stats.clone(), - &fastfield_accessor, - &mut estimations, - ); - codec_estimation::( - stats.clone(), - &fastfield_accessor, - &mut estimations, - ); - codec_estimation::( - stats.clone(), - &fastfield_accessor, - &mut estimations, - ); + if codec_enable_checker.is_enabled(FastFieldCodecName::Bitpacked) { + codec_estimation::( + stats.clone(), + &fastfield_accessor, + &mut estimations, + ); + } + if codec_enable_checker.is_enabled(FastFieldCodecName::LinearInterpol) { + codec_estimation::( + stats.clone(), + &fastfield_accessor, + &mut estimations, + ); + } + if codec_enable_checker.is_enabled(FastFieldCodecName::BlockwiseLinearInterpol) { + codec_estimation::( + stats.clone(), + &fastfield_accessor, + &mut estimations, + ); + } if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) { warn!( @@ -122,15 +243,16 @@ impl CompositeFastFieldSerializer { "choosing fast field codec {} for field_id {:?}", name, field ); // todo print actual field name - id.serialize(field_write)?; + + Self::write_header(field_write, id)?; match name { BitpackedFastFieldSerializer::NAME => { BitpackedFastFieldSerializer::serialize( field_write, &fastfield_accessor, stats, - data_iter_1, - data_iter_2, + iter1, + iter2, )?; } LinearInterpolFastFieldSerializer::NAME => { @@ -138,8 +260,8 @@ impl CompositeFastFieldSerializer { field_write, &fastfield_accessor, stats, - data_iter_1, - data_iter_2, + iter1, + iter2, )?; } MultiLinearInterpolFastFieldSerializer::NAME => { @@ -147,19 +269,29 @@ impl CompositeFastFieldSerializer { field_write, &fastfield_accessor, stats, - data_iter_1, - data_iter_2, + iter1, + iter2, )?; } _ => { panic!("unknown fastfield serializer {}", name) } - }; + } field_write.flush()?; Ok(()) } + /// Start serializing a new u64 fast field + pub fn serialize_into( + &mut self, + field: Field, + min_value: u64, + max_value: u64, + ) -> io::Result>> { + self.new_u64_fast_field_with_idx(field, min_value, max_value, 0) + } + /// Start serializing a new u64 fast field pub fn new_u64_fast_field( &mut self, diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 4d1b5d3467..f5d1d4e014 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -370,23 +370,25 @@ impl IntFastFieldWriter { }; if let Some(doc_id_map) = doc_id_map { - let iter = doc_id_map - .iter_old_doc_ids() - .map(|doc_id| self.vals.get(doc_id as usize)); + let iter_gen = || { + doc_id_map + .iter_old_doc_ids() + .map(|doc_id| self.vals.get(doc_id as usize)) + }; serializer.create_auto_detect_u64_fast_field( self.field, stats, fastfield_accessor, - iter.clone(), - iter, + iter_gen, )?; } else { + let iter_gen = || self.vals.iter(); + serializer.create_auto_detect_u64_fast_field( self.field, stats, fastfield_accessor, - self.vals.iter(), - self.vals.iter(), + iter_gen, )?; }; Ok(()) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 2ac6ec339b..8b5d60b99a 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -385,20 +385,17 @@ impl IndexMerger { doc_id_mapping, fast_field_readers: &fast_field_readers, }; - let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { - let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; - fast_field_reader.get(*doc_id) - }); - let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { - let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; - fast_field_reader.get(*doc_id) - }); + let iter_gen = || { + doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { + let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; + fast_field_reader.get(*doc_id) + }) + }; fast_field_serializer.create_auto_detect_u64_fast_field( field, stats, fastfield_accessor, - iter1, - iter2, + iter_gen, )?; Ok(()) @@ -560,12 +557,12 @@ impl IndexMerger { } offsets.push(offset); + let iter_gen = || offsets.iter().cloned(); fast_field_serializer.create_auto_detect_u64_fast_field( field, stats, &offsets[..], - offsets.iter().cloned(), - offsets.iter().cloned(), + iter_gen, )?; Ok(offsets) } @@ -768,24 +765,19 @@ impl IndexMerger { fast_field_readers: &ff_readers, offsets, }; - let iter1 = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { - let ff_reader = &ff_readers[*reader_ordinal as usize]; - let mut vals = vec![]; - ff_reader.get_vals(*doc_id, &mut vals); - vals.into_iter() - }); - let iter2 = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { - let ff_reader = &ff_readers[*reader_ordinal as usize]; - let mut vals = vec![]; - ff_reader.get_vals(*doc_id, &mut vals); - vals.into_iter() - }); + let iter_gen = || { + doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { + let ff_reader = &ff_readers[*reader_ordinal as usize]; + let mut vals = vec![]; + ff_reader.get_vals(*doc_id, &mut vals); + vals.into_iter() + }) + }; fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( field, stats, fastfield_accessor, - iter1, - iter2, + iter_gen, 1, )?;