diff --git a/Cargo.toml b/Cargo.toml index 5cf1696345..c3d817b01c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,8 @@ pretty_assertions = "1.2.1" serde_cbor = { version = "0.11.2", optional = true } async-trait = "0.1.53" arc-swap = "1.5.0" +gcd = "2.1.0" +libdivide = "0.4.0" [target.'cfg(windows)'.dependencies] winapi = "0.3.9" diff --git a/fastfield_codecs/src/bitpacked.rs b/fastfield_codecs/src/bitpacked.rs index 5509a78a7a..c3d8e4c4e0 100644 --- a/fastfield_codecs/src/bitpacked.rs +++ b/fastfield_codecs/src/bitpacked.rs @@ -105,9 +105,9 @@ impl FastFieldCodecSerializer for BitpackedFastFieldSerializer { /// It requires a `min_value` and a `max_value` to compute /// compute the minimum number of bits required to encode /// values. - fn serialize( - write: &mut impl Write, - _fastfield_accessor: &impl FastFieldDataAccess, + fn serialize( + write: &mut W, + _fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, _data_iter1: impl Iterator, diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 9285321ea9..8fdcfef253 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -40,13 +40,15 @@ pub trait FastFieldCodecSerializer { /// Serializes the data using the serializer into write. /// There are multiple iterators, in case the codec needs to read the data multiple times. /// The iterators should be preferred over using fastfield_accessor for performance reasons. - fn serialize( - write: &mut impl Write, - fastfield_accessor: &impl FastFieldDataAccess, + fn serialize( + write: &mut W, + fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, data_iter1: impl Iterator, - ) -> io::Result<()>; + ) -> io::Result<()> + where + W: Write; } /// FastFieldDataAccess is the trait to access fast field data during serialization and estimation. diff --git a/fastfield_codecs/src/linearinterpol.rs b/fastfield_codecs/src/linearinterpol.rs index b4d0bb4801..e3a5e083f1 100644 --- a/fastfield_codecs/src/linearinterpol.rs +++ b/fastfield_codecs/src/linearinterpol.rs @@ -109,13 +109,16 @@ impl FastFieldCodecSerializer for LinearInterpolFastFieldSerializer { const NAME: &'static str = "LinearInterpol"; const ID: u8 = 2; /// Creates a new fast field serializer. - fn serialize( - write: &mut impl Write, - fastfield_accessor: &impl FastFieldDataAccess, + fn serialize( + write: &mut W, + fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, data_iter1: impl Iterator, - ) -> io::Result<()> { + ) -> io::Result<()> + where + W: Write, + { assert!(stats.min_value <= stats.max_value); let first_val = fastfield_accessor.get_val(0); diff --git a/fastfield_codecs/src/multilinearinterpol.rs b/fastfield_codecs/src/multilinearinterpol.rs index d1f122bf04..405732e185 100644 --- a/fastfield_codecs/src/multilinearinterpol.rs +++ b/fastfield_codecs/src/multilinearinterpol.rs @@ -75,6 +75,7 @@ impl BinarySerializable for Function { self.positive_val_offset.serialize(write)?; self.slope.serialize(write)?; self.num_bits.serialize(write)?; + Ok(()) } @@ -193,13 +194,16 @@ impl FastFieldCodecSerializer for MultiLinearInterpolFastFieldSerializer { const NAME: &'static str = "MultiLinearInterpol"; const ID: u8 = 3; /// Creates a new fast field serializer. - fn serialize( - write: &mut impl Write, - fastfield_accessor: &impl FastFieldDataAccess, + fn serialize( + write: &mut W, + fastfield_accessor: &dyn FastFieldDataAccess, stats: FastFieldStats, data_iter: impl Iterator, _data_iter1: impl Iterator, - ) -> io::Result<()> { + ) -> io::Result<()> + where + W: Write, + { assert!(stats.min_value <= stats.max_value); let first_val = fastfield_accessor.get_val(0); diff --git a/src/fastfield/bytes/mod.rs b/src/fastfield/bytes/mod.rs index 37bda14f9f..12858e3c77 100644 --- a/src/fastfield/bytes/mod.rs +++ b/src/fastfield/bytes/mod.rs @@ -11,7 +11,7 @@ mod tests { use crate::{DocAddress, DocSet, Index, Searcher, Term}; #[test] - fn test_bytes() -> crate::Result<()> { + fn test_bytes2() -> crate::Result<()> { let mut schema_builder = Schema::builder(); let bytes_field = schema_builder.add_bytes_field("bytesfield", FAST); let schema = schema_builder.build(); diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 004a5328e6..8d0effa1e6 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -276,10 +276,17 @@ mod tests { schema_builder.build() }); + pub static SCHEMAI64: Lazy = Lazy::new(|| { + let mut schema_builder = Schema::builder(); + schema_builder.add_i64_field("field", FAST); + schema_builder.build() + }); + pub static FIELD: Lazy = Lazy::new(|| SCHEMA.get_field("field").unwrap()); + pub static FIELDI64: Lazy = Lazy::new(|| SCHEMAI64.get_field("field").unwrap()); #[test] - pub fn test_fastfield() { + pub fn test_fastfield2() { let test_fastfield = DynamicFastFieldReader::::from(vec![100, 200, 300]); assert_eq!(test_fastfield.get(0), 100); assert_eq!(test_fastfield.get(1), 200); @@ -309,7 +316,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 37); + assert_eq!(file.len(), 55); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(*FIELD).unwrap(); let fast_field_reader = DynamicFastFieldReader::::open(file)?; @@ -340,7 +347,7 @@ mod tests { serializer.close()?; } let file = directory.open_read(path)?; - assert_eq!(file.len(), 62); + assert_eq!(file.len(), 80); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(*FIELD).unwrap(); @@ -376,7 +383,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 35); + assert_eq!(file.len(), 53); { let fast_fields_composite = CompositeFile::open(&file).unwrap(); let data = fast_fields_composite.open_read(*FIELD).unwrap(); @@ -408,7 +415,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 80043); + assert_eq!(file.len(), 80061); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(*FIELD).unwrap(); @@ -448,7 +455,8 @@ mod tests { } let file = directory.open_read(path).unwrap(); // assert_eq!(file.len(), 17710 as usize); //bitpacked size - assert_eq!(file.len(), 10175_usize); // linear interpol size + // assert_eq!(file.len(), 10201_usize); // linear interpol size, before gcd = min_value + assert_eq!(file.len(), 93_usize); { let fast_fields_composite = CompositeFile::open(&file)?; let data = fast_fields_composite.open_read(i64_field).unwrap(); @@ -505,10 +513,15 @@ mod tests { permutation } - #[test] - fn test_intfastfield_permutation() -> crate::Result<()> { + // Warning: this generates the same permutation at each call + pub fn generate_permutation_gcd() -> Vec { + let mut permutation: Vec = (1u64..100_000u64).map(|el| el * 1000).collect(); + permutation.shuffle(&mut StdRng::from_seed([1u8; 32])); + permutation + } + + fn test_intfastfield_permutation_with_data(permutation: Vec) -> crate::Result<()> { let path = Path::new("test"); - let permutation = generate_permutation(); let n = permutation.len(); let directory = RamDirectory::create(); { @@ -527,15 +540,27 @@ mod tests { let data = fast_fields_composite.open_read(*FIELD).unwrap(); let fast_field_reader = DynamicFastFieldReader::::open(data)?; - let mut a = 0u64; - for _ in 0..n { + for a in 0..n { assert_eq!(fast_field_reader.get(a as u32), permutation[a as usize]); - a = fast_field_reader.get(a as u32); } } Ok(()) } + #[test] + fn test_intfastfield_permutation_gcd() -> crate::Result<()> { + let permutation = generate_permutation_gcd(); + test_intfastfield_permutation_with_data(permutation)?; + Ok(()) + } + + #[test] + fn test_intfastfield_permutation() -> crate::Result<()> { + let permutation = generate_permutation(); + test_intfastfield_permutation_with_data(permutation)?; + Ok(()) + } + #[test] fn test_merge_missing_date_fast_field() -> crate::Result<()> { let mut schema_builder = Schema::builder(); @@ -861,7 +886,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 36); + assert_eq!(file.len(), 54); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(field).unwrap(); let fast_field_reader = DynamicFastFieldReader::::open(file)?; @@ -897,7 +922,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 48); + assert_eq!(file.len(), 66); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(field).unwrap(); let fast_field_reader = DynamicFastFieldReader::::open(file)?; @@ -931,7 +956,7 @@ mod tests { serializer.close().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 35); + assert_eq!(file.len(), 53); let composite_file = CompositeFile::open(&file)?; let file = composite_file.open_read(field).unwrap(); let fast_field_reader = DynamicFastFieldReader::::open(file)?; @@ -946,6 +971,7 @@ mod bench { use std::collections::HashMap; use std::path::Path; + use crate::fastfield::tests::generate_permutation_gcd; use test::{self, Bencher}; use super::tests::{generate_permutation, FIELD, SCHEMA}; @@ -1046,4 +1072,38 @@ mod bench { }); } } + + #[bench] + fn bench_intfastfield_fflookup_gcd(b: &mut Bencher) { + let path = Path::new("test"); + let permutation = generate_permutation_gcd(); + let directory: RamDirectory = RamDirectory::create(); + { + let write: WritePtr = directory.open_write(Path::new("test")).unwrap(); + let mut serializer = CompositeFastFieldSerializer::from_write(write).unwrap(); + let mut fast_field_writers = FastFieldsWriter::from_schema(&SCHEMA); + for &x in &permutation { + fast_field_writers.add_document(&doc!(*FIELD=>x)); + } + fast_field_writers + .serialize(&mut serializer, &HashMap::new(), None) + .unwrap(); + serializer.close().unwrap(); + } + let file = directory.open_read(&path).unwrap(); + { + let fast_fields_composite = CompositeFile::open(&file).unwrap(); + let data = fast_fields_composite.open_read(*FIELD).unwrap(); + let fast_field_reader = DynamicFastFieldReader::::open(data).unwrap(); + + b.iter(|| { + let n = test::black_box(1000u32); + let mut a = 0u32; + for _ in 0u32..n { + a = fast_field_reader.get(a) as u32; + } + a + }); + } + } } diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 69870d0324..c7ba3313a5 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -346,6 +346,13 @@ mod tests { assert!(test_multivalued_no_panic(&ops[..]).is_ok()); } } + #[test] + fn test_multivalued_proptest_gcd() { + use IndexingOp::*; + let ops = [AddDoc { id: 9 }, AddDoc { id: 9 }, Merge]; + + assert!(test_multivalued_no_panic(&ops[..]).is_ok()); + } #[test] fn test_multivalued_proptest_off_by_one_bug_1151() { diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs index eeb6b3d9bc..3e1da384bc 100644 --- a/src/fastfield/reader.rs +++ b/src/fastfield/reader.rs @@ -14,6 +14,7 @@ use fastfield_codecs::multilinearinterpol::{ }; use fastfield_codecs::{FastFieldCodecReader, FastFieldCodecSerializer}; +use super::serializer::FF_HEADER_MAGIC_NUMBER; use super::FastValue; use crate::directory::{CompositeFile, Directory, FileSlice, OwnedBytes, RamDirectory, WritePtr}; use crate::fastfield::{CompositeFastFieldSerializer, FastFieldsWriter}; @@ -61,6 +62,34 @@ pub trait FastFieldReader: Clone { fn max_value(&self) -> Item; } +struct FFHeader { + field_id: u8, + gcd: u64, + min_value: u64, +} + +fn read_header(bytes: &mut OwnedBytes) -> FFHeader { + let magic_number_or_field_id = bytes.read_u8(); + if magic_number_or_field_id == FF_HEADER_MAGIC_NUMBER { + let _header_version = bytes.read_u8(); + let field_id = bytes.read_u8(); + let gcd = bytes.read_u64(); + let min_value = bytes.read_u64(); + FFHeader { + field_id, + gcd, + min_value, + } + } else { + // old version + FFHeader { + field_id: magic_number_or_field_id, + gcd: 1, + min_value: 0, + } + } +} + #[derive(Clone)] /// DynamicFastFieldReader wraps different readers to access /// the various encoded fastfield data @@ -75,29 +104,35 @@ pub enum DynamicFastFieldReader { impl DynamicFastFieldReader { /// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. - pub fn open(file: FileSlice) -> crate::Result> { - let mut bytes = file.read_bytes()?; - let id = bytes.read_u8(); - + pub fn open_from_id( + bytes: OwnedBytes, + id: u8, + gcd: u64, + min_value: u64, + ) -> crate::Result> { let reader = match id { BitpackedFastFieldSerializer::ID => { DynamicFastFieldReader::Bitpacked(FastFieldReaderCodecWrapper::< Item, BitpackedReader, - >::open_from_bytes(bytes)?) + >::open_from_bytes( + bytes, gcd, min_value + )?) } LinearInterpolFastFieldSerializer::ID => { DynamicFastFieldReader::LinearInterpol(FastFieldReaderCodecWrapper::< Item, LinearInterpolFastFieldReader, - >::open_from_bytes(bytes)?) + >::open_from_bytes( + bytes, gcd, min_value + )?) } MultiLinearInterpolFastFieldSerializer::ID => { DynamicFastFieldReader::MultiLinearInterpol(FastFieldReaderCodecWrapper::< Item, MultiLinearInterpolFastFieldReader, >::open_from_bytes( - bytes + bytes, gcd, min_value )?) } _ => { @@ -109,6 +144,13 @@ impl DynamicFastFieldReader { }; Ok(reader) } + /// Returns correct the reader wrapped in the `DynamicFastFieldReader` enum for the data. + pub fn open(file: FileSlice) -> crate::Result> { + let mut bytes = file.read_bytes()?; + let header = read_header(&mut bytes); + + Self::open_from_id(bytes, header.field_id, header.gcd, header.min_value) + } } impl FastFieldReader for DynamicFastFieldReader { @@ -149,6 +191,8 @@ impl FastFieldReader for DynamicFastFieldReader { /// Holds the data and the codec to the read the data. #[derive(Clone)] pub struct FastFieldReaderCodecWrapper { + gcd: u64, + min_value: u64, reader: CodecReader, bytes: OwnedBytes, _phantom: PhantomData, @@ -158,19 +202,22 @@ impl FastFieldReaderCodecWrapper crate::Result { let mut bytes = file.read_bytes()?; - let id = u8::deserialize(&mut bytes)?; + let header = read_header(&mut bytes); + let id = header.field_id; assert_eq!( BitpackedFastFieldSerializer::ID, id, "Tried to open fast field as bitpacked encoded (id=1), but got serializer with \ different id" ); - Self::open_from_bytes(bytes) + Self::open_from_bytes(bytes, header.gcd, header.min_value) } /// Opens a fast field given the bytes. - pub fn open_from_bytes(bytes: OwnedBytes) -> crate::Result { + pub fn open_from_bytes(bytes: OwnedBytes, gcd: u64, min_value: u64) -> crate::Result { let reader = C::open_from_bytes(bytes.as_slice())?; Ok(FastFieldReaderCodecWrapper { + gcd, + min_value, reader, bytes, _phantom: PhantomData, @@ -178,7 +225,12 @@ impl FastFieldReaderCodecWrapper Item { - Item::from_u64(self.reader.get_u64(doc, self.bytes.as_slice())) + let mut data = self.reader.get_u64(doc, self.bytes.as_slice()); + if self.gcd != 1 { + data *= self.gcd; + } + data += self.min_value; + Item::from_u64(data) } /// Internally `multivalued` also use SingleValue Fast fields. @@ -238,7 +290,7 @@ impl FastFieldReader /// deleted document, and should be considered as an upper bound /// of the actual maximum value. fn min_value(&self) -> Item { - Item::from_u64(self.reader.min_value()) + Item::from_u64(self.reader.min_value() * self.gcd + self.min_value) } /// Returns the maximum value for this fast field. @@ -247,7 +299,7 @@ impl FastFieldReader /// deleted document, and should be considered as an upper bound /// of the actual maximum value. fn max_value(&self) -> Item { - Item::from_u64(self.reader.max_value()) + Item::from_u64((self.reader.max_value() * self.gcd) + self.min_value) } } diff --git a/src/fastfield/serializer/gcd.rs b/src/fastfield/serializer/gcd.rs new file mode 100644 index 0000000000..5753831c95 --- /dev/null +++ b/src/fastfield/serializer/gcd.rs @@ -0,0 +1,226 @@ +use fastfield_codecs::{FastFieldDataAccess, FastFieldStats}; +use gcd::Gcd; +use libdivide::Divider; + +pub const GCD_DEFAULT: u64 = 1; + +fn compute_gcd(vals: &[u64], base: u64) -> u64 { + let mut gcd = (vals[0] - base).gcd(vals[1] - base); + + for el in vals.iter().map(|el| el - base) { + gcd = gcd.gcd(el); + } + gcd +} + +fn is_valid_gcd(vals: impl Iterator, divider: u64, base: u64) -> bool { + if divider <= 1 { + return false; + } + let d = Divider::new(divider).unwrap(); // this is slow + + for val in vals { + let val = val - base; + if val != (val / &d) * divider { + return false; + } + } + true +} + +fn get_samples(fastfield_accessor: &impl FastFieldDataAccess, stats: &FastFieldStats) -> Vec { + // let's sample at 0%, 5%, 10% .. 95%, 100% + let num_samples = stats.num_vals.min(20); + let step_size = 100.0 / num_samples as f32; + let mut sample_values = (0..num_samples) + .map(|idx| (idx as f32 * step_size / 100.0 * stats.num_vals as f32) as usize) + .map(|pos| fastfield_accessor.get_val(pos as u64)) + .collect::>(); + + sample_values.push(stats.min_value); + sample_values.push(stats.max_value); + sample_values +} + +pub(crate) fn find_gcd_from_samples( + samples: &[u64], + vals: impl Iterator, + base: u64, +) -> Option { + let estimate_gcd = compute_gcd(samples, base); + if is_valid_gcd(vals, estimate_gcd, base) { + Some(estimate_gcd) + } else { + None + } +} + +pub(crate) fn find_gcd( + fastfield_accessor: &impl FastFieldDataAccess, + stats: FastFieldStats, + vals: impl Iterator, +) -> Option { + if stats.num_vals == 0 { + return None; + } + + let samples = get_samples(fastfield_accessor, &stats); + find_gcd_from_samples(&samples, vals, stats.min_value) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + use std::path::Path; + + use common::HasLen; + + use super::*; + use crate::directory::{CompositeFile, RamDirectory, WritePtr}; + use crate::fastfield::serializer::{FastFieldCodecEnableCheck, FastFieldCodecName, ALL_CODECS}; + use crate::fastfield::tests::{FIELD, FIELDI64, SCHEMA, SCHEMAI64}; + use crate::fastfield::{ + CompositeFastFieldSerializer, DynamicFastFieldReader, FastFieldReader, FastFieldsWriter, + }; + use crate::schema::Schema; + use crate::Directory; + + fn get_index( + docs: &[crate::Document], + schema: &Schema, + codec_enable_checker: FastFieldCodecEnableCheck, + ) -> crate::Result { + let directory: RamDirectory = RamDirectory::create(); + { + let write: WritePtr = directory.open_write(Path::new("test")).unwrap(); + let mut serializer = + CompositeFastFieldSerializer::from_write_with_codec(write, codec_enable_checker) + .unwrap(); + let mut fast_field_writers = FastFieldsWriter::from_schema(schema); + for doc in docs { + fast_field_writers.add_document(doc); + } + fast_field_writers + .serialize(&mut serializer, &HashMap::new(), None) + .unwrap(); + serializer.close().unwrap(); + } + Ok(directory) + } + + fn test_fastfield_gcd_i64_with_codec( + codec_name: FastFieldCodecName, + num_vals: usize, + ) -> crate::Result<()> { + let path = Path::new("test"); + let mut docs = vec![]; + for i in 1..=num_vals { + let val = i as i64 * 1000i64; + docs.push(doc!(*FIELDI64=>val)); + } + let directory = get_index(&docs, &SCHEMAI64, codec_name.clone().into())?; + let file = directory.open_read(path).unwrap(); + // assert_eq!(file.len(), 118); + let composite_file = CompositeFile::open(&file)?; + let file = composite_file.open_read(*FIELD).unwrap(); + let fast_field_reader = DynamicFastFieldReader::::open(file)?; + assert_eq!(fast_field_reader.get(0), 1000i64); + assert_eq!(fast_field_reader.get(1), 2000i64); + assert_eq!(fast_field_reader.get(2), 3000i64); + assert_eq!(fast_field_reader.max_value(), num_vals as i64 * 1000); + assert_eq!(fast_field_reader.min_value(), 1000i64); + let file = directory.open_read(path).unwrap(); + + // Can't apply gcd + let path = Path::new("test"); + docs.pop(); + docs.push(doc!(*FIELDI64=>2001i64)); + let directory = get_index(&docs, &SCHEMAI64, codec_name.into())?; + let file2 = directory.open_read(path).unwrap(); + assert!(file2.len() > file.len()); + + Ok(()) + } + + #[test] + fn test_fastfield_gcd_i64() -> crate::Result<()> { + for codec_name in ALL_CODECS { + test_fastfield_gcd_i64_with_codec(codec_name.clone(), 5005)?; + } + Ok(()) + } + + fn test_fastfield_gcd_u64_with_codec( + codec_name: FastFieldCodecName, + num_vals: usize, + ) -> crate::Result<()> { + let path = Path::new("test"); + let mut docs = vec![]; + for i in 1..=num_vals { + let val = i as u64 * 1000u64; + docs.push(doc!(*FIELD=>val)); + } + let directory = get_index(&docs, &SCHEMA, codec_name.clone().into())?; + let file = directory.open_read(path).unwrap(); + // assert_eq!(file.len(), 118); + let composite_file = CompositeFile::open(&file)?; + let file = composite_file.open_read(*FIELD).unwrap(); + let fast_field_reader = DynamicFastFieldReader::::open(file)?; + assert_eq!(fast_field_reader.get(0), 1000u64); + assert_eq!(fast_field_reader.get(1), 2000u64); + assert_eq!(fast_field_reader.get(2), 3000u64); + assert_eq!(fast_field_reader.max_value(), num_vals as u64 * 1000); + assert_eq!(fast_field_reader.min_value(), 1000u64); + let file = directory.open_read(path).unwrap(); + + // Can't apply gcd + let path = Path::new("test"); + docs.pop(); + docs.push(doc!(*FIELDI64=>2001u64)); + let directory = get_index(&docs, &SCHEMA, codec_name.into())?; + let file2 = directory.open_read(path).unwrap(); + assert!(file2.len() > file.len()); + + Ok(()) + } + + #[test] + fn test_fastfield_gcd_u64() -> crate::Result<()> { + for codec_name in ALL_CODECS { + test_fastfield_gcd_u64_with_codec(codec_name.clone(), 5005)?; + } + Ok(()) + } + + #[test] + pub fn test_fastfield2() { + let test_fastfield = DynamicFastFieldReader::::from(vec![100, 200, 300]); + assert_eq!(test_fastfield.get(0), 100); + assert_eq!(test_fastfield.get(1), 200); + assert_eq!(test_fastfield.get(2), 300); + } + + #[test] + fn test_gcd() { + let data = vec![ + 9223372036854775808_u64, + 9223372036854775808, + 9223372036854775808, + ]; + + let gcd = find_gcd_from_samples(&data, data.iter().cloned(), *data.iter().min().unwrap()); + assert_eq!(gcd, None); + } + + #[test] + fn test_gcd2() { + let data = vec![ + 9223372036854775808_u64, + 9223372036854776808, + 9223372036854777808, + ]; + + let gcd = find_gcd_from_samples(&data, data.iter().cloned(), *data.iter().min().unwrap()); + assert_eq!(gcd, Some(1000)); + } +} diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index cc53297301..0911b74c9c 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -1,3 +1,5 @@ +mod gcd; + use std::io::{self, Write}; use common::{BinarySerializable, CountingWriter}; @@ -9,6 +11,7 @@ use fastfield_codecs::multilinearinterpol::MultiLinearInterpolFastFieldSerialize pub use fastfield_codecs::{FastFieldCodecSerializer, FastFieldDataAccess, FastFieldStats}; use crate::directory::{CompositeWrite, WritePtr}; +use crate::fastfield::serializer::gcd::{find_gcd, GCD_DEFAULT}; use crate::schema::Field; /// `CompositeFastFieldSerializer` is in charge of serializing @@ -33,8 +36,45 @@ use crate::schema::Field; /// * `close()` pub struct CompositeFastFieldSerializer { composite_write: CompositeWrite, + codec_enable_checker: FastFieldCodecEnableCheck, +} + +pub struct FastFieldCodecEnableCheck { + enabled_codecs: Vec, +} +impl FastFieldCodecEnableCheck { + fn allow_all() -> Self { + FastFieldCodecEnableCheck { + enabled_codecs: ALL_CODECS.to_vec(), + } + } + fn is_enabled(&self, codec_name: FastFieldCodecName) -> bool { + self.enabled_codecs.contains(&codec_name) + } } +impl From for FastFieldCodecEnableCheck { + fn from(codec_name: FastFieldCodecName) -> Self { + FastFieldCodecEnableCheck { + enabled_codecs: vec![codec_name], + } + } +} + +pub const FF_HEADER_MAGIC_NUMBER: u8 = 123u8; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone)] +enum FastFieldCodecName { + Bitpacked, + LinearInterpol, + BlockwiseLinearInterpol, +} +const ALL_CODECS: &[FastFieldCodecName; 3] = &[ + FastFieldCodecName::Bitpacked, + FastFieldCodecName::LinearInterpol, + FastFieldCodecName::BlockwiseLinearInterpol, +]; + // use this, when this is merged and stabilized explicit_generic_args_with_impl_trait // https://github.com/rust-lang/rust/pull/86176 fn codec_estimation( @@ -52,60 +92,128 @@ fn codec_estimation( impl CompositeFastFieldSerializer { /// Constructor pub fn from_write(write: WritePtr) -> io::Result { + Self::from_write_with_codec(write, FastFieldCodecEnableCheck::allow_all()) + } + + /// Constructor + pub fn from_write_with_codec( + write: WritePtr, + codec_enable_checker: FastFieldCodecEnableCheck, + ) -> io::Result { // just making room for the pointer to header. let composite_write = CompositeWrite::wrap(write); - Ok(CompositeFastFieldSerializer { composite_write }) + Ok(CompositeFastFieldSerializer { + composite_write, + codec_enable_checker, + }) } /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field( + pub fn create_auto_detect_u64_fast_field( &mut self, field: Field, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - data_iter_1: impl Iterator, - data_iter_2: impl Iterator, - ) -> io::Result<()> { + iter_gen: F, + ) -> io::Result<()> + where + F: Fn() -> I, + I: Iterator, + { self.create_auto_detect_u64_fast_field_with_idx( field, stats, fastfield_accessor, - data_iter_1, - data_iter_2, + iter_gen, 0, ) } + + /// Serialize data into a new u64 fast field. The best compression codec will be chosen + /// automatically. + pub fn write_header( + field_write: &mut W, + field_id: u8, + stats: FastFieldStats, + gcd: Option, + ) -> io::Result<()> { + FF_HEADER_MAGIC_NUMBER.serialize(field_write)?; + let header_version = 1_u8; + header_version.serialize(field_write)?; + + field_id.serialize(field_write)?; + gcd.unwrap_or(GCD_DEFAULT).serialize(field_write)?; + stats.min_value.serialize(field_write)?; + + Ok(()) + } + /// Serialize data into a new u64 fast field. The best compression codec will be chosen /// automatically. - pub fn create_auto_detect_u64_fast_field_with_idx( + pub fn create_auto_detect_u64_fast_field_with_idx( &mut self, field: Field, stats: FastFieldStats, fastfield_accessor: impl FastFieldDataAccess, - data_iter_1: impl Iterator, - data_iter_2: impl Iterator, + iter_gen: F, idx: usize, - ) -> io::Result<()> { + ) -> io::Result<()> + where + F: Fn() -> I, + I: Iterator, + { let field_write = self.composite_write.for_field_with_idx(field, idx); + struct WrappedFFAccess { + fastfield_accessor: T, + min_value: u64, + gcd: u64, + } + impl FastFieldDataAccess for WrappedFFAccess { + fn get_val(&self, position: u64) -> u64 { + (self.fastfield_accessor.get_val(position) - self.min_value) / self.gcd + } + } + let gcd = find_gcd(&fastfield_accessor, stats.clone(), iter_gen()).unwrap_or(GCD_DEFAULT); + let fastfield_accessor = WrappedFFAccess { + fastfield_accessor, + min_value: stats.min_value, + gcd, + }; + let mut estimations = vec![]; - codec_estimation::( - stats.clone(), - &fastfield_accessor, - &mut estimations, - ); - codec_estimation::( - stats.clone(), - &fastfield_accessor, - &mut estimations, - ); - codec_estimation::( - stats.clone(), - &fastfield_accessor, - &mut estimations, - ); + if self + .codec_enable_checker + .is_enabled(FastFieldCodecName::Bitpacked) + { + codec_estimation::( + stats.clone(), + &fastfield_accessor, + &mut estimations, + ); + } + if self + .codec_enable_checker + .is_enabled(FastFieldCodecName::LinearInterpol) + { + codec_estimation::( + stats.clone(), + &fastfield_accessor, + &mut estimations, + ); + } + if self + .codec_enable_checker + .is_enabled(FastFieldCodecName::BlockwiseLinearInterpol) + { + codec_estimation::( + stats.clone(), + &fastfield_accessor, + &mut estimations, + ); + } if let Some(broken_estimation) = estimations.iter().find(|estimation| estimation.0.is_nan()) { warn!( @@ -122,15 +230,27 @@ impl CompositeFastFieldSerializer { "choosing fast field codec {} for field_id {:?}", name, field ); // todo print actual field name - id.serialize(field_write)?; + + Self::write_header(field_write, id, stats.clone(), Some(gcd))?; + let min_value = stats.min_value; + // let min_value = 0; + let stats = FastFieldStats { + min_value: 0, + max_value: (stats.max_value - stats.min_value) / gcd, + num_vals: stats.num_vals, + }; + let iter1 = iter_gen().map(|val| (val - min_value) / gcd); + let iter2 = iter_gen().map(|val| (val - min_value) / gcd); + // let iter1 = iter_gen(); + // let iter2 = iter_gen(); match name { BitpackedFastFieldSerializer::NAME => { BitpackedFastFieldSerializer::serialize( field_write, &fastfield_accessor, stats, - data_iter_1, - data_iter_2, + iter1, + iter2, )?; } LinearInterpolFastFieldSerializer::NAME => { @@ -138,8 +258,8 @@ impl CompositeFastFieldSerializer { field_write, &fastfield_accessor, stats, - data_iter_1, - data_iter_2, + iter1, + iter2, )?; } MultiLinearInterpolFastFieldSerializer::NAME => { @@ -147,19 +267,29 @@ impl CompositeFastFieldSerializer { field_write, &fastfield_accessor, stats, - data_iter_1, - data_iter_2, + iter1, + iter2, )?; } _ => { panic!("unknown fastfield serializer {}", name) } - }; + } field_write.flush()?; Ok(()) } + /// Start serializing a new u64 fast field + pub fn serialize_into( + &mut self, + field: Field, + min_value: u64, + max_value: u64, + ) -> io::Result>> { + self.new_u64_fast_field_with_idx(field, min_value, max_value, 0) + } + /// Start serializing a new u64 fast field pub fn new_u64_fast_field( &mut self, diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 4d1b5d3467..f5d1d4e014 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -370,23 +370,25 @@ impl IntFastFieldWriter { }; if let Some(doc_id_map) = doc_id_map { - let iter = doc_id_map - .iter_old_doc_ids() - .map(|doc_id| self.vals.get(doc_id as usize)); + let iter_gen = || { + doc_id_map + .iter_old_doc_ids() + .map(|doc_id| self.vals.get(doc_id as usize)) + }; serializer.create_auto_detect_u64_fast_field( self.field, stats, fastfield_accessor, - iter.clone(), - iter, + iter_gen, )?; } else { + let iter_gen = || self.vals.iter(); + serializer.create_auto_detect_u64_fast_field( self.field, stats, fastfield_accessor, - self.vals.iter(), - self.vals.iter(), + iter_gen, )?; }; Ok(()) diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 2ac6ec339b..8b5d60b99a 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -385,20 +385,17 @@ impl IndexMerger { doc_id_mapping, fast_field_readers: &fast_field_readers, }; - let iter1 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { - let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; - fast_field_reader.get(*doc_id) - }); - let iter2 = doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { - let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; - fast_field_reader.get(*doc_id) - }); + let iter_gen = || { + doc_id_mapping.iter().map(|(doc_id, reader_ordinal)| { + let fast_field_reader = &fast_field_readers[*reader_ordinal as usize]; + fast_field_reader.get(*doc_id) + }) + }; fast_field_serializer.create_auto_detect_u64_fast_field( field, stats, fastfield_accessor, - iter1, - iter2, + iter_gen, )?; Ok(()) @@ -560,12 +557,12 @@ impl IndexMerger { } offsets.push(offset); + let iter_gen = || offsets.iter().cloned(); fast_field_serializer.create_auto_detect_u64_fast_field( field, stats, &offsets[..], - offsets.iter().cloned(), - offsets.iter().cloned(), + iter_gen, )?; Ok(offsets) } @@ -768,24 +765,19 @@ impl IndexMerger { fast_field_readers: &ff_readers, offsets, }; - let iter1 = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { - let ff_reader = &ff_readers[*reader_ordinal as usize]; - let mut vals = vec![]; - ff_reader.get_vals(*doc_id, &mut vals); - vals.into_iter() - }); - let iter2 = doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { - let ff_reader = &ff_readers[*reader_ordinal as usize]; - let mut vals = vec![]; - ff_reader.get_vals(*doc_id, &mut vals); - vals.into_iter() - }); + let iter_gen = || { + doc_id_mapping.iter().flat_map(|(doc_id, reader_ordinal)| { + let ff_reader = &ff_readers[*reader_ordinal as usize]; + let mut vals = vec![]; + ff_reader.get_vals(*doc_id, &mut vals); + vals.into_iter() + }) + }; fast_field_serializer.create_auto_detect_u64_fast_field_with_idx( field, stats, fastfield_accessor, - iter1, - iter2, + iter_gen, 1, )?;