diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs index 0bf46ae6e0..d56cbc51e1 100644 --- a/fastfield_codecs/benches/bench.rs +++ b/fastfield_codecs/benches/bench.rs @@ -100,7 +100,8 @@ mod tests { fn get_u128_column_from_data(data: &[u128]) -> Arc<dyn Column<u128>> { let mut out = vec![]; - serialize_u128(VecColumn::from(&data), &mut out).unwrap(); + let iter = || data.iter().cloned(); + serialize_u128(iter, data.len() as u64, &mut out).unwrap(); let out = OwnedBytes::new(out); open_u128::<u128>(out).unwrap() } diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index 72283bb481..dd6dfbdbbe 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -171,10 +171,10 @@ pub struct IPCodecParams { impl CompactSpaceCompressor { /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals. - pub fn train_from(column: &impl Column<u128>) -> Self { + pub fn train_from(iter: impl Iterator<Item = u128>, num_vals: u64) -> Self { let mut values_sorted = BTreeSet::new(); - values_sorted.extend(column.iter()); - let total_num_values = column.num_vals(); + values_sorted.extend(iter); + let total_num_values = num_vals; let compact_space = get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); @@ -443,7 +443,7 @@ impl CompactSpaceDecompressor { mod tests { use super::*; - use crate::{open_u128, serialize_u128, VecColumn}; + use crate::{open_u128, serialize_u128}; #[test] fn compact_space_test() { @@ -513,7 +513,12 @@ mod tests { fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes { let mut out = Vec::new(); - serialize_u128(VecColumn::from(u128_vals), &mut out).unwrap(); + serialize_u128( + || u128_vals.iter().cloned(), + u128_vals.len() as u64, + &mut out, + ) + .unwrap(); let data = OwnedBytes::new(out); test_all(data.clone(), u128_vals); @@ -603,7 +608,7 @@ mod tests { 5_000_000_000, ]; let mut out = Vec::new(); - serialize_u128(VecColumn::from(vals), 
&mut out).unwrap(); + serialize_u128(|| vals.iter().cloned(), vals.len() as u64, &mut out).unwrap(); let decomp = open_u128::<u128>(OwnedBytes::new(out)).unwrap(); assert_eq!(decomp.get_between_vals(199..=200), vec![0]); diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index 7b963dc128..8e81c41f5f 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -90,7 +90,7 @@ fn bench_ip() { { let mut data = vec![]; for dataset in dataset.chunks(500_000) { - serialize_u128(VecColumn::from(dataset), &mut data).unwrap(); + serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap(); } let compression = data.len() as f64 / (dataset.len() * 16) as f64; println!("Compression 50_000 chunks {:.4}", compression); @@ -101,7 +101,10 @@ fn bench_ip() { } let mut data = vec![]; - serialize_u128(VecColumn::from(&dataset), &mut data).unwrap(); + { + print_time!("creation"); + serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap(); + } let compression = data.len() as f64 / (dataset.len() * 16) as f64; println!("Compression {:.2}", compression); diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs index 92f55f5d0f..9f1188f511 100644 --- a/fastfield_codecs/src/serialize.rs +++ b/fastfield_codecs/src/serialize.rs @@ -142,15 +142,14 @@ pub fn estimate( } } -pub fn serialize_u128( - typed_column: impl Column<u128>, +pub fn serialize_u128<F: Fn() -> I, I: Iterator<Item = u128>>( + iter_gen: F, + num_vals: u64, output: &mut impl io::Write, ) -> io::Result<()> { // TODO write header, to later support more codecs - let compressor = CompactSpaceCompressor::train_from(&typed_column); - compressor - .compress_into(typed_column.iter(), output) - .unwrap(); + let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals); + compressor.compress_into(iter_gen(), output).unwrap(); Ok(()) } diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs index 
a78c4c5c53..d4fee0df1f 100644 --- a/src/fastfield/multivalued/writer.rs +++ b/src/fastfield/multivalued/writer.rs @@ -409,15 +409,8 @@ impl MultiValueU128FastFieldWriter { { let field_write = serializer.get_field_writer(self.field, 1); - let mut values = Vec::with_capacity(self.vals.len()); - for vals in self.get_ordered_values(doc_id_map) { - for &val in vals { - values.push(val); - } - } - let col = VecColumn::from(&values[..]); - - serialize_u128(col, field_write)?; + let iter = || self.get_ordered_values(doc_id_map).flatten().cloned(); + serialize_u128(iter, self.vals.len() as u64, field_write)?; } Ok(()) } diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 5f9dec1927..0ee11d606b 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -364,66 +364,32 @@ impl U128FastFieldWriter { } } - struct RemappedFFWriter<'a> { - doc_id_map: Option<&'a DocIdMapping>, - null_values: &'a RoaringBitmap, - vals: &'a [u128], - idx_to_val_idx: Vec, - val_count: u32, - } - impl<'a> Column for RemappedFFWriter<'a> { - fn get_val(&self, _idx: u64) -> u128 { - // unused by codec - unreachable!() - } - - fn min_value(&self) -> u128 { - // unused by codec - unreachable!() - } - - fn max_value(&self) -> u128 { - // unused by codec - unreachable!() - } + let field_write = serializer.get_field_writer(self.field, 0); - fn num_vals(&self) -> u64 { - self.val_count as u64 - } - fn iter(&self) -> Box + '_> { - if let Some(doc_id_map) = self.doc_id_map { - let iter = doc_id_map.iter_old_doc_ids().map(|idx| { - if self.null_values.contains(idx as u32) { - 0 // TODO properly handle nulls - } else { - self.vals[self.idx_to_val_idx[idx as usize] as usize] - } - }); - Box::new(iter) - } else { - let iter = (0..self.val_count).map(|idx| { - if self.null_values.contains(idx as u32) { - 0 // TODO properly handle nulls - } else { - self.vals[self.idx_to_val_idx[idx as usize] as usize] - } - }); - Box::new(iter) - } - } + if let Some(doc_id_map) = doc_id_map { + let iter 
= || { + doc_id_map.iter_old_doc_ids().map(|idx| { + if self.null_values.contains(idx as u32) { + 0 // TODO properly handle nulls + } else { + self.vals[idx_to_val_idx[idx as usize] as usize] + } + }) + }; + serialize_u128(iter, self.val_count as u64, field_write)?; + } else { + let iter = || { + (0..self.val_count).map(|idx| { + if self.null_values.contains(idx as u32) { + 0 // TODO properly handle nulls + } else { + self.vals[idx_to_val_idx[idx as usize] as usize] + } + }) + }; + serialize_u128(iter, self.val_count as u64, field_write)?; } - let column = RemappedFFWriter { - doc_id_map, - null_values: &self.null_values, - vals: &self.vals, - idx_to_val_idx, - val_count: self.val_count, - }; - - let field_write = serializer.get_field_writer(self.field, 0); - serialize_u128(column, field_write)?; - Ok(()) } } diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 8872c27ede..c15a0212b2 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -354,49 +354,16 @@ impl IndexMerger { .map(|(_, ff_reader)| ff_reader) .collect::>(); - struct RemappedFFReader<'a> { - doc_id_mapping: &'a SegmentDocIdMapping, - fast_field_readers: Vec>, - } - impl<'a> Column for RemappedFFReader<'a> { - fn get_val(&self, _idx: u64) -> u128 { - // unused by codec - unreachable!() - } - - fn min_value(&self) -> u128 { - // unused by codec - unreachable!() - } - - fn max_value(&self) -> u128 { - // unused by codec - unreachable!() - } - - fn num_vals(&self) -> u64 { - self.doc_id_mapping.len() as u64 - } - fn iter<'b>(&'b self) -> Box + 'b> { - Box::new( - self.doc_id_mapping - .iter_old_doc_addrs() - .flat_map(|doc_addr| { - let fast_field_reader = - &self.fast_field_readers[doc_addr.segment_ord as usize]; - let mut out = vec![]; - fast_field_reader.get_vals(doc_addr.doc_id, &mut out); - out.into_iter() - }), - ) - } - } - let column = RemappedFFReader { - doc_id_mapping, - fast_field_readers, + let iter = || { + doc_id_mapping.iter_old_doc_addrs().flat_map(|doc_addr| { + 
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize]; + let mut out = vec![]; + fast_field_reader.get_vals(doc_addr.doc_id, &mut out); + out.into_iter() + }) }; let field_write = fast_field_serializer.get_field_writer(field, 1); - serialize_u128(column, field_write)?; + serialize_u128(iter, doc_id_mapping.len() as u64, field_write)?; Ok(()) } @@ -420,42 +387,14 @@ impl IndexMerger { }) .collect::>(); - struct RemappedFFReader<'a> { - doc_id_mapping: &'a SegmentDocIdMapping, - fast_field_readers: Vec>>, - } - impl<'a> Column for RemappedFFReader<'a> { - fn get_val(&self, _idx: u64) -> u128 { - // unused by codec - unreachable!() - } - - fn min_value(&self) -> u128 { - // unused by codec - unreachable!() - } - - fn max_value(&self) -> u128 { - // unused by codec - unreachable!() - } - - fn num_vals(&self) -> u64 { - self.doc_id_mapping.len() as u64 - } - fn iter<'b>(&'b self) -> Box + 'b> { - Box::new(self.doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| { - let fast_field_reader = &self.fast_field_readers[doc_addr.segment_ord as usize]; - fast_field_reader.get_val(doc_addr.doc_id as u64) - })) - } - } - let column = RemappedFFReader { - doc_id_mapping, - fast_field_readers, - }; let field_write = fast_field_serializer.get_field_writer(field, 0); - serialize_u128(column, field_write)?; + let iter = || { + doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| { + let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize]; + fast_field_reader.get_val(doc_addr.doc_id as u64) + }) + }; + fastfield_codecs::serialize_u128(iter, doc_id_mapping.len() as u64, field_write)?; Ok(()) } diff --git a/src/schema/document.rs b/src/schema/document.rs index 3bde526b1a..b39acedc83 100644 --- a/src/schema/document.rs +++ b/src/schema/document.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::io::{self, Read, Write}; use std::mem; +use std::net::IpAddr; use common::{BinarySerializable, VInt}; @@ -97,6 +98,11 @@ impl Document { 
self.add_field_value(field, value); } + /// Add an IP address field + pub fn add_ip(&mut self, field: Field, value: IpAddr) { + self.add_field_value(field, value); + } + /// Add a i64 field pub fn add_i64(&mut self, field: Field, value: i64) { self.add_field_value(field, value);