diff --git a/common/src/serialize.rs b/common/src/serialize.rs
index bc893f7d06..7b96316ee4 100644
--- a/common/src/serialize.rs
+++ b/common/src/serialize.rs
@@ -107,6 +107,19 @@ impl FixedSize for u64 {
     const SIZE_IN_BYTES: usize = 8;
 }
 
+impl BinarySerializable for u128 {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_u128::<Endianness>(*self)
+    }
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<u128> {
+        reader.read_u128::<Endianness>()
+    }
+}
+
+impl FixedSize for u128 {
+    const SIZE_IN_BYTES: usize = 16;
+}
+
 impl BinarySerializable for f32 {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
         writer.write_f32::<Endianness>(*self)
diff --git a/fastfield_codecs/benches/bench.rs b/fastfield_codecs/benches/bench.rs
index 526036d4a2..5546d2af70 100644
--- a/fastfield_codecs/benches/bench.rs
+++ b/fastfield_codecs/benches/bench.rs
@@ -100,9 +100,10 @@ mod tests {
     fn get_u128_column_from_data(data: &[u128]) -> Arc<dyn Column<u128>> {
         let mut out = vec![];
-        serialize_u128(VecColumn::from(&data), &mut out).unwrap();
+        let iter_gen = || data.iter().cloned();
+        serialize_u128(iter_gen, data.len() as u64, &mut out).unwrap();
         let out = OwnedBytes::new(out);
-        open_u128(out).unwrap()
+        open_u128::<u128>(out).unwrap()
     }
 
     #[bench]
diff --git a/fastfield_codecs/src/column.rs b/fastfield_codecs/src/column.rs
index 2962472f9a..afbb7785fa 100644
--- a/fastfield_codecs/src/column.rs
+++ b/fastfield_codecs/src/column.rs
@@ -3,6 +3,8 @@ use std::ops::RangeInclusive;
 
 use tantivy_bitpacker::minmax;
 
+use crate::monotonic_mapping::StrictlyMonotonicFn;
+
 pub trait Column<T: PartialOrd = u64>: Send + Sync {
     /// Return the value associated with the given idx.
     ///
@@ -143,16 +145,30 @@ struct MonotonicMappingColumn<C, T, Input> {
     _phantom: PhantomData<Input>,
 }
 
-/// Creates a view of a column transformed by a monotonic mapping.
-pub fn monotonic_map_column<C, T, Input, Output>(
+/// Creates a view of a column transformed by a strictly monotonic mapping. See
+/// [`StrictlyMonotonicFn`].
+///
+/// E.g. a gcd mapping: monotonic_mapping([100, 200, 300]) == [1, 2, 3].
+/// monotonic_mapping.mapping() is expected to be injective, and we should always have
+/// monotonic_mapping.inverse(monotonic_mapping.mapping(el)) == el.
+///
+/// The inverse of the mapping is required for:
+/// `fn get_between_vals(&self, range: RangeInclusive<T>) -> Vec<u64>`
+/// The user provides the original value range, and we need to map it monotonically in the same
+/// way the serialization did before calling the underlying column.
+///
+/// Note that when opening a codec, the monotonic_mapping should be the inverse of the mapping
+/// used during serialization, and therefore the monotonic_mapping_inv used when opening is the
+/// same as the monotonic_mapping used during serialization.
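For illustration (not part of the patch): a minimal sketch of the `StrictlyMonotonicFn` contract described above, using a hypothetical `AddBase` mapping together with `monotonic_map_column`.

// Sketch only, assuming the `StrictlyMonotonicFn` trait and `monotonic_map_column`
// introduced in this diff. `AddBase` is a hypothetical mapping, not part of the patch.
struct AddBase {
    base: u64,
}

impl StrictlyMonotonicFn<u64, u64> for AddBase {
    // Strictly monotonic: a < b implies mapping(a) < mapping(b).
    fn mapping(&self, inp: u64) -> u64 {
        inp + self.base
    }

    // Must satisfy inverse(mapping(x)) == x.
    fn inverse(&self, out: u64) -> u64 {
        out - self.base
    }
}

// Usage: wrap a column so that get_val/min_value/max_value go through `mapping`,
// while get_between_vals translates the query bounds through `inverse`.
// let mapped = monotonic_map_column(VecColumn::from(&[1u64, 2, 3][..]), AddBase { base: 10 });
// assert_eq!(mapped.get_val(0), 11);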
+pub fn monotonic_map_column( from_column: C, monotonic_mapping: T, ) -> impl Column where C: Column, - T: Fn(Input) -> Output + Send + Sync, - Input: Send + Sync, - Output: Send + Sync, + T: StrictlyMonotonicFn + Send + Sync, + Input: PartialOrd + Send + Sync + Clone, + Output: PartialOrd + Send + Sync + Clone, { MonotonicMappingColumn { from_column, @@ -161,28 +177,27 @@ where } } -impl Column - for MonotonicMappingColumn +impl Column for MonotonicMappingColumn where C: Column, - T: Fn(Input) -> Output + Send + Sync, - Input: Send + Sync, - Output: Send + Sync, + T: StrictlyMonotonicFn + Send + Sync, + Input: PartialOrd + Send + Sync + Clone, + Output: PartialOrd + Send + Sync + Clone, { #[inline] fn get_val(&self, idx: u64) -> Output { let from_val = self.from_column.get_val(idx); - (self.monotonic_mapping)(from_val) + self.monotonic_mapping.mapping(from_val) } fn min_value(&self) -> Output { let from_min_value = self.from_column.min_value(); - (self.monotonic_mapping)(from_min_value) + self.monotonic_mapping.mapping(from_min_value) } fn max_value(&self) -> Output { let from_max_value = self.from_column.max_value(); - (self.monotonic_mapping)(from_max_value) + self.monotonic_mapping.mapping(from_max_value) } fn num_vals(&self) -> u64 { @@ -190,7 +205,18 @@ where } fn iter(&self) -> Box + '_> { - Box::new(self.from_column.iter().map(&self.monotonic_mapping)) + Box::new( + self.from_column + .iter() + .map(|el| self.monotonic_mapping.mapping(el)), + ) + } + + fn get_between_vals(&self, range: RangeInclusive) -> Vec { + self.from_column.get_between_vals( + self.monotonic_mapping.inverse(range.start().clone()) + ..=self.monotonic_mapping.inverse(range.end().clone()), + ) } // We voluntarily do not implement get_range as it yields a regression, @@ -236,19 +262,22 @@ where #[cfg(test)] mod tests { use super::*; - use crate::MonotonicallyMappableToU64; + use crate::monotonic_mapping::{ + StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternalBaseval, + StrictlyMonotonicMappingToInternalGCDBaseval, + }; #[test] fn test_monotonic_mapping() { - let vals = &[1u64, 3u64][..]; + let vals = &[3u64, 5u64][..]; let col = VecColumn::from(vals); - let mapped = monotonic_map_column(col, |el| el + 4); - assert_eq!(mapped.min_value(), 5u64); - assert_eq!(mapped.max_value(), 7u64); + let mapped = monotonic_map_column(col, StrictlyMonotonicMappingToInternalBaseval::new(2)); + assert_eq!(mapped.min_value(), 1u64); + assert_eq!(mapped.max_value(), 3u64); assert_eq!(mapped.num_vals(), 2); assert_eq!(mapped.num_vals(), 2); - assert_eq!(mapped.get_val(0), 5); - assert_eq!(mapped.get_val(1), 7); + assert_eq!(mapped.get_val(0), 1); + assert_eq!(mapped.get_val(1), 3); } #[test] @@ -260,10 +289,15 @@ mod tests { #[test] fn test_monotonic_mapping_iter() { - let vals: Vec = (-1..99).map(i64::to_u64).collect(); + let vals: Vec = (10..110u64).map(|el| el * 10).collect(); let col = VecColumn::from(&vals); - let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64); - let val_i64s: Vec = mapped.iter().collect(); + let mapped = monotonic_map_column( + col, + StrictlyMonotonicMappingInverter::from( + StrictlyMonotonicMappingToInternalGCDBaseval::new(10, 100), + ), + ); + let val_i64s: Vec = mapped.iter().collect(); for i in 0..100 { assert_eq!(val_i64s[i as usize], mapped.get_val(i)); } @@ -271,20 +305,26 @@ mod tests { #[test] fn test_monotonic_mapping_get_range() { - let vals: Vec = (-1..99).map(i64::to_u64).collect(); + let vals: Vec = (0..100u64).map(|el| el * 10).collect(); let col = 
VecColumn::from(&vals); - let mapped = monotonic_map_column(col, |el| i64::from_u64(el) * 10i64); - assert_eq!(mapped.min_value(), -10i64); - assert_eq!(mapped.max_value(), 980i64); + let mapped = monotonic_map_column( + col, + StrictlyMonotonicMappingInverter::from( + StrictlyMonotonicMappingToInternalGCDBaseval::new(10, 0), + ), + ); + + assert_eq!(mapped.min_value(), 0u64); + assert_eq!(mapped.max_value(), 9900u64); assert_eq!(mapped.num_vals(), 100); - let val_i64s: Vec = mapped.iter().collect(); - assert_eq!(val_i64s.len(), 100); + let val_u64s: Vec = mapped.iter().collect(); + assert_eq!(val_u64s.len(), 100); for i in 0..100 { - assert_eq!(val_i64s[i as usize], mapped.get_val(i)); - assert_eq!(val_i64s[i as usize], i64::from_u64(vals[i as usize]) * 10); + assert_eq!(val_u64s[i as usize], mapped.get_val(i)); + assert_eq!(val_u64s[i as usize], vals[i as usize] * 10); } - let mut buf = [0i64; 20]; + let mut buf = [0u64; 20]; mapped.get_range(7, &mut buf[..]); - assert_eq!(&val_i64s[7..][..20], &buf); + assert_eq!(&val_u64s[7..][..20], &buf); } } diff --git a/fastfield_codecs/src/compact_space/mod.rs b/fastfield_codecs/src/compact_space/mod.rs index 389bccf6e7..dd6dfbdbbe 100644 --- a/fastfield_codecs/src/compact_space/mod.rs +++ b/fastfield_codecs/src/compact_space/mod.rs @@ -171,10 +171,10 @@ pub struct IPCodecParams { impl CompactSpaceCompressor { /// Taking the vals as Vec may cost a lot of memory. It is used to sort the vals. - pub fn train_from(column: &impl Column) -> Self { + pub fn train_from(iter: impl Iterator, num_vals: u64) -> Self { let mut values_sorted = BTreeSet::new(); - values_sorted.extend(column.iter()); - let total_num_values = column.num_vals(); + values_sorted.extend(iter); + let total_num_values = num_vals; let compact_space = get_compact_space(&values_sorted, total_num_values, COST_PER_BLANK_IN_BITS); @@ -443,7 +443,7 @@ impl CompactSpaceDecompressor { mod tests { use super::*; - use crate::{open_u128, serialize_u128, VecColumn}; + use crate::{open_u128, serialize_u128}; #[test] fn compact_space_test() { @@ -513,7 +513,12 @@ mod tests { fn test_aux_vals(u128_vals: &[u128]) -> OwnedBytes { let mut out = Vec::new(); - serialize_u128(VecColumn::from(u128_vals), &mut out).unwrap(); + serialize_u128( + || u128_vals.iter().cloned(), + u128_vals.len() as u64, + &mut out, + ) + .unwrap(); let data = OwnedBytes::new(out); test_all(data.clone(), u128_vals); @@ -603,8 +608,8 @@ mod tests { 5_000_000_000, ]; let mut out = Vec::new(); - serialize_u128(VecColumn::from(vals), &mut out).unwrap(); - let decomp = open_u128(OwnedBytes::new(out)).unwrap(); + serialize_u128(|| vals.iter().cloned(), vals.len() as u64, &mut out).unwrap(); + let decomp = open_u128::(OwnedBytes::new(out)).unwrap(); assert_eq!(decomp.get_between_vals(199..=200), vec![0]); assert_eq!(decomp.get_between_vals(199..=201), vec![0, 1]); diff --git a/fastfield_codecs/src/lib.rs b/fastfield_codecs/src/lib.rs index 1f66a27e9b..07a86cc763 100644 --- a/fastfield_codecs/src/lib.rs +++ b/fastfield_codecs/src/lib.rs @@ -13,6 +13,10 @@ use std::sync::Arc; use common::BinarySerializable; use compact_space::CompactSpaceDecompressor; +use monotonic_mapping::{ + StrictlyMonotonicMappingInverter, StrictlyMonotonicMappingToInternal, + StrictlyMonotonicMappingToInternalBaseval, StrictlyMonotonicMappingToInternalGCDBaseval, +}; use ownedbytes::OwnedBytes; use serialize::Header; @@ -22,6 +26,7 @@ mod compact_space; mod line; mod linear; mod monotonic_mapping; +mod monotonic_mapping_u128; mod column; mod gcd; @@ -31,7 +36,8 @@ 
use self::bitpacked::BitpackedCodec; use self::blockwise_linear::BlockwiseLinearCodec; pub use self::column::{monotonic_map_column, Column, VecColumn}; use self::linear::LinearCodec; -pub use self::monotonic_mapping::MonotonicallyMappableToU64; +pub use self::monotonic_mapping::{MonotonicallyMappableToU64, StrictlyMonotonicFn}; +pub use self::monotonic_mapping_u128::MonotonicallyMappableToU128; pub use self::serialize::{ estimate, serialize, serialize_and_load, serialize_u128, NormalizedHeader, }; @@ -73,8 +79,13 @@ impl FastFieldCodecType { } /// Returns the correct codec reader wrapped in the `Arc` for the data. -pub fn open_u128(bytes: OwnedBytes) -> io::Result>> { - Ok(Arc::new(CompactSpaceDecompressor::open(bytes)?)) +pub fn open_u128( + bytes: OwnedBytes, +) -> io::Result>> { + let reader = CompactSpaceDecompressor::open(bytes)?; + let inverted: StrictlyMonotonicMappingInverter> = + StrictlyMonotonicMappingToInternal::::new().into(); + Ok(Arc::new(monotonic_map_column(reader, inverted))) } /// Returns the correct codec reader wrapped in the `Arc` for the data. @@ -99,11 +110,15 @@ fn open_specific_codec( let reader = C::open_from_bytes(bytes, normalized_header)?; let min_value = header.min_value; if let Some(gcd) = header.gcd { - let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val * gcd.get()); - Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) + let mapping = StrictlyMonotonicMappingInverter::from( + StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd.get(), min_value), + ); + Ok(Arc::new(monotonic_map_column(reader, mapping))) } else { - let monotonic_mapping = move |val: u64| Item::from_u64(min_value + val); - Ok(Arc::new(monotonic_map_column(reader, monotonic_mapping))) + let mapping = StrictlyMonotonicMappingInverter::from( + StrictlyMonotonicMappingToInternalBaseval::new(min_value), + ); + Ok(Arc::new(monotonic_map_column(reader, mapping))) } } @@ -143,6 +158,7 @@ pub const ALL_CODEC_TYPES: [FastFieldCodecType; 3] = [ #[cfg(test)] mod tests { + use proptest::prelude::*; use proptest::strategy::Strategy; use proptest::{prop_oneof, proptest}; @@ -177,6 +193,18 @@ mod tests { `{data:?}`", ); } + + if !data.is_empty() { + let test_rand_idx = rand::thread_rng().gen_range(0..=data.len() - 1); + let expected_positions: Vec = data + .iter() + .enumerate() + .filter(|(_, el)| **el == data[test_rand_idx]) + .map(|(pos, _)| pos as u64) + .collect(); + let positions = reader.get_between_vals(data[test_rand_idx]..=data[test_rand_idx]); + assert_eq!(expected_positions, positions); + } Some((estimation, actual_compression)) } diff --git a/fastfield_codecs/src/main.rs b/fastfield_codecs/src/main.rs index d3d9c06f8d..8e81c41f5f 100644 --- a/fastfield_codecs/src/main.rs +++ b/fastfield_codecs/src/main.rs @@ -90,7 +90,7 @@ fn bench_ip() { { let mut data = vec![]; for dataset in dataset.chunks(500_000) { - serialize_u128(VecColumn::from(dataset), &mut data).unwrap(); + serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap(); } let compression = data.len() as f64 / (dataset.len() * 16) as f64; println!("Compression 50_000 chunks {:.4}", compression); @@ -101,7 +101,10 @@ fn bench_ip() { } let mut data = vec![]; - serialize_u128(VecColumn::from(&dataset), &mut data).unwrap(); + { + print_time!("creation"); + serialize_u128(|| dataset.iter().cloned(), dataset.len() as u64, &mut data).unwrap(); + } let compression = data.len() as f64 / (dataset.len() * 16) as f64; println!("Compression {:.2}", compression); @@ -110,7 +113,7 @@ fn 
bench_ip() {
         (data.len() * 8) as f32 / dataset.len() as f32
     );
 
-    let decompressor = open_u128(OwnedBytes::new(data)).unwrap();
+    let decompressor = open_u128::<u128>(OwnedBytes::new(data)).unwrap();
 
     // Sample some ranges
     for value in dataset.iter().take(1110).skip(1100).cloned() {
         print_time!("get range");
diff --git a/fastfield_codecs/src/monotonic_mapping.rs b/fastfield_codecs/src/monotonic_mapping.rs
index d4e673040f..ebd34e2fad 100644
--- a/fastfield_codecs/src/monotonic_mapping.rs
+++ b/fastfield_codecs/src/monotonic_mapping.rs
@@ -1,3 +1,9 @@
+use std::marker::PhantomData;
+
+use fastdivide::DividerU64;
+
+use crate::MonotonicallyMappableToU128;
+
 pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy + Send + Sync {
     /// Converts a value to u64.
     ///
@@ -11,6 +17,145 @@ pub trait MonotonicallyMappableToU64: 'static + PartialOrd + Copy + Send + Sync
     fn from_u64(val: u64) -> Self;
 }
 
+/// Values are strictly monotonically mapped to an `Internal` value (u64 or u128) that can be
+/// used in fast field codecs.
+///
+/// The monotonic mapping is required so that `PartialOrd` can be used on `Internal` without
+/// converting to `External`.
+///
+/// All strictly monotonic functions are invertible, because they are guaranteed to have a
+/// one-to-one mapping between their domain and their range. The `inverse` method is required
+/// when opening a codec, so a value can be converted back to its original domain (e.g. ip
+/// address or f64) from its internal representation.
+pub trait StrictlyMonotonicFn<External, Internal> {
+    /// Strictly monotonically maps the value from External to Internal.
+    fn mapping(&self, inp: External) -> Internal;
+    /// Inverse of `mapping`. Maps the value from Internal to External.
+    fn inverse(&self, out: Internal) -> External;
+}
+
+/// Inverts a strictly monotonic mapping from `StrictlyMonotonicFn<External, Internal>` to
+/// `StrictlyMonotonicFn<Internal, External>`.
+///
+/// # Warning
+///
+/// This type comes with a footgun: a mapping being strictly monotonic does not imply that its
+/// inverse is strictly monotonic over the entire `External` space, e.g. a -> a * 2. Use at your
+/// own risk.
+pub(crate) struct StrictlyMonotonicMappingInverter<T> {
+    orig_mapping: T,
+}
+impl<T> From<T> for StrictlyMonotonicMappingInverter<T> {
+    fn from(orig_mapping: T) -> Self {
+        Self { orig_mapping }
+    }
+}
+
+impl<From, To, T> StrictlyMonotonicFn<To, From> for StrictlyMonotonicMappingInverter<T>
+where T: StrictlyMonotonicFn<From, To>
+{
+    fn mapping(&self, val: To) -> From {
+        self.orig_mapping.inverse(val)
+    }
+
+    fn inverse(&self, val: From) -> To {
+        self.orig_mapping.mapping(val)
+    }
+}
+
+/// Applies the strictly monotonic mapping from `T` without any additional changes.
+pub(crate) struct StrictlyMonotonicMappingToInternal<T> {
+    _phantom: PhantomData<T>,
+}
+
+impl<T> StrictlyMonotonicMappingToInternal<T> {
+    pub(crate) fn new() -> StrictlyMonotonicMappingToInternal<T> {
+        Self {
+            _phantom: PhantomData,
+        }
+    }
+}
+
+impl<External: MonotonicallyMappableToU128, T: MonotonicallyMappableToU128>
+    StrictlyMonotonicFn<External, u128> for StrictlyMonotonicMappingToInternal<T>
+where T: MonotonicallyMappableToU128
+{
+    fn mapping(&self, inp: External) -> u128 {
+        External::to_u128(inp)
+    }
+
+    fn inverse(&self, out: u128) -> External {
+        External::from_u128(out)
+    }
+}
+
+impl<External: MonotonicallyMappableToU64, T: MonotonicallyMappableToU64>
+    StrictlyMonotonicFn<External, u64> for StrictlyMonotonicMappingToInternal<T>
+where T: MonotonicallyMappableToU64
+{
+    fn mapping(&self, inp: External) -> u64 {
+        External::to_u64(inp)
+    }
+
+    fn inverse(&self, out: u64) -> External {
+        External::from_u64(out)
+    }
+}
+
+/// Mapping that subtracts a base value and divides by a gcd.
+///
+/// The mapping is assumed to be only called on values that are divisible by the passed
+/// gcd value.
(It is necessary for the function to be monotonic.) +pub(crate) struct StrictlyMonotonicMappingToInternalGCDBaseval { + gcd_divider: DividerU64, + gcd: u64, + min_value: u64, +} +impl StrictlyMonotonicMappingToInternalGCDBaseval { + pub(crate) fn new(gcd: u64, min_value: u64) -> Self { + let gcd_divider = DividerU64::divide_by(gcd); + Self { + gcd_divider, + gcd, + min_value, + } + } +} +impl StrictlyMonotonicFn + for StrictlyMonotonicMappingToInternalGCDBaseval +{ + fn mapping(&self, inp: External) -> u64 { + self.gcd_divider + .divide(External::to_u64(inp) - self.min_value) + } + + fn inverse(&self, out: u64) -> External { + External::from_u64(self.min_value + out * self.gcd) + } +} + +/// Strictly monotonic mapping with a base value. +pub(crate) struct StrictlyMonotonicMappingToInternalBaseval { + min_value: u64, +} +impl StrictlyMonotonicMappingToInternalBaseval { + pub(crate) fn new(min_value: u64) -> Self { + Self { min_value } + } +} + +impl StrictlyMonotonicFn + for StrictlyMonotonicMappingToInternalBaseval +{ + fn mapping(&self, val: External) -> u64 { + External::to_u64(val) - self.min_value + } + + fn inverse(&self, val: u64) -> External { + External::from_u64(self.min_value + val) + } +} + impl MonotonicallyMappableToU64 for u64 { fn to_u64(self) -> u64 { self @@ -54,3 +199,33 @@ impl MonotonicallyMappableToU64 for f64 { common::u64_to_f64(val) } } + +#[cfg(test)] +mod tests { + + use super::*; + + #[test] + fn strictly_monotonic_test() { + // identity mapping + test_round_trip(&StrictlyMonotonicMappingToInternal::::new(), 100u64); + // round trip to i64 + test_round_trip(&StrictlyMonotonicMappingToInternal::::new(), 100u64); + // identity mapping + test_round_trip(&StrictlyMonotonicMappingToInternal::::new(), 100u128); + + // base value to i64 round trip + let mapping = StrictlyMonotonicMappingToInternalBaseval::new(100); + test_round_trip::<_, _, u64>(&mapping, 100i64); + // base value and gcd to u64 round trip + let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(10, 100); + test_round_trip::<_, _, u64>(&mapping, 100u64); + } + + fn test_round_trip, K: std::fmt::Debug + Eq + Copy, L>( + mapping: &T, + test_val: K, + ) { + assert_eq!(mapping.inverse(mapping.mapping(test_val)), test_val); + } +} diff --git a/fastfield_codecs/src/monotonic_mapping_u128.rs b/fastfield_codecs/src/monotonic_mapping_u128.rs index 9f8c0d8cb9..979d6c8c39 100644 --- a/fastfield_codecs/src/monotonic_mapping_u128.rs +++ b/fastfield_codecs/src/monotonic_mapping_u128.rs @@ -1,4 +1,4 @@ -use std::net::{IpAddr, Ipv6Addr}; +use std::net::Ipv6Addr; pub trait MonotonicallyMappableToU128: 'static + PartialOrd + Copy + Send + Sync { /// Converts a value to u128. 
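A quick sketch (not part of the patch) of the round trip this trait guarantees for `Ipv6Addr`, using the big-endian byte mapping defined in the hunk below:

// Sketch only: `MonotonicallyMappableToU128` is re-exported from `fastfield_codecs`
// in this diff; the mapping goes through the big-endian octets of the address,
// which preserves the ordering of addresses.
use std::net::Ipv6Addr;

use fastfield_codecs::MonotonicallyMappableToU128;

fn ipv6_round_trip() {
    let ip = Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0x7f00, 0x0001); // ::ffff:127.0.0.1
    let as_u128 = ip.to_u128();
    // from_u128 is the exact inverse of to_u128.
    assert_eq!(Ipv6Addr::from_u128(as_u128), ip);
}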
@@ -23,20 +23,16 @@ impl MonotonicallyMappableToU128 for u128 { } } -impl MonotonicallyMappableToU128 for IpAddr { +impl MonotonicallyMappableToU128 for Ipv6Addr { fn to_u128(self) -> u128 { ip_to_u128(self) } fn from_u128(val: u128) -> Self { - IpAddr::from(val.to_be_bytes()) + Ipv6Addr::from(val.to_be_bytes()) } } -fn ip_to_u128(ip_addr: IpAddr) -> u128 { - let ip_addr_v6: Ipv6Addr = match ip_addr { - IpAddr::V4(v4) => v4.to_ipv6_mapped(), - IpAddr::V6(v6) => v6, - }; - u128::from_be_bytes(ip_addr_v6.octets()) +fn ip_to_u128(ip_addr: Ipv6Addr) -> u128 { + u128::from_be_bytes(ip_addr.octets()) } diff --git a/fastfield_codecs/src/serialize.rs b/fastfield_codecs/src/serialize.rs index 92f55f5d0f..c916c758ec 100644 --- a/fastfield_codecs/src/serialize.rs +++ b/fastfield_codecs/src/serialize.rs @@ -22,7 +22,6 @@ use std::num::NonZeroU64; use std::sync::Arc; use common::{BinarySerializable, VInt}; -use fastdivide::DividerU64; use log::warn; use ownedbytes::OwnedBytes; @@ -30,6 +29,10 @@ use crate::bitpacked::BitpackedCodec; use crate::blockwise_linear::BlockwiseLinearCodec; use crate::compact_space::CompactSpaceCompressor; use crate::linear::LinearCodec; +use crate::monotonic_mapping::{ + StrictlyMonotonicFn, StrictlyMonotonicMappingToInternal, + StrictlyMonotonicMappingToInternalGCDBaseval, +}; use crate::{ monotonic_map_column, Column, FastFieldCodec, FastFieldCodecType, MonotonicallyMappableToU64, VecColumn, ALL_CODEC_TYPES, @@ -57,8 +60,11 @@ pub(crate) struct Header { impl Header { pub fn normalized(self) -> NormalizedHeader { - let max_value = - (self.max_value - self.min_value) / self.gcd.map(|gcd| gcd.get()).unwrap_or(1); + let gcd = self.gcd.map(|gcd| gcd.get()).unwrap_or(1); + let gcd_min_val_mapping = + StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd, self.min_value); + + let max_value = gcd_min_val_mapping.mapping(self.max_value); NormalizedHeader { num_vals: self.num_vals, max_value, @@ -66,10 +72,7 @@ impl Header { } pub fn normalize_column(&self, from_column: C) -> impl Column { - let min_value = self.min_value; - let gcd = self.gcd.map(|gcd| gcd.get()).unwrap_or(1); - let divider = DividerU64::divide_by(gcd); - monotonic_map_column(from_column, move |val| divider.divide(val - min_value)) + normalize_column(from_column, self.min_value, self.gcd) } pub fn compute_header( @@ -81,9 +84,8 @@ impl Header { let max_value = column.max_value(); let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value)) .filter(|gcd| gcd.get() > 1u64); - let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64)); - let shifted_column = monotonic_map_column(&column, |val| divider.divide(val - min_value)); - let codec_type = detect_codec(shifted_column, codecs)?; + let normalized_column = normalize_column(column, min_value, gcd); + let codec_type = detect_codec(normalized_column, codecs)?; Some(Header { num_vals, min_value, @@ -94,6 +96,16 @@ impl Header { } } +pub fn normalize_column( + from_column: C, + min_value: u64, + gcd: Option, +) -> impl Column { + let gcd = gcd.map(|gcd| gcd.get()).unwrap_or(1); + let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new(gcd, min_value); + monotonic_map_column(from_column, mapping) +} + impl BinarySerializable for Header { fn serialize(&self, writer: &mut W) -> io::Result<()> { VInt(self.num_vals).serialize(writer)?; @@ -129,12 +141,15 @@ pub fn estimate( typed_column: impl Column, codec_type: FastFieldCodecType, ) -> Option { - let column = monotonic_map_column(typed_column, T::to_u64); + let column = 
monotonic_map_column(typed_column, StrictlyMonotonicMappingToInternal::::new()); let min_value = column.min_value(); let gcd = crate::gcd::find_gcd(column.iter().map(|val| val - min_value)) .filter(|gcd| gcd.get() > 1u64); - let divider = DividerU64::divide_by(gcd.map(|gcd| gcd.get()).unwrap_or(1u64)); - let normalized_column = monotonic_map_column(&column, |val| divider.divide(val - min_value)); + let mapping = StrictlyMonotonicMappingToInternalGCDBaseval::new( + gcd.map(|gcd| gcd.get()).unwrap_or(1u64), + min_value, + ); + let normalized_column = monotonic_map_column(&column, mapping); match codec_type { FastFieldCodecType::Bitpacked => BitpackedCodec::estimate(&normalized_column), FastFieldCodecType::Linear => LinearCodec::estimate(&normalized_column), @@ -142,15 +157,14 @@ pub fn estimate( } } -pub fn serialize_u128( - typed_column: impl Column, +pub fn serialize_u128 I, I: Iterator>( + iter_gen: F, + num_vals: u64, output: &mut impl io::Write, ) -> io::Result<()> { // TODO write header, to later support more codecs - let compressor = CompactSpaceCompressor::train_from(&typed_column); - compressor - .compress_into(typed_column.iter(), output) - .unwrap(); + let compressor = CompactSpaceCompressor::train_from(iter_gen(), num_vals); + compressor.compress_into(iter_gen(), output).unwrap(); Ok(()) } @@ -160,7 +174,7 @@ pub fn serialize( output: &mut impl io::Write, codecs: &[FastFieldCodecType], ) -> io::Result<()> { - let column = monotonic_map_column(typed_column, T::to_u64); + let column = monotonic_map_column(typed_column, StrictlyMonotonicMappingToInternal::::new()); let header = Header::compute_header(&column, codecs).ok_or_else(|| { io::Error::new( io::ErrorKind::InvalidInput, diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 3fca75fce4..c825ee85c8 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -27,7 +27,10 @@ pub use self::bytes::{BytesFastFieldReader, BytesFastFieldWriter}; pub use self::error::{FastFieldNotAvailableError, Result}; pub use self::facet_reader::FacetReader; pub(crate) use self::multivalued::{get_fastfield_codecs_for_multivalue, MultivalueStartIndex}; -pub use self::multivalued::{MultiValuedFastFieldReader, MultiValuedFastFieldWriter}; +pub use self::multivalued::{ + MultiValueU128FastFieldWriter, MultiValuedFastFieldReader, MultiValuedFastFieldWriter, + MultiValuedU128FastFieldReader, +}; pub use self::readers::FastFieldReaders; pub(crate) use self::readers::{type_and_cardinality, FastType}; pub use self::serializer::{Column, CompositeFastFieldSerializer}; diff --git a/src/fastfield/multivalued/mod.rs b/src/fastfield/multivalued/mod.rs index 26b49abd7b..c625a2e76d 100644 --- a/src/fastfield/multivalued/mod.rs +++ b/src/fastfield/multivalued/mod.rs @@ -3,9 +3,9 @@ mod writer; use fastfield_codecs::FastFieldCodecType; -pub use self::reader::MultiValuedFastFieldReader; -pub use self::writer::MultiValuedFastFieldWriter; +pub use self::reader::{MultiValuedFastFieldReader, MultiValuedU128FastFieldReader}; pub(crate) use self::writer::MultivalueStartIndex; +pub use self::writer::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter}; /// The valid codecs for multivalue values excludes the linear interpolation codec. 
///
diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs
index f8e41f2e1c..054bb01e4d 100644
--- a/src/fastfield/multivalued/reader.rs
+++ b/src/fastfield/multivalued/reader.rs
@@ -1,7 +1,7 @@
-use std::ops::Range;
+use std::ops::{Range, RangeInclusive};
 use std::sync::Arc;
 
-use fastfield_codecs::Column;
+use fastfield_codecs::{Column, MonotonicallyMappableToU128};
 
 use crate::fastfield::{FastValue, MultiValueLength};
 use crate::DocId;
@@ -99,12 +99,176 @@ impl<Item: FastValue> MultiValueLength for MultiValuedFastFieldReader<Item> {
         self.total_num_vals() as u64
     }
 }
+
+/// Reader for a multivalued `u128` fast field.
+///
+/// The reader is implemented as a `u64` fast field for the index and a `u128` fast field for
+/// the values.
+///
+/// The `vals_reader` will access the concatenated list of all
+/// values for all documents.
+/// The `idx_reader` associates, for each document, the index of its first value.
+#[derive(Clone)]
+pub struct MultiValuedU128FastFieldReader<T: MonotonicallyMappableToU128> {
+    idx_reader: Arc<dyn Column<u64>>,
+    vals_reader: Arc<dyn Column<T>>,
+}
+
+impl<T: MonotonicallyMappableToU128> MultiValuedU128FastFieldReader<T> {
+    pub(crate) fn open(
+        idx_reader: Arc<dyn Column<u64>>,
+        vals_reader: Arc<dyn Column<T>>,
+    ) -> MultiValuedU128FastFieldReader<T> {
+        Self {
+            idx_reader,
+            vals_reader,
+        }
+    }
+
+    /// Returns `[start, end)`, such that the values associated
+    /// to the given document are `start..end`.
+    #[inline]
+    fn range(&self, doc: DocId) -> Range<u64> {
+        let start = self.idx_reader.get_val(doc as u64);
+        let end = self.idx_reader.get_val(doc as u64 + 1);
+        start..end
+    }
+
+    /// Returns the first value associated with the given `doc`, if any.
+    #[inline]
+    pub fn get_first_val(&self, doc: DocId) -> Option<T> {
+        let range = self.range(doc);
+        if range.is_empty() {
+            return None;
+        }
+        Some(self.vals_reader.get_val(range.start))
+    }
+
+    /// Fills `vals` with the values stored in the given index `range`.
+    #[inline]
+    fn get_vals_for_range(&self, range: Range<u64>, vals: &mut Vec<T>) {
+        let len = (range.end - range.start) as usize;
+        vals.resize(len, T::from_u128(0));
+        self.vals_reader.get_range(range.start, &mut vals[..]);
+    }
+
+    /// Returns the array of values associated with the given `doc`.
+    #[inline]
+    pub fn get_vals(&self, doc: DocId, vals: &mut Vec<T>) {
+        let range = self.range(doc);
+        self.get_vals_for_range(range, vals);
+    }
+
+    /// Returns all docids which have at least one value in the provided value range.
+    pub fn get_between_vals(&self, range: RangeInclusive<T>) -> Vec<DocId> {
+        let positions = self.vals_reader.get_between_vals(range);
+
+        positions_to_docids(&positions, self.idx_reader.as_ref())
+    }
+
+    /// Iterates over all elements in the fast field.
+    pub fn iter(&self) -> impl Iterator<Item = T> + '_ {
+        self.vals_reader.iter()
+    }
+
+    /// Returns the minimum value for this fast field.
+    ///
+    /// The min value does not take possibly deleted documents
+    /// into account, and should be considered as a lower bound
+    /// of the actual minimum value.
+    pub fn min_value(&self) -> T {
+        self.vals_reader.min_value()
+    }
+
+    /// Returns the maximum value for this fast field.
+    ///
+    /// The max value does not take possibly deleted documents
+    /// into account, and should be considered as an upper bound
+    /// of the actual maximum value.
+    pub fn max_value(&self) -> T {
+        self.vals_reader.max_value()
+    }
+
+    /// Returns the number of values associated with the document `DocId`.
+    #[inline]
+    pub fn num_vals(&self, doc: DocId) -> usize {
+        let range = self.range(doc);
+        (range.end - range.start) as usize
+    }
+
+    /// Returns the overall number of values in this field.
+    #[inline]
+    pub fn total_num_vals(&self) -> u64 {
+        self.idx_reader.max_value()
+    }
+}
+
+impl<T: MonotonicallyMappableToU128> MultiValueLength for MultiValuedU128FastFieldReader<T> {
+    fn get_range(&self, doc_id: DocId) -> std::ops::Range<u64> {
+        self.range(doc_id)
+    }
+    fn get_len(&self, doc_id: DocId) -> u64 {
+        self.num_vals(doc_id) as u64
+    }
+    fn get_total_len(&self) -> u64 {
+        self.total_num_vals() as u64
+    }
+}
+
+/// Converts a list of positions of values in a 1:n index to the corresponding list of DocIds.
+///
+/// Since there is no index from value position to docid (only from docid to the value position
+/// range), we scan the index.
+///
+/// Correctness: `positions` needs to be sorted. `idx_reader` needs to contain monotonically
+/// increasing positions.
+///
+/// TODO: Instead of a linear scan we can employ an exponential search followed by a binary
+/// search to match a docid to its value position.
+fn positions_to_docids<C: Column>(positions: &[u64], idx_reader: &C) -> Vec<DocId> {
+    let mut docs = vec![];
+    let mut cur_doc = 0u32;
+    let mut last_doc = None;
+
+    for pos in positions {
+        loop {
+            let end = idx_reader.get_val(cur_doc as u64 + 1);
+            if end > *pos {
+                // avoid duplicates
+                if Some(cur_doc) == last_doc {
+                    break;
+                }
+                docs.push(cur_doc);
+                last_doc = Some(cur_doc);
+                break;
+            }
+            cur_doc += 1;
+        }
+    }
+
+    docs
+}
+
 #[cfg(test)]
 mod tests {
+    use fastfield_codecs::VecColumn;
+
     use crate::core::Index;
+    use crate::fastfield::multivalued::reader::positions_to_docids;
     use crate::schema::{Cardinality, Facet, FacetOptions, NumericOptions, Schema};
 
+    #[test]
+    fn test_positions_to_docid() {
+        let positions = vec![10u64, 11, 15, 20, 21, 22];
+
+        let offsets = vec![0, 10, 12, 15, 22, 23];
+        {
+            let column = VecColumn::from(&offsets);
+
+            let docids = positions_to_docids(&positions, &column);
+            assert_eq!(docids, vec![1, 3, 4]);
+        }
+    }
+
     #[test]
     fn test_multifastfield_reader() -> crate::Result<()> {
         let mut schema_builder = Schema::builder();
diff --git a/src/fastfield/multivalued/writer.rs b/src/fastfield/multivalued/writer.rs
index 0fb30caf63..127416f590 100644
--- a/src/fastfield/multivalued/writer.rs
+++ b/src/fastfield/multivalued/writer.rs
@@ -1,6 +1,8 @@
 use std::io;
 
-use fastfield_codecs::{Column, MonotonicallyMappableToU64, VecColumn};
+use fastfield_codecs::{
+    Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64, VecColumn,
+};
 use fnv::FnvHashMap;
 
 use super::get_fastfield_codecs_for_multivalue;
@@ -264,6 +266,143 @@ fn iter_remapped_multivalue_index<'a, C: Column>(
     }))
 }
 
+/// Writer for multi-valued (as in, more than one value per document)
+/// u128 fast field.
+///
+/// This `Writer` is only useful for advanced users.
+/// The normal way to get your multivalued int in your index
+/// is to
+/// - declare your field with fast set to `Cardinality::MultiValues`
+/// in your schema
+/// - add your document simply by calling `.add_document(...)`.
+///
+/// The `MultiValuedFastFieldWriter` can be acquired from the
+
+pub struct MultiValueU128FastFieldWriter {
+    field: Field,
+    vals: Vec<u128>,
+    doc_index: Vec<u64>,
+}
+
+impl MultiValueU128FastFieldWriter {
+    /// Creates a new `MultiValueU128FastFieldWriter`
+    pub(crate) fn new(field: Field) -> Self {
+        MultiValueU128FastFieldWriter {
+            field,
+            vals: Vec::new(),
+            doc_index: Vec::new(),
+        }
+    }
+
+    /// The memory used (including children).
+    pub fn mem_usage(&self) -> usize {
+        self.vals.capacity() * std::mem::size_of::<u128>()
+            + self.doc_index.capacity() * std::mem::size_of::<u64>()
+    }
+
+    /// Finalize the current document.
+ pub(crate) fn next_doc(&mut self) { + self.doc_index.push(self.vals.len() as u64); + } + + /// Pushes a new value to the current document. + pub(crate) fn add_val(&mut self, val: u128) { + self.vals.push(val); + } + + /// Shift to the next document and adds + /// all of the matching field values present in the document. + pub fn add_document(&mut self, doc: &Document) { + self.next_doc(); + for field_value in doc.field_values() { + if field_value.field == self.field { + let value = field_value.value(); + let ip_addr = value + .as_ip_addr() + .unwrap_or_else(|| panic!("expected and ip, but got {:?}", value)); + let ip_addr_u128 = ip_addr.to_u128(); + self.add_val(ip_addr_u128); + } + } + } + + /// Returns an iterator over values per doc_id in ascending doc_id order. + /// + /// Normally the order is simply iterating self.doc_id_index. + /// With doc_id_map it accounts for the new mapping, returning values in the order of the + /// new doc_ids. + fn get_ordered_values<'a: 'b, 'b>( + &'a self, + doc_id_map: Option<&'b DocIdMapping>, + ) -> impl Iterator { + get_ordered_values(&self.vals, &self.doc_index, doc_id_map) + } + + /// Serializes fast field values. + pub fn serialize( + mut self, + serializer: &mut CompositeFastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { + { + // writing the offset index + // + self.doc_index.push(self.vals.len() as u64); + let col = VecColumn::from(&self.doc_index[..]); + if let Some(doc_id_map) = doc_id_map { + let multi_value_start_index = MultivalueStartIndex::new(&col, doc_id_map); + serializer.create_auto_detect_u64_fast_field_with_idx( + self.field, + multi_value_start_index, + 0, + )?; + } else { + serializer.create_auto_detect_u64_fast_field_with_idx(self.field, col, 0)?; + } + } + { + let iter_gen = || self.get_ordered_values(doc_id_map).flatten().cloned(); + + serializer.create_u128_fast_field_with_idx( + self.field, + iter_gen, + self.vals.len() as u64, + 1, + )?; + } + Ok(()) + } +} + +/// Returns an iterator over values per doc_id in ascending doc_id order. +/// +/// Normally the order is simply iterating self.doc_id_index. +/// With doc_id_map it accounts for the new mapping, returning values in the order of the +/// new doc_ids. 
+fn get_ordered_values<'a: 'b, 'b, T>( + vals: &'a [T], + doc_index: &'a [u64], + doc_id_map: Option<&'b DocIdMapping>, +) -> impl Iterator { + let doc_id_iter: Box> = if let Some(doc_id_map) = doc_id_map { + Box::new(doc_id_map.iter_old_doc_ids()) + } else { + let max_doc = doc_index.len() as DocId; + Box::new(0..max_doc) + }; + doc_id_iter.map(move |doc_id| get_values_for_doc_id(doc_id, vals, doc_index)) +} + +/// returns all values for a doc_id +fn get_values_for_doc_id<'a, T>(doc_id: u32, vals: &'a [T], doc_index: &'a [u64]) -> &'a [T] { + let start_pos = doc_index[doc_id as usize] as usize; + let end_pos = doc_index + .get(doc_id as usize + 1) + .cloned() + .unwrap_or(vals.len() as u64) as usize; // special case, last doc_id has no offset information + &vals[start_pos..end_pos] +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/fastfield/readers.rs b/src/fastfield/readers.rs index 68f9a811f3..257c8345a0 100644 --- a/src/fastfield/readers.rs +++ b/src/fastfield/readers.rs @@ -1,7 +1,9 @@ +use std::net::Ipv6Addr; use std::sync::Arc; -use fastfield_codecs::{open, Column}; +use fastfield_codecs::{open, open_u128, Column}; +use super::multivalued::MultiValuedU128FastFieldReader; use crate::directory::{CompositeFile, FileSlice}; use crate::fastfield::{ BytesFastFieldReader, FastFieldNotAvailableError, FastValue, MultiValuedFastFieldReader, @@ -23,6 +25,7 @@ pub struct FastFieldReaders { pub(crate) enum FastType { I64, U64, + U128, F64, Bool, Date, @@ -49,6 +52,9 @@ pub(crate) fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, FieldType::Str(options) if options.is_fast() => { Some((FastType::U64, Cardinality::MultiValues)) } + FieldType::IpAddr(options) => options + .get_fastfield_cardinality() + .map(|cardinality| (FastType::U128, cardinality)), _ => None, } } @@ -143,6 +149,59 @@ impl FastFieldReaders { self.typed_fast_field_reader(field) } + /// Returns the `ip` fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 fast field, this method returns an Error. + pub fn ip_addr(&self, field: Field) -> crate::Result>> { + self.check_type(field, FastType::U128, Cardinality::SingleValue)?; + let bytes = self.fast_field_data(field, 0)?.read_bytes()?; + Ok(open_u128::(bytes)?) + } + + /// Returns the `ip` fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 fast field, this method returns an Error. + pub fn ip_addrs( + &self, + field: Field, + ) -> crate::Result> { + self.check_type(field, FastType::U128, Cardinality::MultiValues)?; + let idx_reader: Arc> = self.typed_fast_field_reader(field)?; + + let bytes = self.fast_field_data(field, 1)?.read_bytes()?; + let vals_reader = open_u128::(bytes)?; + + Ok(MultiValuedU128FastFieldReader::open( + idx_reader, + vals_reader, + )) + } + + /// Returns the `u128` fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 fast field, this method returns an Error. + pub(crate) fn u128(&self, field: Field) -> crate::Result>> { + self.check_type(field, FastType::U128, Cardinality::SingleValue)?; + let bytes = self.fast_field_data(field, 0)?.read_bytes()?; + Ok(open_u128::(bytes)?) + } + + /// Returns the `u128` multi-valued fast field reader reader associated to `field`. + /// + /// If `field` is not a u128 multi-valued fast field, this method returns an Error. 
+ pub fn u128s(&self, field: Field) -> crate::Result> { + self.check_type(field, FastType::U128, Cardinality::MultiValues)?; + let idx_reader: Arc> = self.typed_fast_field_reader(field)?; + + let bytes = self.fast_field_data(field, 1)?.read_bytes()?; + let vals_reader = open_u128::(bytes)?; + + Ok(MultiValuedU128FastFieldReader::open( + idx_reader, + vals_reader, + )) + } + /// Returns the `u64` fast field reader reader associated with `field`, regardless of whether /// the given field is effectively of type `u64` or not. /// diff --git a/src/fastfield/serializer/mod.rs b/src/fastfield/serializer/mod.rs index 6ca2929317..e0fb6e64b6 100644 --- a/src/fastfield/serializer/mod.rs +++ b/src/fastfield/serializer/mod.rs @@ -84,6 +84,21 @@ impl CompositeFastFieldSerializer { Ok(()) } + /// Serialize data into a new u128 fast field. The codec will be compact space compressor, + /// which is optimized for scanning the fast field for a given range. + pub fn create_u128_fast_field_with_idx I, I: Iterator>( + &mut self, + field: Field, + iter_gen: F, + num_vals: u64, + idx: usize, + ) -> io::Result<()> { + let field_write = self.composite_write.for_field_with_idx(field, idx); + fastfield_codecs::serialize_u128(iter_gen, num_vals, field_write)?; + + Ok(()) + } + /// Start serializing a new [u8] fast field. Use the returned writer to write data into the /// bytes field. To associate the bytes with documents a seperate index must be created on /// index 0. See bytes/writer.rs::serialize for an example. diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs index 5d1a0810e4..4e1fe50bcb 100644 --- a/src/fastfield/writer.rs +++ b/src/fastfield/writer.rs @@ -2,11 +2,11 @@ use std::collections::HashMap; use std::io; use common; -use fastfield_codecs::{Column, MonotonicallyMappableToU64}; +use fastfield_codecs::{Column, MonotonicallyMappableToU128, MonotonicallyMappableToU64}; use fnv::FnvHashMap; use tantivy_bitpacker::BlockedBitpacker; -use super::multivalued::MultiValuedFastFieldWriter; +use super::multivalued::{MultiValueU128FastFieldWriter, MultiValuedFastFieldWriter}; use super::FastFieldType; use crate::fastfield::{BytesFastFieldWriter, CompositeFastFieldSerializer}; use crate::indexer::doc_id_mapping::DocIdMapping; @@ -19,6 +19,8 @@ use crate::DatePrecision; pub struct FastFieldsWriter { term_id_writers: Vec, single_value_writers: Vec, + u128_value_writers: Vec, + u128_multi_value_writers: Vec, multi_values_writers: Vec, bytes_value_writers: Vec, } @@ -34,6 +36,8 @@ fn fast_field_default_value(field_entry: &FieldEntry) -> u64 { impl FastFieldsWriter { /// Create all `FastFieldWriter` required by the schema. 
pub fn from_schema(schema: &Schema) -> FastFieldsWriter { + let mut u128_value_writers = Vec::new(); + let mut u128_multi_value_writers = Vec::new(); let mut single_value_writers = Vec::new(); let mut term_id_writers = Vec::new(); let mut multi_values_writers = Vec::new(); @@ -97,10 +101,27 @@ impl FastFieldsWriter { bytes_value_writers.push(fast_field_writer); } } + FieldType::IpAddr(opt) => { + if opt.is_fast() { + match opt.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { + let fast_field_writer = U128FastFieldWriter::new(field); + u128_value_writers.push(fast_field_writer); + } + Some(Cardinality::MultiValues) => { + let fast_field_writer = MultiValueU128FastFieldWriter::new(field); + u128_multi_value_writers.push(fast_field_writer); + } + None => {} + } + } + } FieldType::Str(_) | FieldType::JsonObject(_) => {} } } FastFieldsWriter { + u128_value_writers, + u128_multi_value_writers, term_id_writers, single_value_writers, multi_values_writers, @@ -129,6 +150,16 @@ impl FastFieldsWriter { .iter() .map(|w| w.mem_usage()) .sum::() + + self + .u128_value_writers + .iter() + .map(|w| w.mem_usage()) + .sum::() + + self + .u128_multi_value_writers + .iter() + .map(|w| w.mem_usage()) + .sum::() } /// Get the `FastFieldWriter` associated with a field. @@ -190,7 +221,6 @@ impl FastFieldsWriter { .iter_mut() .find(|field_writer| field_writer.field() == field) } - /// Indexes all of the fastfields of a new document. pub fn add_document(&mut self, doc: &Document) { for field_writer in &mut self.term_id_writers { @@ -205,6 +235,12 @@ impl FastFieldsWriter { for field_writer in &mut self.bytes_value_writers { field_writer.add_document(doc); } + for field_writer in &mut self.u128_value_writers { + field_writer.add_document(doc); + } + for field_writer in &mut self.u128_multi_value_writers { + field_writer.add_document(doc); + } } /// Serializes all of the `FastFieldWriter`s by pushing them in @@ -230,6 +266,110 @@ impl FastFieldsWriter { for field_writer in self.bytes_value_writers { field_writer.serialize(serializer, doc_id_map)?; } + for field_writer in self.u128_value_writers { + field_writer.serialize(serializer, doc_id_map)?; + } + for field_writer in self.u128_multi_value_writers { + field_writer.serialize(serializer, doc_id_map)?; + } + + Ok(()) + } +} + +/// Fast field writer for u128 values. +/// The fast field writer just keeps the values in memory. +/// +/// Only when the segment writer can be closed and +/// persisted on disk, the fast field writer is +/// sent to a `FastFieldSerializer` via the `.serialize(...)` +/// method. +/// +/// We cannot serialize earlier as the values are +/// compressed to a compact number space and the number of +/// bits required for bitpacking can only been known once +/// we have seen all of the values. +pub struct U128FastFieldWriter { + field: Field, + vals: Vec, + val_count: u32, +} + +impl U128FastFieldWriter { + /// Creates a new `IntFastFieldWriter` + pub fn new(field: Field) -> Self { + Self { + field, + vals: vec![], + val_count: 0, + } + } + + /// The memory used (inclusive childs) + pub fn mem_usage(&self) -> usize { + self.vals.len() * 16 + } + + /// Records a new value. + /// + /// The n-th value being recorded is implicitely + /// associated to the document with the `DocId` n. + /// (Well, `n-1` actually because of 0-indexing) + pub fn add_val(&mut self, val: u128) { + self.vals.push(val); + } + + /// Extract the fast field value from the document + /// (or use the default value) and records it. 
+ /// + /// Extract the value associated to the fast field for + /// this document. + pub fn add_document(&mut self, doc: &Document) { + match doc.get_first(self.field) { + Some(v) => { + let ip_addr = v + .as_ip_addr() + .unwrap_or_else(|| panic!("expected and ip, but got {:?}", v)); + + let value = ip_addr.to_u128(); + self.add_val(value); + } + None => { + self.add_val(0); // TODO fix null handling + } + }; + self.val_count += 1; + } + + /// Push the fast fields value to the `FastFieldWriter`. + pub fn serialize( + &self, + serializer: &mut CompositeFastFieldSerializer, + doc_id_map: Option<&DocIdMapping>, + ) -> io::Result<()> { + if let Some(doc_id_map) = doc_id_map { + let iter_gen = || { + doc_id_map + .iter_old_doc_ids() + .map(|idx| self.vals[idx as usize]) + }; + + serializer.create_u128_fast_field_with_idx( + self.field, + iter_gen, + self.val_count as u64, + 0, + )?; + } else { + let iter_gen = || self.vals.iter().cloned(); + serializer.create_u128_fast_field_with_idx( + self.field, + iter_gen, + self.val_count as u64, + 0, + )?; + } + Ok(()) } } @@ -238,7 +378,7 @@ impl FastFieldsWriter { /// The fast field writer just keeps the values in memory. /// /// Only when the segment writer can be closed and -/// persisted on disc, the fast field writer is +/// persisted on disk, the fast field writer is /// sent to a `FastFieldSerializer` via the `.serialize(...)` /// method. /// diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 3caa0f4aa3..e7cd65574f 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -803,7 +803,9 @@ impl Drop for IndexWriter { #[cfg(test)] mod tests { use std::collections::{HashMap, HashSet}; + use std::net::Ipv6Addr; + use fastfield_codecs::MonotonicallyMappableToU128; use proptest::prelude::*; use proptest::prop_oneof; use proptest::strategy::Strategy; @@ -815,7 +817,7 @@ mod tests { use crate::indexer::NoMergePolicy; use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery}; use crate::schema::{ - self, Cardinality, Facet, FacetOptions, IndexRecordOption, NumericOptions, + self, Cardinality, Facet, FacetOptions, IndexRecordOption, IpAddrOptions, NumericOptions, TextFieldIndexing, TextOptions, FAST, INDEXED, STORED, STRING, TEXT, }; use crate::store::DOCSTORE_CACHE_CAPACITY; @@ -1593,6 +1595,11 @@ mod tests { force_end_merge: bool, ) -> crate::Result<()> { let mut schema_builder = schema::Schema::builder(); + let ip_field = schema_builder.add_ip_addr_field("ip", FAST | INDEXED | STORED); + let ips_field = schema_builder.add_ip_addr_field( + "ips", + IpAddrOptions::default().set_fast(Cardinality::MultiValues), + ); let id_field = schema_builder.add_u64_field("id", FAST | INDEXED | STORED); let bytes_field = schema_builder.add_bytes_field("bytes", FAST | INDEXED | STORED); let bool_field = schema_builder.add_bool_field("bool", FAST | INDEXED | STORED); @@ -1648,17 +1655,37 @@ mod tests { match op { IndexingOp::AddDoc { id } => { let facet = Facet::from(&("/cola/".to_string() + &id.to_string())); - index_writer.add_document(doc!(id_field=>id, - bytes_field => id.to_le_bytes().as_slice(), - multi_numbers=> id, - multi_numbers => id, - bool_field => (id % 2u64) != 0, - multi_bools => (id % 2u64) != 0, - multi_bools => (id % 2u64) == 0, - text_field => id.to_string(), - facet_field => facet, - large_text_field=> LOREM - ))?; + let ip_from_id = Ipv6Addr::from_u128(id as u128); + + if id % 3 == 0 { + // every 3rd doc has no ip field + index_writer.add_document(doc!(id_field=>id, + bytes_field => 
id.to_le_bytes().as_slice(), + multi_numbers=> id, + multi_numbers => id, + bool_field => (id % 2u64) != 0, + multi_bools => (id % 2u64) != 0, + multi_bools => (id % 2u64) == 0, + text_field => id.to_string(), + facet_field => facet, + large_text_field=> LOREM + ))?; + } else { + index_writer.add_document(doc!(id_field=>id, + bytes_field => id.to_le_bytes().as_slice(), + ip_field => ip_from_id, + ips_field => ip_from_id, + ips_field => ip_from_id, + multi_numbers=> id, + multi_numbers => id, + bool_field => (id % 2u64) != 0, + multi_bools => (id % 2u64) != 0, + multi_bools => (id % 2u64) == 0, + text_field => id.to_string(), + facet_field => facet, + large_text_field=> LOREM + ))?; + } } IndexingOp::DeleteDoc { id } => { index_writer.delete_term(Term::from_field_u64(id_field, id)); @@ -1744,6 +1771,60 @@ mod tests { .collect::>() ); + // Load all ips addr + let ips: HashSet = searcher + .segment_readers() + .iter() + .flat_map(|segment_reader| { + let ff_reader = segment_reader.fast_fields().ip_addr(ip_field).unwrap(); + segment_reader.doc_ids_alive().flat_map(move |doc| { + let val = ff_reader.get_val(doc as u64); + if val == Ipv6Addr::from_u128(0) { + // TODO Fix null handling + None + } else { + Some(val) + } + }) + }) + .collect(); + + let expected_ips = expected_ids_and_num_occurrences + .keys() + .flat_map(|id| { + if id % 3 == 0 { + None + } else { + Some(Ipv6Addr::from_u128(*id as u128)) + } + }) + .collect::>(); + assert_eq!(ips, expected_ips); + + let expected_ips = expected_ids_and_num_occurrences + .keys() + .filter_map(|id| { + if id % 3 == 0 { + None + } else { + Some(Ipv6Addr::from_u128(*id as u128)) + } + }) + .collect::>(); + let ips: HashSet = searcher + .segment_readers() + .iter() + .flat_map(|segment_reader| { + let ff_reader = segment_reader.fast_fields().ip_addrs(ips_field).unwrap(); + segment_reader.doc_ids_alive().flat_map(move |doc| { + let mut vals = vec![]; + ff_reader.get_vals(doc, &mut vals); + vals.into_iter().filter(|val| val.to_u128() != 0) // TODO Fix null handling + }) + }) + .collect(); + assert_eq!(ips, expected_ips); + // multivalue fast field tests for segment_reader in searcher.segment_readers().iter() { let id_reader = segment_reader.fast_fields().u64(id_field).unwrap(); @@ -1847,6 +1928,36 @@ mod tests { Ok(()) } + #[test] + fn test_minimal() { + assert!(test_operation_strategy( + &[ + IndexingOp::AddDoc { id: 23 }, + IndexingOp::AddDoc { id: 13 }, + IndexingOp::DeleteDoc { id: 13 } + ], + true, + false + ) + .is_ok()); + + assert!(test_operation_strategy( + &[ + IndexingOp::AddDoc { id: 23 }, + IndexingOp::AddDoc { id: 13 }, + IndexingOp::DeleteDoc { id: 13 } + ], + false, + false + ) + .is_ok()); + } + + #[test] + fn test_minimal_sort_merge() { + assert!(test_operation_strategy(&[IndexingOp::AddDoc { id: 3 },], true, true).is_ok()); + } + proptest! 
{ #![proptest_config(ProptestConfig::with_cases(20))] #[test] diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 0ed47a9156..b1963e6749 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -6,13 +6,14 @@ use fastfield_codecs::VecColumn; use itertools::Itertools; use measure_time::debug_time; +use super::flat_map_with_buffer::FlatMapWithBufferIter; use super::sorted_doc_id_multivalue_column::RemappedDocIdMultiValueIndexColumn; use crate::core::{Segment, SegmentReader}; use crate::docset::{DocSet, TERMINATED}; use crate::error::DataCorruption; use crate::fastfield::{ get_fastfield_codecs_for_multivalue, AliveBitSet, Column, CompositeFastFieldSerializer, - MultiValueLength, MultiValuedFastFieldReader, + MultiValueLength, MultiValuedFastFieldReader, MultiValuedU128FastFieldReader, }; use crate::fieldnorm::{FieldNormReader, FieldNormReaders, FieldNormsSerializer, FieldNormsWriter}; use crate::indexer::doc_id_mapping::{expect_field_id_for_sort_field, SegmentDocIdMapping}; @@ -295,6 +296,24 @@ impl IndexMerger { self.write_bytes_fast_field(field, fast_field_serializer, doc_id_mapping)?; } } + FieldType::IpAddr(options) => match options.get_fastfield_cardinality() { + Some(Cardinality::SingleValue) => { + self.write_u128_single_fast_field( + field, + fast_field_serializer, + doc_id_mapping, + )?; + } + Some(Cardinality::MultiValues) => { + self.write_u128_multi_fast_field( + field, + fast_field_serializer, + doc_id_mapping, + )?; + } + None => {} + }, + FieldType::JsonObject(_) | FieldType::Facet(_) | FieldType::Str(_) => { // We don't handle json fast field for the moment // They can be implemented using what is done @@ -305,6 +324,91 @@ impl IndexMerger { Ok(()) } + // used to merge `u128` single fast fields. + fn write_u128_multi_fast_field( + &self, + field: Field, + fast_field_serializer: &mut CompositeFastFieldSerializer, + doc_id_mapping: &SegmentDocIdMapping, + ) -> crate::Result<()> { + let segment_and_ff_readers: Vec<(&SegmentReader, MultiValuedU128FastFieldReader)> = + self.readers + .iter() + .map(|segment_reader| { + let ff_reader: MultiValuedU128FastFieldReader = + segment_reader.fast_fields().u128s(field).expect( + "Failed to find index for multivalued field. This is a bug in \ + tantivy, please report.", + ); + (segment_reader, ff_reader) + }) + .collect::>(); + + Self::write_1_n_fast_field_idx_generic( + field, + fast_field_serializer, + doc_id_mapping, + &segment_and_ff_readers, + )?; + + let fast_field_readers = segment_and_ff_readers + .into_iter() + .map(|(_, ff_reader)| ff_reader) + .collect::>(); + + let iter_gen = || { + doc_id_mapping + .iter_old_doc_addrs() + .flat_map_with_buffer(|doc_addr, buffer| { + let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize]; + fast_field_reader.get_vals(doc_addr.doc_id, buffer); + }) + }; + + fast_field_serializer.create_u128_fast_field_with_idx( + field, + iter_gen, + doc_id_mapping.len() as u64, + 1, + )?; + + Ok(()) + } + + // used to merge `u128` single fast fields. + fn write_u128_single_fast_field( + &self, + field: Field, + fast_field_serializer: &mut CompositeFastFieldSerializer, + doc_id_mapping: &SegmentDocIdMapping, + ) -> crate::Result<()> { + let fast_field_readers = self + .readers + .iter() + .map(|reader| { + let u128_reader: Arc> = reader.fast_fields().u128(field).expect( + "Failed to find a reader for single fast field. 
This is a tantivy bug and it \ + should never happen.", + ); + u128_reader + }) + .collect::>(); + + let iter_gen = || { + doc_id_mapping.iter_old_doc_addrs().map(|doc_addr| { + let fast_field_reader = &fast_field_readers[doc_addr.segment_ord as usize]; + fast_field_reader.get_val(doc_addr.doc_id as u64) + }) + }; + fast_field_serializer.create_u128_fast_field_with_idx( + field, + iter_gen, + doc_id_mapping.len() as u64, + 0, + )?; + Ok(()) + } + // used both to merge field norms, `u64/i64` single fast fields. fn write_single_fast_field( &self, diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs index 415378752c..3e33933923 100644 --- a/src/indexer/segment_writer.rs +++ b/src/indexer/segment_writer.rs @@ -294,6 +294,7 @@ impl SegmentWriter { ctx, )?; } + FieldType::IpAddr(_) => {} } } Ok(()) diff --git a/src/postings/per_field_postings_writer.rs b/src/postings/per_field_postings_writer.rs index 61d02752f7..a414870710 100644 --- a/src/postings/per_field_postings_writer.rs +++ b/src/postings/per_field_postings_writer.rs @@ -50,6 +50,7 @@ fn posting_writer_from_field_entry(field_entry: &FieldEntry) -> Box Box::new(SpecializedPostingsWriter::::default()), FieldType::JsonObject(ref json_object_options) => { if let Some(text_indexing_option) = json_object_options.get_text_indexing_options() { diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index 84c95739e6..552964a2d5 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -89,6 +89,7 @@ pub(crate) fn serialize_postings( | FieldType::Bool(_) => {} FieldType::Bytes(_) => {} FieldType::JsonObject(_) => {} + FieldType::IpAddr(_) => {} } let postings_writer = per_field_postings_writers.get_for_field(field); diff --git a/src/query/query_parser/query_parser.rs b/src/query/query_parser/query_parser.rs index f9e032f7a0..d14a09f218 100644 --- a/src/query/query_parser/query_parser.rs +++ b/src/query/query_parser/query_parser.rs @@ -400,6 +400,9 @@ impl QueryParser { let bytes = base64::decode(phrase).map_err(QueryParserError::ExpectedBase64)?; Ok(Term::from_field_bytes(field, &bytes)) } + FieldType::IpAddr(_) => Err(QueryParserError::UnsupportedQuery( + "Range query are not supported on IpAddr field.".to_string(), + )), } } @@ -506,6 +509,7 @@ impl QueryParser { let bytes_term = Term::from_field_bytes(field, &bytes); Ok(vec![LogicalLiteral::Term(bytes_term)]) } + FieldType::IpAddr(_) => Err(QueryParserError::FieldNotIndexed(field_name.to_string())), } } diff --git a/src/schema/document.rs b/src/schema/document.rs index 3bde526b1a..253c38081d 100644 --- a/src/schema/document.rs +++ b/src/schema/document.rs @@ -1,6 +1,7 @@ use std::collections::{HashMap, HashSet}; use std::io::{self, Read, Write}; use std::mem; +use std::net::Ipv6Addr; use common::{BinarySerializable, VInt}; @@ -97,6 +98,11 @@ impl Document { self.add_field_value(field, value); } + /// Add a IP address field. Internally only Ipv6Addr is used. 
diff --git a/src/schema/field_entry.rs b/src/schema/field_entry.rs
index 997fbd2564..9c66663af7 100644
--- a/src/schema/field_entry.rs
+++ b/src/schema/field_entry.rs
@@ -1,5 +1,6 @@
 use serde::{Deserialize, Serialize};
 
+use super::ip_options::IpAddrOptions;
 use crate::schema::bytes_options::BytesOptions;
 use crate::schema::{
     is_valid_field_name, DateOptions, FacetOptions, FieldType, JsonObjectOptions, NumericOptions,
@@ -60,6 +61,11 @@ impl FieldEntry {
         Self::new(field_name, FieldType::Date(date_options))
     }
 
+    /// Creates a new ip address field entry.
+    pub fn new_ip_addr(field_name: String, ip_options: IpAddrOptions) -> FieldEntry {
+        Self::new(field_name, FieldType::IpAddr(ip_options))
+    }
+
     /// Creates a field entry for a facet.
     pub fn new_facet(field_name: String, facet_options: FacetOptions) -> FieldEntry {
         Self::new(field_name, FieldType::Facet(facet_options))
@@ -114,6 +120,7 @@ impl FieldEntry {
             FieldType::Facet(ref options) => options.is_stored(),
             FieldType::Bytes(ref options) => options.is_stored(),
             FieldType::JsonObject(ref options) => options.is_stored(),
+            FieldType::IpAddr(ref options) => options.is_stored(),
         }
     }
 }
diff --git a/src/schema/field_type.rs b/src/schema/field_type.rs
index 3a631697e8..557d2ec4d4 100644
--- a/src/schema/field_type.rs
+++ b/src/schema/field_type.rs
@@ -1,7 +1,11 @@
+use std::net::{IpAddr, Ipv6Addr};
+use std::str::FromStr;
+
 use serde::{Deserialize, Serialize};
 use serde_json::Value as JsonValue;
 use thiserror::Error;
 
+use super::ip_options::IpAddrOptions;
 use super::Cardinality;
 use crate::schema::bytes_options::BytesOptions;
 use crate::schema::facet_options::FacetOptions;
@@ -62,9 +66,11 @@ pub enum Type {
     Bytes = b'b',
     /// Leaf in a Json object.
     Json = b'j',
+    /// IpAddr
+    IpAddr = b'p',
 }
 
-const ALL_TYPES: [Type; 9] = [
+const ALL_TYPES: [Type; 10] = [
     Type::Str,
     Type::U64,
     Type::I64,
@@ -74,6 +80,7 @@ const ALL_TYPES: [Type; 9] = [
     Type::Facet,
     Type::Bytes,
     Type::Json,
+    Type::IpAddr,
 ];
 
 impl Type {
@@ -100,6 +107,7 @@ impl Type {
             Type::Facet => "Facet",
             Type::Bytes => "Bytes",
             Type::Json => "Json",
+            Type::IpAddr => "IpAddr",
         }
     }
 
@@ -116,6 +124,7 @@ impl Type {
             b'h' => Some(Type::Facet),
             b'b' => Some(Type::Bytes),
             b'j' => Some(Type::Json),
+            b'p' => Some(Type::IpAddr),
             _ => None,
         }
     }
@@ -146,6 +155,8 @@ pub enum FieldType {
     Bytes(BytesOptions),
     /// Json object
     JsonObject(JsonObjectOptions),
+    /// IpAddr field
+    IpAddr(IpAddrOptions),
 }
 
 impl FieldType {
@@ -161,6 +172,7 @@ impl FieldType {
             FieldType::Facet(_) => Type::Facet,
             FieldType::Bytes(_) => Type::Bytes,
             FieldType::JsonObject(_) => Type::Json,
+            FieldType::IpAddr(_) => Type::IpAddr,
         }
     }
 
@@ -176,6 +188,7 @@ impl FieldType {
             FieldType::Facet(ref _facet_options) => true,
             FieldType::Bytes(ref bytes_options) => bytes_options.is_indexed(),
             FieldType::JsonObject(ref json_object_options) => json_object_options.is_indexed(),
+            FieldType::IpAddr(_) => false,
         }
     }
 
@@ -210,6 +223,7 @@ impl FieldType {
             | FieldType::F64(ref int_options)
             | FieldType::Bool(ref int_options) => int_options.is_fast(),
             FieldType::Date(ref date_options) => date_options.is_fast(),
+            FieldType::IpAddr(ref ip_addr_options) => ip_addr_options.is_fast(),
             FieldType::Facet(_) => true,
             FieldType::JsonObject(_) => false,
         }
@@ -250,6 +264,7 @@ impl FieldType {
             FieldType::Facet(_) => false,
             FieldType::Bytes(ref bytes_options) => bytes_options.fieldnorms(),
             FieldType::JsonObject(ref _json_object_options) => false,
+            FieldType::IpAddr(_) => false,
         }
     }
 
@@ -294,6 +309,7 @@ impl FieldType {
             FieldType::JsonObject(ref json_obj_options) => json_obj_options
                 .get_text_indexing_options()
                 .map(TextFieldIndexing::index_option),
+            FieldType::IpAddr(_) => None,
         }
     }
 
@@ -333,6 +349,19 @@ impl FieldType {
                     expected: "a json object",
                     json: JsonValue::String(field_text),
                 }),
+                FieldType::IpAddr(_) => {
+                    let ip_addr: IpAddr = IpAddr::from_str(&field_text).map_err(|err| {
+                        ValueParsingError::ParseError {
+                            error: err.to_string(),
+                            json: JsonValue::String(field_text),
+                        }
+                    })?;
+                    let ip_addr_v6: Ipv6Addr = match ip_addr {
+                        IpAddr::V4(v4) => v4.to_ipv6_mapped(),
+                        IpAddr::V6(v6) => v6,
+                    };
+                    Ok(Value::IpAddr(ip_addr_v6))
+                }
             }
         }
         JsonValue::Number(field_val_num) => match self {
@@ -380,6 +409,10 @@ impl FieldType {
                 expected: "a json object",
                 json: JsonValue::Number(field_val_num),
             }),
+            FieldType::IpAddr(_) => Err(ValueParsingError::TypeError {
+                expected: "a string with an ip addr",
+                json: JsonValue::Number(field_val_num),
+            }),
         },
         JsonValue::Object(json_map) => match self {
             FieldType::Str(_) => {
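The new `value_from_json` arm above parses the string with `IpAddr::from_str` and widens IPv4 to its IPv4-mapped IPv6 form. A std-only sketch of exactly that mapping, using a hypothetical `parse_ip_as_v6` helper:

```rust
use std::net::{IpAddr, Ipv6Addr};
use std::str::FromStr;

// Parse either address family, then store everything as Ipv6Addr.
fn parse_ip_as_v6(text: &str) -> Result<Ipv6Addr, String> {
    let ip_addr = IpAddr::from_str(text).map_err(|err| err.to_string())?;
    Ok(match ip_addr {
        IpAddr::V4(v4) => v4.to_ipv6_mapped(),
        IpAddr::V6(v6) => v6,
    })
}

fn main() {
    assert_eq!(
        parse_ip_as_v6("127.0.0.1").unwrap(),
        "::ffff:127.0.0.1".parse::<Ipv6Addr>().unwrap()
    );
    // The IPv6 loopback is not an IPv4-mapped address, so it stays as-is.
    assert_eq!(parse_ip_as_v6("::1").unwrap(), Ipv6Addr::LOCALHOST);
    assert!(parse_ip_as_v6("not-an-ip").is_err());
}
```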
diff --git a/src/schema/ip_options.rs b/src/schema/ip_options.rs
index 195d469167..ce998f43fe 100644
--- a/src/schema/ip_options.rs
+++ b/src/schema/ip_options.rs
@@ -7,13 +7,13 @@ use super::Cardinality;
 
 /// Define how an ip field should be handled by tantivy.
 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
-pub struct IpOptions {
+pub struct IpAddrOptions {
     #[serde(skip_serializing_if = "Option::is_none")]
     fast: Option<Cardinality>,
     stored: bool,
 }
 
-impl IpOptions {
+impl IpAddrOptions {
     /// Returns true iff the value is a fast field.
     pub fn is_fast(&self) -> bool {
         self.fast.is_some()
@@ -52,52 +52,52 @@ impl IpOptions {
     }
 }
 
-impl From<()> for IpOptions {
-    fn from(_: ()) -> IpOptions {
-        IpOptions::default()
+impl From<()> for IpAddrOptions {
+    fn from(_: ()) -> IpAddrOptions {
+        IpAddrOptions::default()
     }
 }
 
-impl From<FastFlag> for IpOptions {
+impl From<FastFlag> for IpAddrOptions {
     fn from(_: FastFlag) -> Self {
-        IpOptions {
+        IpAddrOptions {
             stored: false,
             fast: Some(Cardinality::SingleValue),
         }
     }
 }
 
-impl From<StoredFlag> for IpOptions {
+impl From<StoredFlag> for IpAddrOptions {
     fn from(_: StoredFlag) -> Self {
-        IpOptions {
+        IpAddrOptions {
             stored: true,
             fast: None,
         }
     }
 }
 
-impl From<IndexedFlag> for IpOptions {
+impl From<IndexedFlag> for IpAddrOptions {
     fn from(_: IndexedFlag) -> Self {
-        IpOptions {
+        IpAddrOptions {
             stored: false,
             fast: None,
         }
     }
 }
 
-impl<T: Into<IpOptions>> BitOr<T> for IpOptions {
-    type Output = IpOptions;
+impl<T: Into<IpAddrOptions>> BitOr<T> for IpAddrOptions {
+    type Output = IpAddrOptions;
 
-    fn bitor(self, other: T) -> IpOptions {
+    fn bitor(self, other: T) -> IpAddrOptions {
         let other = other.into();
-        IpOptions {
+        IpAddrOptions {
             stored: self.stored | other.stored,
             fast: self.fast.or(other.fast),
         }
     }
 }
 
-impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IpOptions
+impl<Head, Tail> From<SchemaFlagList<Head, Tail>> for IpAddrOptions
 where
     Head: Clone,
     Tail: Clone,
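The `From<FastFlag>`/`From<StoredFlag>` impls together with `BitOr` are what let `FAST | STORED` fold into a single options value. A self-contained mirror of that mechanism, using a hypothetical `IpAddrOpts` type rather than the real one, to show how the flags merge field by field:

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum Cardinality {
    SingleValue,
}

#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct IpAddrOpts {
    fast: Option<Cardinality>,
    stored: bool,
}

impl std::ops::BitOr for IpAddrOpts {
    type Output = IpAddrOpts;
    fn bitor(self, other: IpAddrOpts) -> IpAddrOpts {
        IpAddrOpts {
            // Each flag contributes its own fields; `|` and `Option::or` merge them.
            stored: self.stored | other.stored,
            fast: self.fast.or(other.fast),
        }
    }
}

fn main() {
    let fast = IpAddrOpts { fast: Some(Cardinality::SingleValue), stored: false };
    let stored = IpAddrOpts { stored: true, ..Default::default() };
    assert_eq!(
        fast | stored,
        IpAddrOpts { fast: Some(Cardinality::SingleValue), stored: true }
    );
}
```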
diff --git a/src/schema/mod.rs b/src/schema/mod.rs
index 4d966a8b9b..c64eef788a 100644
--- a/src/schema/mod.rs
+++ b/src/schema/mod.rs
@@ -138,7 +138,7 @@ pub use self::field_type::{FieldType, Type};
 pub use self::field_value::FieldValue;
 pub use self::flags::{FAST, INDEXED, STORED};
 pub use self::index_record_option::IndexRecordOption;
-pub use self::ip_options::IpOptions;
+pub use self::ip_options::IpAddrOptions;
 pub use self::json_object_options::JsonObjectOptions;
 pub use self::named_field_document::NamedFieldDocument;
 pub use self::numeric_options::NumericOptions;
diff --git a/src/schema/schema.rs b/src/schema/schema.rs
index 783ce11fec..3b7fd22d0c 100644
--- a/src/schema/schema.rs
+++ b/src/schema/schema.rs
@@ -7,6 +7,7 @@ use serde::ser::SerializeSeq;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
 use serde_json::{self, Value as JsonValue};
 
+use super::ip_options::IpAddrOptions;
 use super::*;
 use crate::schema::bytes_options::BytesOptions;
 use crate::schema::field_type::ValueParsingError;
@@ -144,6 +145,26 @@ impl SchemaBuilder {
         self.add_field(field_entry)
     }
 
+    /// Adds an ip address field.
+    /// Returns the associated field handle.
+    ///
+    /// # Caution
+    ///
+    /// Appending two fields with the same name
+    /// will result in the shadowing of the first
+    /// by the second one.
+    /// The first field will get a field id
+    /// but only the second one will be indexed.
+    pub fn add_ip_addr_field<T: Into<IpAddrOptions>>(
+        &mut self,
+        field_name_str: &str,
+        field_options: T,
+    ) -> Field {
+        let field_name = String::from(field_name_str);
+        let field_entry = FieldEntry::new_ip_addr(field_name, field_options.into());
+        self.add_field(field_entry)
+    }
+
     /// Adds a new text field.
     /// Returns the associated field handle
     ///
@@ -598,12 +619,14 @@ mod tests {
         schema_builder.add_text_field("title", TEXT);
         schema_builder.add_text_field("author", STRING);
         schema_builder.add_u64_field("count", count_options);
+        schema_builder.add_ip_addr_field("ip", FAST | STORED);
         schema_builder.add_bool_field("is_read", is_read_options);
         let schema = schema_builder.build();
         let doc_json = r#"{
            "title": "my title",
            "author": "fulmicoton",
            "count": 4,
+           "ip": "127.0.0.1",
            "is_read": true
        }"#;
         let doc = schema.parse_document(doc_json).unwrap();
@@ -612,6 +635,39 @@ mod tests {
         assert_eq!(doc, doc_serdeser);
     }
 
+    #[test]
+    pub fn test_document_to_ipv4_json() {
+        let mut schema_builder = Schema::builder();
+        schema_builder.add_ip_addr_field("ip", FAST | STORED);
+        let schema = schema_builder.build();
+
+        // IpV4 loopback
+        let doc_json = r#"{
+            "ip": "127.0.0.1"
+        }"#;
+        let doc = schema.parse_document(doc_json).unwrap();
+        let value: serde_json::Value = serde_json::from_str(&schema.to_json(&doc)).unwrap();
+        assert_eq!(value["ip"][0], "127.0.0.1");
+
+        // Special case: the IpV6 loopback. We don't want to map that to IPv4.
+        let doc_json = r#"{
+            "ip": "::1"
+        }"#;
+        let doc = schema.parse_document(doc_json).unwrap();
+
+        let value: serde_json::Value = serde_json::from_str(&schema.to_json(&doc)).unwrap();
+        assert_eq!(value["ip"][0], "::1");
+
+        // Testing the ip address of every router in the world.
+        let doc_json = r#"{
+            "ip": "192.168.0.1"
+        }"#;
+        let doc = schema.parse_document(doc_json).unwrap();
+
+        let value: serde_json::Value = serde_json::from_str(&schema.to_json(&doc)).unwrap();
+        assert_eq!(value["ip"][0], "192.168.0.1");
+    }
+
     #[test]
     pub fn test_document_from_nameddoc() {
         let mut schema_builder = Schema::builder();
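The round trips asserted in `test_document_to_ipv4_json` work because the `Value` serialization (further down in this patch) calls `to_ipv4_mapped`: IPv4-mapped addresses are printed back as IPv4, while `::1` is not an IPv4-mapped address and stays IPv6. Demonstrated with std only:

```rust
use std::net::{Ipv4Addr, Ipv6Addr};

fn main() {
    // "::ffff:a.b.c.d" addresses map back to IPv4...
    let mapped: Ipv6Addr = "::ffff:127.0.0.1".parse().unwrap();
    assert_eq!(mapped.to_ipv4_mapped(), Some(Ipv4Addr::new(127, 0, 0, 1)));

    // ...but the IPv6 loopback does not, so it is serialized as "::1".
    let loopback_v6 = Ipv6Addr::LOCALHOST;
    assert_eq!(loopback_v6.to_ipv4_mapped(), None);
}
```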
diff --git a/src/schema/term.rs b/src/schema/term.rs
index 99f3e5ed50..9bfa7614b9 100644
--- a/src/schema/term.rs
+++ b/src/schema/term.rs
@@ -415,6 +415,9 @@ fn debug_value_bytes(typ: Type, bytes: &[u8], f: &mut fmt::Formatter) -> fmt::Re
             debug_value_bytes(typ, bytes, f)?;
             }
         }
+        Type::IpAddr => {
+            write!(f, "")?; // TODO change once we actually have IP address terms.
+        }
     }
     Ok(())
 }
diff --git a/src/schema/value.rs b/src/schema/value.rs
index 5caec3ca07..d3df1c46c5 100644
--- a/src/schema/value.rs
+++ b/src/schema/value.rs
@@ -1,4 +1,5 @@
 use std::fmt;
+use std::net::Ipv6Addr;
 
 use serde::de::Visitor;
 use serde::{Deserialize, Deserializer, Serialize, Serializer};
@@ -32,6 +33,8 @@ pub enum Value {
     Bytes(Vec<u8>),
     /// Json object value.
     JsonObject(serde_json::Map<String, serde_json::Value>),
+    /// IPv6 address. There is no IPv4 variant internally; IPv4 addresses are converted to their
+    /// IPv4-mapped `Ipv6Addr` form.
+    IpAddr(Ipv6Addr),
 }
 
 impl Eq for Value {}
@@ -50,6 +53,14 @@ impl Serialize for Value {
             Value::Facet(ref facet) => facet.serialize(serializer),
             Value::Bytes(ref bytes) => serializer.serialize_str(&base64::encode(bytes)),
             Value::JsonObject(ref obj) => obj.serialize(serializer),
+            Value::IpAddr(ref obj) => {
+                // Serialize IPv4-mapped addresses back as IPv4; `to_ipv4_mapped` leaves
+                // plain IPv6 addresses, including the IPv6 loopback, untouched.
+                if let Some(ip_v4) = obj.to_ipv4_mapped() {
+                    ip_v4.serialize(serializer)
+                } else {
+                    obj.serialize(serializer)
+                }
+            }
         }
     }
 }
@@ -201,6 +212,16 @@ impl Value {
             None
         }
     }
+
+    /// Returns the ip addr, provided the value is of the `IpAddr` type.
+    /// (Returns None if the value is not of the `IpAddr` type.)
+    pub fn as_ip_addr(&self) -> Option<Ipv6Addr> {
+        if let Value::IpAddr(val) = self {
+            Some(*val)
+        } else {
+            None
+        }
+    }
 }
 
 impl From<String> for Value {
@@ -209,6 +230,12 @@ impl From<String> for Value {
     }
 }
 
+impl From<Ipv6Addr> for Value {
+    fn from(v: Ipv6Addr) -> Value {
+        Value::IpAddr(v)
+    }
+}
+
 impl From<u64> for Value {
     fn from(v: u64) -> Value {
         Value::U64(v)
     }
 }
@@ -288,8 +315,10 @@ impl From for Value {
 mod binary_serialize {
     use std::io::{self, Read, Write};
+    use std::net::Ipv6Addr;
 
     use common::{f64_to_u64, u64_to_f64, BinarySerializable};
+    use fastfield_codecs::MonotonicallyMappableToU128;
 
     use super::Value;
     use crate::schema::Facet;
@@ -306,6 +335,7 @@ mod binary_serialize {
     const EXT_CODE: u8 = 7;
     const JSON_OBJ_CODE: u8 = 8;
     const BOOL_CODE: u8 = 9;
+    const IP_CODE: u8 = 10;
 
     // extended types
 
@@ -366,6 +396,10 @@ mod binary_serialize {
                 serde_json::to_writer(writer, &map)?;
                 Ok(())
             }
+            Value::IpAddr(ref ip) => {
+                IP_CODE.serialize(writer)?;
+                ip.to_u128().serialize(writer)
+            }
         }
     }
 
@@ -436,6 +470,11 @@ mod binary_serialize {
                 let json_map =
                     <serde_json::Map<String, serde_json::Value> as serde::Deserialize>::deserialize(&mut de)?;
                 Ok(Value::JsonObject(json_map))
            }
+            IP_CODE => {
+                let value = u128::deserialize(reader)?;
+                Ok(Value::IpAddr(Ipv6Addr::from_u128(value)))
+            }
+
             _ => Err(io::Error::new(
                 io::ErrorKind::InvalidData,
                 format!("No field type is associated with code {:?}", type_code),
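On disk the address is just a `u128`. The patch routes the conversion through `MonotonicallyMappableToU128` (`to_u128` / `from_u128`); the std `From` conversions below sketch the same lossless round trip, as an illustration rather than the codec's actual code path:

```rust
use std::net::Ipv6Addr;

fn main() {
    // An IPv4-mapped address survives the u128 round trip unchanged.
    let ip: Ipv6Addr = "::ffff:192.168.0.1".parse().unwrap();
    let as_u128: u128 = ip.into();
    let back = Ipv6Addr::from(as_u128);
    assert_eq!(back, ip);
}
```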