Skip to content

Commit

Permalink
use iter api
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Sep 27, 2022
1 parent 6adf461 commit 6834424
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 157 deletions.
3 changes: 2 additions & 1 deletion fastfield_codecs/benches/bench.rs
Expand Up @@ -100,7 +100,8 @@ mod tests {

/// Serializes `data` with the u128 codec and reopens it as a `Column<u128>`.
///
/// Helper for benches: round-trips the values through the on-disk format so
/// the benchmark measures reads against real encoded bytes.
fn get_u128_column_from_data(data: &[u128]) -> Arc<dyn Column<u128>> {
    let mut out = vec![];
    // The serializer consumes the values via a re-creatable iterator
    // (it needs two passes: one to train the codec, one to compress).
    let iter = || data.iter().cloned();
    serialize_u128(iter, data.len() as u64, &mut out).unwrap();
    let out = OwnedBytes::new(out);
    open_u128::<u128>(out).unwrap()
}
Expand Down
17 changes: 11 additions & 6 deletions fastfield_codecs/src/compact_space/mod.rs
Expand Up @@ -171,10 +171,10 @@ pub struct IPCodecParams {

impl CompactSpaceCompressor {
/// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals.
pub fn train_from(column: &impl Column<u128>) -> Self {
pub fn train_from(iter: impl Iterator<Item = u128>, num_vals: u64) -> Self {
let mut values_sorted = BTreeSet::new();
values_sorted.extend(column.iter());
let total_num_values = column.num_vals();
values_sorted.extend(iter);
let total_num_values = num_vals;

let compact_space =
get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS);
Expand Down Expand Up @@ -443,7 +443,7 @@ impl CompactSpaceDecompressor {
mod tests {

use super::*;
use crate::{open_u128, serialize_u128, VecColumn};
use crate::{open_u128, serialize_u128};

#[test]
fn compact_space_test() {
Expand Down Expand Up @@ -513,7 +513,12 @@ mod tests {

fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes {
let mut out = Vec::new();
serialize_u128(VecColumn::from(u128_vals), &mut out).unwrap();
serialize_u128(
|| u128_vals.iter().cloned(),
u128_vals.len() as u64,
&mut out,
)
.unwrap();

let data = OwnedBytes::new(out);
test_all(data.clone(), u128_vals);
Expand Down Expand Up @@ -603,7 +608,7 @@ mod tests {
5_000_000_000,
];
let mut out = Vec::new();
serialize_u128(VecColumn::from(vals), &mut out).unwrap();
serialize_u128(|| vals.iter().cloned(), vals.len() as u64, &mut out).unwrap();
let decomp = open_u128::<u128>(OwnedBytes::new(out)).unwrap();

assert_eq!(decomp.get_between_vals(199..=200), vec![0]);
Expand Down
7 changes: 5 additions & 2 deletions fastfield_codecs/src/main.rs
Expand Up @@ -90,7 +90,7 @@ fn bench_ip() {
{
let mut data = vec![];
for dataset in dataset.chunks(500_000) {
serialize_u128(VecColumn::from(dataset), &mut data).unwrap();
serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap();
}
let compression = data.len() as f64 / (dataset.len() * 16) as f64;
println!("Compression 50_000 chunks {:.4}", compression);
Expand All @@ -101,7 +101,10 @@ fn bench_ip() {
}

let mut data = vec![];
serialize_u128(VecColumn::from(&dataset), &mut data).unwrap();
{
print_time!("creation");
serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap();
}

let compression = data.len() as f64 / (dataset.len() * 16) as f64;
println!("Compression {:.2}", compression);
Expand Down
11 changes: 5 additions & 6 deletions fastfield_codecs/src/serialize.rs
Expand Up @@ -142,15 +142,14 @@ pub fn estimate<T: MonotonicallyMappableToU64>(
}
}

pub fn serialize_u128(
typed_column: impl Column<u128>,
pub fn serialize_u128<F: Fn() -> I, I: Iterator<Item = u128>>(
iter_gen: F,
num_vals: u64,
output: &mut impl io::Write,
) -> io::Result<()> {
// TODO write header, to later support more codecs
let compressor = CompactSpaceCompressor::train_from(&typed_column);
compressor
.compress_into(typed_column.iter(), output)
.unwrap();
let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals);
compressor.compress_into(iter_gen(), output).unwrap();

Ok(())
}
Expand Down
11 changes: 2 additions & 9 deletions src/fastfield/multivalued/writer.rs
Expand Up @@ -409,15 +409,8 @@ impl MultiValueU128FastFieldWriter {
{
let field_write = serializer.get_field_writer(self.field, 1);

let mut values = Vec::with_capacity(self.vals.len());
for vals in self.get_ordered_values(doc_id_map) {
for &val in vals {
values.push(val);
}
}
let col = VecColumn::from(&values[..]);

serialize_u128(col, field_write)?;
let iter = || self.get_ordered_values(doc_id_map).flatten().cloned();
serialize_u128(iter, self.vals.len() as u64, field_write)?;
}
Ok(())
}
Expand Down
80 changes: 23 additions & 57 deletions src/fastfield/writer.rs
Expand Up @@ -364,66 +364,32 @@ impl U128FastFieldWriter {
}
}

struct RemappedFFWriter<'a> {
doc_id_map: Option<&'a DocIdMapping>,
null_values: &'a RoaringBitmap,
vals: &'a [u128],
idx_to_val_idx: Vec<u32>,
val_count: u32,
}
impl<'a> Column<u128> for RemappedFFWriter<'a> {
fn get_val(&self, _idx: u64) -> u128 {
// unused by codec
unreachable!()
}

fn min_value(&self) -> u128 {
// unused by codec
unreachable!()
}

fn max_value(&self) -> u128 {
// unused by codec
unreachable!()
}
let field_write = serializer.get_field_writer(self.field, 0);

fn num_vals(&self) -> u64 {
self.val_count as u64
}
fn iter(&self) -> Box<dyn Iterator<Item = u128> + '_> {
if let Some(doc_id_map) = self.doc_id_map {
let iter = doc_id_map.iter_old_doc_ids().map(|idx| {
if self.null_values.contains(idx as u32) {
0 // TODO properly handle nulls
} else {
self.vals[self.idx_to_val_idx[idx as usize] as usize]
}
});
Box::new(iter)
} else {
let iter = (0..self.val_count).map(|idx| {
if self.null_values.contains(idx as u32) {
0 // TODO properly handle nulls
} else {
self.vals[self.idx_to_val_idx[idx as usize] as usize]
}
});
Box::new(iter)
}
}
if let Some(doc_id_map) = doc_id_map {
let iter = || {
doc_id_map.iter_old_doc_ids().map(|idx| {
if self.null_values.contains(idx as u32) {
0 // TODO properly handle nulls
} else {
self.vals[idx_to_val_idx[idx as usize] as usize]
}
})
};
serialize_u128(iter, self.val_count as u64, field_write)?;
} else {
let iter = || {
(0..self.val_count).map(|idx| {
if self.null_values.contains(idx as u32) {
0 // TODO properly handle nulls
} else {
self.vals[idx_to_val_idx[idx as usize] as usize]
}
})
};
serialize_u128(iter, self.val_count as u64, field_write)?;
}

let column = RemappedFFWriter {
doc_id_map,
null_values: &self.null_values,
vals: &self.vals,
idx_to_val_idx,
val_count: self.val_count,
};

let field_write = serializer.get_field_writer(self.field, 0);
serialize_u128(column, field_write)?;

Ok(())
}
}
Expand Down
91 changes: 15 additions & 76 deletions src/indexer/merger.rs
Expand Up @@ -354,49 +354,16 @@ impl IndexMerger {
.map(|(_, ff_reader)| ff_reader)
.collect::<Vec<_>>();

struct RemappedFFReader<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: Vec<MultiValuedU128FastFieldReader<u128>>,
}
impl<'a> Column<u128> for RemappedFFReader<'a> {
fn get_val(&self, _idx: u64) -> u128 {
// unused by codec
unreachable!()
}

fn min_value(&self) -> u128 {
// unused by codec
unreachable!()
}

fn max_value(&self) -> u128 {
// unused by codec
unreachable!()
}

fn num_vals(&self) -> u64 {
self.doc_id_mapping.len() as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u128> + 'b> {
Box::new(
self.doc_id_mapping
.iter_old_doc_addrs()
.flat_map(|doc_addr| {
let fast_field_reader =
&self.fast_field_readers[doc_addr.segment_ord as usize];
let mut out = vec![];
fast_field_reader.get_vals(doc_addr.doc_id, &mut out);
out.into_iter()
}),
)
}
}
let column = RemappedFFReader {
doc_id_mapping,
fast_field_readers,
let iter = || {
doc_id_mapping.iter_old_doc_addrs().flat_map(|doc_addr| {
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize];
let mut out = vec![];
fast_field_reader.get_vals(doc_addr.doc_id, &mut out);
out.into_iter()
})
};
let field_write = fast_field_serializer.get_field_writer(field, 1);
serialize_u128(column, field_write)?;
serialize_u128(iter, doc_id_mapping.len() as u64, field_write)?;

Ok(())
}
Expand All @@ -420,42 +387,14 @@ impl IndexMerger {
})
.collect::<Vec<_>>();

struct RemappedFFReader<'a> {
doc_id_mapping: &'a SegmentDocIdMapping,
fast_field_readers: Vec<Arc<dyn Column<u128>>>,
}
impl<'a> Column<u128> for RemappedFFReader<'a> {
fn get_val(&self, _idx: u64) -> u128 {
// unused by codec
unreachable!()
}

fn min_value(&self) -> u128 {
// unused by codec
unreachable!()
}

fn max_value(&self) -> u128 {
// unused by codec
unreachable!()
}

fn num_vals(&self) -> u64 {
self.doc_id_mapping.len() as u64
}
fn iter<'b>(&'b self) -> Box<dyn Iterator<Item = u128> + 'b> {
Box::new(self.doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
let fast_field_reader = &self.fast_field_readers[doc_addr.segment_ord as usize];
fast_field_reader.get_val(doc_addr.doc_id as u64)
}))
}
}
let column = RemappedFFReader {
doc_id_mapping,
fast_field_readers,
};
let field_write = fast_field_serializer.get_field_writer(field, 0);
serialize_u128(column, field_write)?;
let iter = || {
doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| {
let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize];
fast_field_reader.get_val(doc_addr.doc_id as u64)
})
};
fastfield_codecs::serialize_u128(iter, doc_id_mapping.len() as u64, field_write)?;
Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions src/schema/document.rs
@@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::io::{self, Read, Write};
use std::mem;
use std::net::IpAddr;

use common::{BinarySerializable, VInt};

Expand Down Expand Up @@ -97,6 +98,11 @@ impl Document {
self.add_field_value(field, value);
}

/// Add an IP address field. The value is stored as an `IpAddr` field value.
pub fn add_ip(&mut self, field: Field, value: IpAddr) {
    self.add_field_value(field, value);
}

/// Add a i64 field
pub fn add_i64(&mut self, field: Field, value: i64) {
self.add_field_value(field, value);
Expand Down

0 comments on commit 6834424

Please sign in to comment.