Merge pull request #1491 from quickwit-oss/col-trait-refact
Introducing a column trait
PSeitz committed Aug 28, 2022
2 parents c73b425 + 095fb68 commit 6f563b1
Showing 27 changed files with 294 additions and 273 deletions.
5 changes: 3 additions & 2 deletions examples/custom_collector.rs
@@ -7,10 +7,11 @@
// Of course, you can have a look at tantivy's built-in collectors
// such as the `CountCollector` for more examples.

use fastfield_codecs::Column;
// ---
// Importing tantivy...
use tantivy::collector::{Collector, SegmentCollector};
use tantivy::fastfield::{DynamicFastFieldReader, FastFieldReader};
use tantivy::fastfield::DynamicFastFieldReader;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, INDEXED, TEXT};
use tantivy::{doc, Index, Score, SegmentReader};
@@ -103,7 +104,7 @@ impl SegmentCollector for StatsSegmentCollector {
type Fruit = Option<Stats>;

fn collect(&mut self, doc: u32, _score: Score) {
let value = self.fast_field_reader.get(doc) as f64;
let value = self.fast_field_reader.get_val(doc as u64) as f64;
self.stats.count += 1;
self.stats.sum += value;
self.stats.squared_sum += value * value;
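The only change a collector needs is the wider index type: `Column::get_val` takes a `u64`, while tantivy's `DocId` is a `u32`. A minimal sketch of the new access pattern, assuming a reader that implements `Column<u64>` (the `sum_values` helper is illustrative, not part of the PR):

```rust
use fastfield_codecs::Column;

/// Sums a fast field over the given docs. Each `DocId` (`u32`)
/// is widened to `u64` before the `Column::get_val` lookup.
fn sum_values(column: &dyn Column<u64>, docs: &[u32]) -> u64 {
    docs.iter().map(|&doc| column.get_val(doc as u64)).sum()
}
```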
4 changes: 2 additions & 2 deletions examples/warmer.rs
@@ -2,8 +2,8 @@ use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, RwLock, Weak};

use fastfield_codecs::Column;
use tantivy::collector::TopDocs;
use tantivy::fastfield::FastFieldReader;
use tantivy::query::QueryParser;
use tantivy::schema::{Field, Schema, FAST, TEXT};
use tantivy::{
@@ -52,7 +52,7 @@ impl Warmer for DynamicPriceColumn {
let product_id_reader = segment.fast_fields().u64(self.field)?;
let product_ids: Vec<ProductId> = segment
.doc_ids_alive()
.map(|doc| product_id_reader.get(doc))
.map(|doc| product_id_reader.get_val(doc as u64))
.collect();
let mut prices_it = self.price_fetcher.fetch_prices(&product_ids).into_iter();
let mut price_vals: Vec<Price> = Vec::new();
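The warmer reads one value per alive doc through `get_val`. For contiguous index ranges, the new trait also provides a `get_range` default (defined in `fastfield_codecs/src/column.rs` below). A sketch using the `Vec<u64>` impl of `Column` that ships with the crate (the helper name is illustrative):

```rust
use fastfield_codecs::Column;

/// Copies the values at indices start..start + buf.len() into buf,
/// using the trait's default get_range (a loop over get_val).
fn read_block(column: &dyn Column<u64>, start: u64, buf: &mut [u64]) {
    column.get_range(start, buf);
}

fn main() {
    let column: Vec<u64> = vec![10, 20, 30, 40];
    let mut buf = [0u64; 2];
    read_block(&column, 1, &mut buf);
    assert_eq!(buf, [20, 30]);
}
```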
11 changes: 4 additions & 7 deletions fastfield_codecs/src/bitpacked.rs
@@ -4,7 +4,7 @@ use common::BinarySerializable;
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

use crate::{FastFieldCodec, FastFieldCodecType, FastFieldDataAccess};
use crate::{Column, FastFieldCodec, FastFieldCodecType};

/// Depending on the field type, a different
/// fast field is required.
@@ -17,7 +17,7 @@ pub struct BitpackedReader {
num_vals: u64,
}

impl FastFieldDataAccess for BitpackedReader {
impl Column for BitpackedReader {
#[inline]
fn get_val(&self, doc: u64) -> u64 {
self.min_value_u64 + self.bit_unpacker.get(doc, &self.data)
@@ -124,10 +124,7 @@ impl FastFieldCodec for BitpackedCodec {
/// It requires a `min_value` and a `max_value` to compute
/// the minimum number of bits required to encode values.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
) -> io::Result<()> {
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
let mut serializer = BitpackedSerializerLegacy::open(
write,
fastfield_accessor.min_value(),
@@ -142,7 +139,7 @@ impl FastFieldCodec for BitpackedCodec {
Ok(())
}

fn estimate(fastfield_accessor: &impl FastFieldDataAccess) -> Option<f32> {
fn estimate(fastfield_accessor: &impl Column) -> Option<f32> {
let amplitude = fastfield_accessor.max_value() - fastfield_accessor.min_value();
let num_bits = compute_num_bits(amplitude);
let num_bits_uncompressed = 64;
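The `estimate` above is simply bits-per-value over the 64 bits of an uncompressed `u64`. A self-contained sketch of that arithmetic (here `compute_num_bits` is re-derived from `leading_zeros` rather than imported from `tantivy_bitpacker`, on the assumption that both count the bits needed to represent the amplitude):

```rust
use fastfield_codecs::Column;

/// Bits needed to represent `amplitude` (0 needs 0 bits).
fn compute_num_bits(amplitude: u64) -> u8 {
    64 - amplitude.leading_zeros() as u8
}

/// Mirrors the estimate above: bits per value divided by 64.
fn bitpacked_estimate(column: &impl Column) -> f32 {
    let amplitude = column.max_value() - column.min_value();
    f32::from(compute_num_bits(amplitude)) / 64.0
}

fn main() {
    let vals: Vec<u64> = vec![1000, 1001, 1003, 1007];
    // Amplitude 7 fits in 3 bits: the estimate is 3/64.
    assert!((bitpacked_estimate(&vals) - 3.0 / 64.0).abs() < 1e-6);
}
```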
11 changes: 4 additions & 7 deletions fastfield_codecs/src/blockwise_linear.rs
@@ -18,7 +18,7 @@ use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

use crate::linear::{get_calculated_value, get_slope};
use crate::{FastFieldCodec, FastFieldCodecType, FastFieldDataAccess};
use crate::{Column, FastFieldCodec, FastFieldCodecType};

const CHUNK_SIZE: u64 = 512;

@@ -146,7 +146,7 @@ fn get_interpolation_function(doc: u64, interpolations: &[Function]) -> &Function {
&interpolations[get_interpolation_position(doc)]
}

impl FastFieldDataAccess for BlockwiseLinearReader {
impl Column for BlockwiseLinearReader {
#[inline]
fn get_val(&self, idx: u64) -> u64 {
let interpolation = get_interpolation_function(idx, &self.footer.interpolations);
@@ -195,10 +195,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
}

/// Creates a new fast field serializer.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
) -> io::Result<()> {
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value());

let first_val = fastfield_accessor.get_val(0);
@@ -292,7 +289,7 @@ impl FastFieldCodec for BlockwiseLinearCodec {
/// estimation for linear interpolation is hard because you don't know
/// where the local maxima of the deviation from the calculated value are,
/// and the offset is also unknown.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess) -> Option<f32> {
fn estimate(fastfield_accessor: &impl Column) -> Option<f32> {
if fastfield_accessor.num_vals() < 10 * CHUNK_SIZE {
return None;
}
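The blockwise variant keeps one interpolation `Function` per `CHUNK_SIZE = 512` values, so selecting the right function is a single division. A toy sketch of that lookup, mirrored from the code above (the `Function` fields are illustrative; the real struct lives in `blockwise_linear.rs`):

```rust
const CHUNK_SIZE: u64 = 512;

/// One linear interpolation per 512-value chunk (illustrative fields).
struct Function {
    first_val: u64,
    slope: f32,
}

/// Chunk index covering `idx`, as in get_interpolation_position above.
fn get_interpolation_position(idx: u64) -> usize {
    (idx / CHUNK_SIZE) as usize
}

fn get_interpolation_function(idx: u64, interpolations: &[Function]) -> &Function {
    &interpolations[get_interpolation_position(idx)]
}

fn main() {
    let interpolations = vec![
        Function { first_val: 0, slope: 1.0 },
        Function { first_val: 512, slope: 1.0 },
    ];
    // Index 600 falls into the second chunk.
    assert_eq!(get_interpolation_function(600, &interpolations).first_val, 512);
}
```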
49 changes: 49 additions & 0 deletions fastfield_codecs/src/column.rs
@@ -0,0 +1,49 @@
pub trait Column<T = u64> {
    /// Returns the value associated with the given `idx`.
    ///
    /// This accessor should return as fast as possible.
    ///
    /// # Panics
    ///
    /// May panic if `idx` is greater than or equal to the column's length.
    fn get_val(&self, idx: u64) -> T;

    /// Fills an output buffer with the fast field values
    /// associated with the `DocId` going from
    /// `start` to `start + output.len()`.
    ///
    /// Regardless of the type of `Item`, this method works by
    /// - transmuting the output array,
    /// - extracting the `Item`s as if they were `u64`,
    /// - possibly converting the `u64` value to the right type.
    ///
    /// # Panics
    ///
    /// May panic if `start + output.len()` is greater than
    /// the segment's `maxdoc`.
    fn get_range(&self, start: u64, output: &mut [T]) {
        for (out, idx) in output.iter_mut().zip(start..) {
            *out = self.get_val(idx);
        }
    }

    /// Returns the minimum value for this fast field.
    ///
    /// The min value does not take possibly deleted documents into
    /// account and should be considered a lower bound
    /// of the actual minimum value.
    fn min_value(&self) -> T;

    /// Returns the maximum value for this fast field.
    ///
    /// The max value does not take possibly deleted documents into
    /// account and should be considered an upper bound
    /// of the actual maximum value.
    fn max_value(&self) -> T;

    /// Returns the number of values in the column.
    fn num_vals(&self) -> u64;

    /// Returns an iterator over the data.
    fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = T> + 'a> {
        Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
    }
}
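To make the contract concrete, here is a minimal sketch of a custom implementation: a hypothetical constant column (not part of this PR). Only the four required methods are written; `get_range` and `iter` come from the defaults above:

```rust
use fastfield_codecs::Column;

/// A column in which every document carries the same value.
struct ConstantColumn {
    value: u64,
    num_vals: u64,
}

impl Column for ConstantColumn {
    fn get_val(&self, _idx: u64) -> u64 {
        self.value
    }
    fn min_value(&self) -> u64 {
        self.value
    }
    fn max_value(&self) -> u64 {
        self.value
    }
    fn num_vals(&self) -> u64 {
        self.num_vals
    }
}

fn main() {
    let col = ConstantColumn { value: 7, num_vals: 3 };
    // Both defaults are driven entirely by get_val and num_vals.
    assert_eq!(col.iter().collect::<Vec<u64>>(), vec![7, 7, 7]);
    let mut buf = [0u64; 2];
    col.get_range(1, &mut buf);
    assert_eq!(buf, [7, 7]);
}
```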
26 changes: 8 additions & 18 deletions fastfield_codecs/src/lib.rs
@@ -12,16 +12,9 @@ pub mod bitpacked;
pub mod blockwise_linear;
pub mod linear;

pub trait FastFieldDataAccess {
fn get_val(&self, doc: u64) -> u64;
fn min_value(&self) -> u64;
fn max_value(&self) -> u64;
fn num_vals(&self) -> u64;
/// Returns a iterator over the data
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = u64> + 'a> {
Box::new((0..self.num_vals()).map(|idx| self.get_val(idx)))
}
}
mod column;

pub use self::column::Column;

#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy)]
#[repr(u8)]
@@ -68,7 +61,7 @@ pub trait FastFieldCodec {
/// used for debugging and de/serialization.
const CODEC_TYPE: FastFieldCodecType;

type Reader: FastFieldDataAccess;
type Reader: Column<u64>;

/// Reads the metadata and returns the CodecReader
fn open_from_bytes(bytes: OwnedBytes) -> io::Result<Self::Reader>;
@@ -77,10 +70,7 @@
///
/// Iterating over the `fastfield_accessor` should be preferred over repeated
/// `get_val` calls for performance reasons.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
) -> io::Result<()>;
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column<u64>) -> io::Result<()>;

/// Returns an estimate of the compression ratio.
/// If the codec is not applicable, returns `None`.
@@ -89,7 +79,7 @@
///
/// It could make sense to also return a value representing
/// computational complexity.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess) -> Option<f32>;
fn estimate(fastfield_accessor: &impl Column) -> Option<f32>;
}

#[derive(Debug, Clone)]
@@ -100,7 +90,7 @@ pub struct FastFieldStats {
pub num_vals: u64,
}

impl<'a> FastFieldDataAccess for &'a [u64] {
impl<'a> Column for &'a [u64] {
fn get_val(&self, position: u64) -> u64 {
self[position as usize]
}
@@ -122,7 +112,7 @@ impl<'a> FastFieldDataAccess for &'a [u64] {
}
}

impl FastFieldDataAccess for Vec<u64> {
impl Column for Vec<u64> {
fn get_val(&self, position: u64) -> u64 {
self[position as usize]
}
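Because `&[u64]` and `Vec<u64>` implement `Column`, a codec can be probed with plain in-memory data. A sketch, assuming the `BitpackedCodec` exported from the `bitpacked` module always returns an estimate (as its implementation above suggests):

```rust
use fastfield_codecs::bitpacked::BitpackedCodec;
use fastfield_codecs::FastFieldCodec;

fn main() {
    // 100 values in [1000, 1007]: amplitude 7, i.e. 3 bits per value.
    let vals: Vec<u64> = (0..100u64).map(|i| 1000 + i % 8).collect();
    let ratio = BitpackedCodec::estimate(&vals).expect("bitpacking always applies");
    assert!((ratio - 3.0 / 64.0).abs() < 1e-6);
}
```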
11 changes: 4 additions & 7 deletions fastfield_codecs/src/linear.rs
@@ -5,7 +5,7 @@ use common::{BinarySerializable, FixedSize};
use ownedbytes::OwnedBytes;
use tantivy_bitpacker::{compute_num_bits, BitPacker, BitUnpacker};

use crate::{FastFieldCodec, FastFieldCodecType, FastFieldDataAccess};
use crate::{Column, FastFieldCodec, FastFieldCodecType};

/// Depending on the field type, a different
/// fast field is required.
@@ -57,7 +57,7 @@ impl FixedSize for LinearFooter {
const SIZE_IN_BYTES: usize = 56;
}

impl FastFieldDataAccess for LinearReader {
impl Column for LinearReader {
#[inline]
fn get_val(&self, doc: u64) -> u64 {
let calculated_value = get_calculated_value(self.footer.first_val, doc, self.slope);
@@ -143,10 +143,7 @@ impl FastFieldCodec for LinearCodec {
}

/// Creates a new fast field serializer.
fn serialize(
write: &mut impl Write,
fastfield_accessor: &dyn FastFieldDataAccess,
) -> io::Result<()> {
fn serialize(write: &mut impl Write, fastfield_accessor: &dyn Column) -> io::Result<()> {
assert!(fastfield_accessor.min_value() <= fastfield_accessor.max_value());

let first_val = fastfield_accessor.get_val(0);
@@ -196,7 +193,7 @@ impl FastFieldCodec for LinearCodec {
/// estimation for linear interpolation is hard because you don't know
/// where the local maxima of the deviation from the calculated value are,
/// and the offset to shift all values to >=0 is also unknown.
fn estimate(fastfield_accessor: &impl FastFieldDataAccess) -> Option<f32> {
fn estimate(fastfield_accessor: &impl Column) -> Option<f32> {
if fastfield_accessor.num_vals() < 3 {
return None; // disable compressor for this case
}
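Both linear codecs store only the bit-packed deviation from a predicted value on a line through the data. A toy model of the prediction, assuming the same `first_val + idx * slope` form as `get_calculated_value` (the exact rounding in the crate may differ):

```rust
/// Predicted value on the interpolation line for index `idx`.
fn calculated_value(first_val: u64, idx: u64, slope: f32) -> u64 {
    (first_val as i64 + (idx as f32 * slope) as i64) as u64
}

fn main() {
    // Values on a perfect line need 0 bits for the deviation.
    let vals = [100u64, 110, 120, 130];
    let slope = (vals[3] - vals[0]) as f32 / 3.0;
    for (idx, &val) in vals.iter().enumerate() {
        assert_eq!(calculated_value(vals[0], idx as u64, slope), val);
    }
}
```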
17 changes: 9 additions & 8 deletions src/aggregation/bucket/histogram/histogram.rs
@@ -1,6 +1,7 @@
use std::cmp::Ordering;
use std::fmt::Display;

use fastfield_codecs::Column;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

@@ -14,7 +15,7 @@ use crate::aggregation::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
};
use crate::aggregation::segment_agg_result::SegmentAggregationResultsCollector;
use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
use crate::fastfield::DynamicFastFieldReader;
use crate::schema::Type;
use crate::{DocId, TantivyError};

@@ -331,10 +332,10 @@ impl SegmentHistogramCollector {
.expect("unexpected fast field cardinality");
let mut iter = doc.chunks_exact(4);
for docs in iter.by_ref() {
let val0 = self.f64_from_fastfield_u64(accessor.get(docs[0]));
let val1 = self.f64_from_fastfield_u64(accessor.get(docs[1]));
let val2 = self.f64_from_fastfield_u64(accessor.get(docs[2]));
let val3 = self.f64_from_fastfield_u64(accessor.get(docs[3]));
let val0 = self.f64_from_fastfield_u64(accessor.get_val(docs[0] as u64));
let val1 = self.f64_from_fastfield_u64(accessor.get_val(docs[1] as u64));
let val2 = self.f64_from_fastfield_u64(accessor.get_val(docs[2] as u64));
let val3 = self.f64_from_fastfield_u64(accessor.get_val(docs[3] as u64));

let bucket_pos0 = get_bucket_num(val0);
let bucket_pos1 = get_bucket_num(val1);
@@ -370,8 +371,8 @@
&bucket_with_accessor.sub_aggregation,
)?;
}
for doc in iter.remainder() {
let val = f64_from_fastfield_u64(accessor.get(*doc), &self.field_type);
for &doc in iter.remainder() {
let val = f64_from_fastfield_u64(accessor.get_val(doc as u64), &self.field_type);
if !bounds.contains(val) {
continue;
}
Expand All @@ -382,7 +383,7 @@ impl SegmentHistogramCollector {
self.buckets[bucket_pos].key,
get_bucket_val(val, self.interval, self.offset) as f64
);
self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?;
self.increment_bucket(bucket_pos, doc, &bucket_with_accessor.sub_aggregation)?;
}
if force_flush {
if let Some(sub_aggregations) = self.sub_aggregations.as_mut() {
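The collect loop above hands docs to the fast field in blocks of four via `chunks_exact(4)`, with a scalar pass over the remainder, so several lookups can be in flight instead of serializing on one value; the same shape appears in `range.rs` below. A reduced sketch of the pattern (the `process` closure stands in for the bucket logic):

```rust
use fastfield_codecs::Column;

fn for_each_val(column: &dyn Column<u64>, docs: &[u32], mut process: impl FnMut(u64)) {
    let mut chunks = docs.chunks_exact(4);
    for block in chunks.by_ref() {
        // Four independent lookups per iteration help the compiler
        // unroll the loop and keep several loads in flight.
        process(column.get_val(block[0] as u64));
        process(column.get_val(block[1] as u64));
        process(column.get_val(block[2] as u64));
        process(column.get_val(block[3] as u64));
    }
    for &doc in chunks.remainder() {
        process(column.get_val(doc as u64));
    }
}

fn main() {
    let column: Vec<u64> = (0..10).map(|i| i * 2).collect();
    let mut sum = 0u64;
    for_each_val(&column, &[0, 1, 2, 3, 4, 5], |val| sum += val);
    assert_eq!(sum, 30);
}
```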
16 changes: 8 additions & 8 deletions src/aggregation/bucket/range.rs
@@ -1,6 +1,7 @@
use std::fmt::Debug;
use std::ops::Range;

use fastfield_codecs::Column;
use fnv::FnvHashMap;
use serde::{Deserialize, Serialize};

@@ -12,7 +13,7 @@ use crate::aggregation::intermediate_agg_result::{
};
use crate::aggregation::segment_agg_result::{BucketCount, SegmentAggregationResultsCollector};
use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, Key, SerializedKey};
use crate::fastfield::FastFieldReader;
use crate::schema::Type;
use crate::{DocId, TantivyError};

@@ -264,10 +264,10 @@ impl SegmentRangeCollector {
.as_single()
.expect("unexpected fast field cardinality");
for docs in iter.by_ref() {
let val1 = accessor.get(docs[0]);
let val2 = accessor.get(docs[1]);
let val3 = accessor.get(docs[2]);
let val4 = accessor.get(docs[3]);
let val1 = accessor.get_val(docs[0] as u64);
let val2 = accessor.get_val(docs[1] as u64);
let val3 = accessor.get_val(docs[2] as u64);
let val4 = accessor.get_val(docs[3] as u64);
let bucket_pos1 = self.get_bucket_pos(val1);
let bucket_pos2 = self.get_bucket_pos(val2);
let bucket_pos3 = self.get_bucket_pos(val3);
@@ -278,10 +278,10 @@
self.increment_bucket(bucket_pos3, docs[2], &bucket_with_accessor.sub_aggregation)?;
self.increment_bucket(bucket_pos4, docs[3], &bucket_with_accessor.sub_aggregation)?;
}
for doc in iter.remainder() {
let val = accessor.get(*doc);
for &doc in iter.remainder() {
let val = accessor.get_val(doc as u64);
let bucket_pos = self.get_bucket_pos(val);
self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation)?;
self.increment_bucket(bucket_pos, doc, &bucket_with_accessor.sub_aggregation)?;
}
if force_flush {
for bucket in &mut self.buckets {
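One recurring detail across the aggregation changes: `use fastfield_codecs::Column;` replaces the old `FastFieldReader` import because `get_val` is now a trait method, and trait methods are only callable with the trait in scope. A minimal illustration (hypothetical function; this assumes `DynamicFastFieldReader<u64>` implements `Column<u64>` after this PR, which the call sites above rely on):

```rust
// Without this import, accessor.get_val(0) would fail to resolve:
// trait methods require the trait to be in scope.
use fastfield_codecs::Column;
use tantivy::fastfield::DynamicFastFieldReader;

fn first_value(accessor: &DynamicFastFieldReader<u64>) -> u64 {
    accessor.get_val(0)
}
```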
