diff --git a/src/aggregation/agg_req.rs b/src/aggregation/agg_req.rs
index 0ceb6cb1c4..a8f8f059cf 100644
--- a/src/aggregation/agg_req.rs
+++ b/src/aggregation/agg_req.rs
@@ -48,8 +48,10 @@ use std::collections::{HashMap, HashSet};
 use serde::{Deserialize, Serialize};
 
+use super::bucket::HistogramAggregation;
 pub use super::bucket::RangeAggregation;
 use super::metric::{AverageAggregation, StatsAggregation};
+use super::VecWithNames;
 
 /// The top-level aggregation request structure, which contains [Aggregation] and their user defined
 /// names. It is also used in [buckets](BucketAggregation) to define sub-aggregations.
@@ -57,6 +59,55 @@ use super::metric::{AverageAggregation, StatsAggregation};
 /// The key is the user defined name of the aggregation.
 pub type Aggregations = HashMap<String, Aggregation>;
 
+/// Like Aggregations, but optimized to work with the aggregation result.
+#[derive(Clone, Debug)]
+pub(crate) struct AggregationsInternal {
+    pub(crate) metrics: VecWithNames<MetricAggregation>,
+    pub(crate) buckets: VecWithNames<BucketAggregationInternal>,
+}
+
+impl From<Aggregations> for AggregationsInternal {
+    fn from(aggs: Aggregations) -> Self {
+        let mut metrics = vec![];
+        let mut buckets = vec![];
+        for (key, agg) in aggs {
+            match agg {
+                Aggregation::Bucket(bucket) => buckets.push((
+                    key,
+                    BucketAggregationInternal {
+                        bucket_agg: bucket.bucket_agg,
+                        sub_aggregation: bucket.sub_aggregation.into(),
+                    },
+                )),
+                Aggregation::Metric(metric) => metrics.push((key, metric)),
+            }
+        }
+        Self {
+            metrics: VecWithNames::from_entries(metrics),
+            buckets: VecWithNames::from_entries(buckets),
+        }
+    }
+}
+
+#[derive(Clone, Debug)]
+// Like BucketAggregation, but optimized to work with the result.
+pub(crate) struct BucketAggregationInternal {
+    /// Bucket aggregation strategy to group documents.
+    pub bucket_agg: BucketAggregationType,
+    /// The sub_aggregations in the buckets. Each bucket will aggregate on the document set in the
+    /// bucket.
+    pub sub_aggregation: AggregationsInternal,
+}
+
+impl BucketAggregationInternal {
+    pub(crate) fn as_histogram(&self) -> &HistogramAggregation {
+        match &self.bucket_agg {
+            BucketAggregationType::Range(_) => panic!("unexpected aggregation"),
+            BucketAggregationType::Histogram(histogram) => histogram,
+        }
+    }
+}
+
 /// Extract all fast field names used in the tree.
 pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
     let mut fast_field_names = Default::default();
@@ -123,12 +174,18 @@ pub enum BucketAggregationType {
     /// Put data into buckets of user-defined ranges.
     #[serde(rename = "range")]
     Range(RangeAggregation),
+    /// Put data into buckets defined by a fixed interval.
+    #[serde(rename = "histogram")]
+    Histogram(HistogramAggregation),
 }
 
 impl BucketAggregationType {
     fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
         match self {
             BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
+            BucketAggregationType::Histogram(histogram) => {
+                fast_field_names.insert(histogram.field.to_string())
+            }
         };
     }
 }
diff --git a/src/aggregation/agg_req_with_accessor.rs b/src/aggregation/agg_req_with_accessor.rs
index c84f11cefd..bf87e51009 100644
--- a/src/aggregation/agg_req_with_accessor.rs
+++ b/src/aggregation/agg_req_with_accessor.rs
@@ -1,7 +1,7 @@
 //! This will enhance the request tree with access to the fastfield and metadata.
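For orientation, here is a minimal sketch (editor's addition, not part of the patch) of how a histogram request can be assembled from the request types defined above. The `prices`/`price` names are illustrative; the construction pattern mirrors the tests further down in this diff:

```rust
use tantivy::aggregation::agg_req::{
    Aggregation, Aggregations, BucketAggregation, BucketAggregationType,
};
use tantivy::aggregation::bucket::HistogramAggregation;

fn histogram_request() -> Aggregations {
    vec![(
        "prices".to_string(),
        Aggregation::Bucket(BucketAggregation {
            // Bucket documents by `price` into fixed steps of 10.0.
            bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
                field: "price".to_string(),
                interval: 10.0,
                ..Default::default()
            }),
            // No nested aggregations in this sketch.
            sub_aggregation: Default::default(),
        }),
    )]
    .into_iter()
    .collect()
}
```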
 use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
-use super::bucket::RangeAggregation;
+use super::bucket::{HistogramAggregation, RangeAggregation};
 use super::metric::{AverageAggregation, StatsAggregation};
 use super::VecWithNames;
 use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
@@ -48,6 +48,9 @@ impl BucketAggregationWithAccessor {
                 field: field_name,
                 ranges: _,
             }) => get_ff_reader_and_validate(reader, field_name)?,
+            BucketAggregationType::Histogram(HistogramAggregation {
+                field: field_name, ..
+            }) => get_ff_reader_and_validate(reader, field_name)?,
         };
         let sub_aggregation = sub_aggregation.clone();
         Ok(BucketAggregationWithAccessor {
diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs
index 3d0ed20b29..6132ba7cb1 100644
--- a/src/aggregation/agg_result.rs
+++ b/src/aggregation/agg_result.rs
@@ -10,25 +10,80 @@ use std::collections::HashMap;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
+use super::agg_req::{Aggregations, AggregationsInternal, BucketAggregationInternal};
+use super::bucket::intermediate_buckets_to_final_buckets;
 use super::intermediate_agg_result::{
-    IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
+    IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
     IntermediateMetricResult, IntermediateRangeBucketEntry,
 };
 use super::metric::{SingleMetricResult, Stats};
 use super::Key;
 
-#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
 /// The final aggregation result.
 pub struct AggregationResults(pub HashMap<String, AggregationResult>);
 
-impl From<IntermediateAggregationResults> for AggregationResults {
-    fn from(tree: IntermediateAggregationResults) -> Self {
-        Self(
-            tree.0
-                .into_iter()
-                .map(|(key, agg)| (key, agg.into()))
-                .collect(),
-        )
+impl AggregationResults {
+    /// Convert an intermediate result and its aggregation request to the final result.
+    pub fn from_intermediate_and_req(
+        results: IntermediateAggregationResults,
+        agg: Aggregations,
+    ) -> Self {
+        AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
+    }
+    /// Convert an intermediate result and its aggregation request to the final result.
+    ///
+    /// Internal function; `AggregationsInternal` is used instead of `Aggregations`, since it is
+    /// optimized for internal processing.
+    fn from_intermediate_and_req_internal(
+        results: IntermediateAggregationResults,
+        req: &AggregationsInternal,
+    ) -> Self {
+        let mut result = HashMap::default();
+
+        // Important assumption:
+        // When the tree contains buckets/metrics, we expect it to have all buckets/metrics from
+        // the request.
+        if let Some(buckets) = results.buckets {
+            result.extend(buckets.into_iter().zip(req.buckets.values()).map(
+                |((key, bucket), req)| {
+                    (
+                        key,
+                        AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
+                            bucket, req,
+                        )),
+                    )
+                },
+            ));
+        } else {
+            result.extend(req.buckets.iter().map(|(key, req)| {
+                let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
+                (
+                    key.to_string(),
+                    AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
+                        empty_bucket,
+                        req,
+                    )),
+                )
+            }));
+        }
+
+        if let Some(metrics) = results.metrics {
+            result.extend(
+                metrics
+                    .into_iter()
+                    .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
+            );
+        } else {
+            result.extend(req.metrics.iter().map(|(key, req)| {
+                let empty_bucket = IntermediateMetricResult::empty_from_req(req);
+                (
+                    key.to_string(),
+                    AggregationResult::MetricResult(empty_bucket.into()),
+                )
+            }));
+        }
+        Self(result)
     }
 }
@@ -41,18 +96,6 @@ pub enum AggregationResult {
     /// Metric result variant.
     MetricResult(MetricResult),
 }
-impl From<IntermediateAggregationResult> for AggregationResult {
-    fn from(tree: IntermediateAggregationResult) -> Self {
-        match tree {
-            IntermediateAggregationResult::Bucket(bucket) => {
-                AggregationResult::BucketResult(bucket.into())
-            }
-            IntermediateAggregationResult::Metric(metric) => {
-                AggregationResult::MetricResult(metric.into())
-            }
-        }
-    }
-}
 
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(untagged)]
@@ -81,21 +124,36 @@ impl From<IntermediateMetricResult> for MetricResult {
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 #[serde(untagged)]
 pub enum BucketResult {
-    /// This is the default entry for a bucket, which contains a key, count, and optionally
+    /// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
     /// sub_aggregations.
     Range {
         /// The range buckets sorted by range.
         buckets: Vec<RangeBucketEntry>,
     },
+    /// This is the histogram entry for a bucket, which contains a key, count, and optionally
+    /// sub_aggregations.
+    Histogram {
+        /// The buckets.
+        ///
+        /// Whether there are holes depends on the request: if min_doc_count is 0, there are no
+        /// holes between the first and last bucket.
+        /// See [HistogramAggregation](super::bucket::HistogramAggregation)
+        buckets: Vec<BucketEntry>,
+    },
 }
 
-impl From<IntermediateBucketResult> for BucketResult {
-    fn from(result: IntermediateBucketResult) -> Self {
-        match result {
+impl BucketResult {
+    fn from_intermediate_and_req(
+        bucket_result: IntermediateBucketResult,
+        req: &BucketAggregationInternal,
+    ) -> Self {
+        match bucket_result {
             IntermediateBucketResult::Range(range_map) => {
                 let mut buckets: Vec<RangeBucketEntry> = range_map
                     .into_iter()
-                    .map(|(_, bucket)| bucket.into())
+                    .map(|(_, bucket)| {
+                        RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
+                    })
                     .collect_vec();
 
                 buckets.sort_by(|a, b| {
@@ -106,6 +164,68 @@ impl From<IntermediateBucketResult> for BucketResult {
                 });
                 BucketResult::Range { buckets }
             }
+            IntermediateBucketResult::Histogram { buckets } => {
+                let buckets = intermediate_buckets_to_final_buckets(
+                    buckets,
+                    req.as_histogram(),
+                    &req.sub_aggregation,
+                );
+
+                BucketResult::Histogram { buckets }
+            }
+        }
+    }
+}
+
+/// This is the default entry for a bucket, which contains a key, count, and optionally
+/// sub_aggregations.
+///
+/// # JSON Format
+/// ```json
+/// {
+///     ...
+///     "my_histogram": {
+///         "buckets": [
+///             {
+///                 "key": "2.0",
+///                 "doc_count": 5
+///             },
+///             {
+///                 "key": "4.0",
+///                 "doc_count": 2
+///             },
+///             {
+///                 "key": "6.0",
+///                 "doc_count": 3
+///             }
+///         ]
+///     }
+///     ...
+/// }
+/// ```
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
+pub struct BucketEntry {
+    /// The identifier of the bucket.
+    pub key: Key,
+    /// Number of documents in the bucket.
+    pub doc_count: u64,
+    #[serde(flatten)]
+    /// sub-aggregations in this bucket.
+    pub sub_aggregation: AggregationResults,
+}
+
+impl BucketEntry {
+    pub(crate) fn from_intermediate_and_req(
+        entry: IntermediateHistogramBucketEntry,
+        req: &AggregationsInternal,
+    ) -> Self {
+        BucketEntry {
+            key: Key::F64(entry.key),
+            doc_count: entry.doc_count,
+            sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
+                entry.sub_aggregation,
+                req,
+            ),
         }
     }
 }
@@ -139,7 +259,7 @@ impl From<IntermediateBucketResult> for BucketResult {
 /// }
 /// ...
 /// }
-/// ```
+/// ```
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
 pub struct RangeBucketEntry {
     /// The identifier of the bucket.
     pub key: Key,
@@ -157,12 +277,18 @@ pub struct RangeBucketEntry {
     pub to: Option<f64>,
 }
 
-impl From<IntermediateRangeBucketEntry> for RangeBucketEntry {
-    fn from(entry: IntermediateRangeBucketEntry) -> Self {
+impl RangeBucketEntry {
+    fn from_intermediate_and_req(
+        entry: IntermediateRangeBucketEntry,
+        req: &AggregationsInternal,
+    ) -> Self {
         RangeBucketEntry {
             key: entry.key,
             doc_count: entry.doc_count,
-            sub_aggregation: entry.sub_aggregation.into(),
+            sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
+                entry.sub_aggregation,
+                req,
+            ),
             to: entry.to,
             from: entry.from,
         }
diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs
new file mode 100644
index 0000000000..a420094e6e
--- /dev/null
+++ b/src/aggregation/bucket/histogram/histogram.rs
@@ -0,0 +1,1354 @@
+use std::cmp::Ordering;
+use std::fmt::Display;
+
+use itertools::Itertools;
+use serde::{Deserialize, Serialize};
+
+use crate::aggregation::agg_req::AggregationsInternal;
+use crate::aggregation::agg_req_with_accessor::{
+    AggregationsWithAccessor, BucketAggregationWithAccessor,
+};
+use crate::aggregation::agg_result::BucketEntry;
+use crate::aggregation::f64_from_fastfield_u64;
+use crate::aggregation::intermediate_agg_result::{
+    IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
+};
+use crate::aggregation::segment_agg_result::{
+    SegmentAggregationResultsCollector, SegmentHistogramBucketEntry,
+};
+use crate::fastfield::{DynamicFastFieldReader, FastFieldReader};
+use crate::schema::Type;
+use crate::{DocId, TantivyError};
+
+/// Histogram is a bucket aggregation, where buckets are created dynamically for the given
+/// `interval`. Each document value is rounded down to its bucket.
+///
+/// E.g. if we have a price of 18 and an interval of 5, the document will fall into the bucket
+/// with the key 15. The formula used for this is:
+/// `((val - offset) / interval).floor() * interval + offset`
+///
+/// For this calculation all fastfield values are converted to f64.
+///
+/// # Returned Buckets
+/// By default buckets are returned between the min and max value of the documents, including
+/// empty buckets.
+/// Setting min_doc_count to a value != 0 will filter out empty buckets.
+///
+/// The value range of the buckets can be extended via
+/// [extended_bounds](HistogramAggregation::extended_bounds) or limited via
+/// [hard_bounds](HistogramAggregation::hard_bounds).
+///
+/// # Result
+/// Result type is [BucketResult](crate::aggregation::agg_result::BucketResult) with
+/// [BucketEntry](crate::aggregation::agg_result::BucketEntry) on the
+/// AggregationCollector.
+///
+/// Result type is
+/// [crate::aggregation::intermediate_agg_result::IntermediateBucketResult] with
+/// [crate::aggregation::intermediate_agg_result::IntermediateHistogramBucketEntry] on the
+/// DistributedAggregationCollector.
+///
+/// # Limitations/Compatibility
+///
+/// The keyed parameter (elasticsearch) is not yet supported.
+///
+/// # JSON Format
+/// ```json
+/// {
+///     "prices": {
+///         "histogram": {
+///             "field": "price",
+///             "interval": 10
+///         }
+///     }
+/// }
+/// ```
+///
+/// Response
+/// See [BucketEntry](crate::aggregation::agg_result::BucketEntry)
+#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
+pub struct HistogramAggregation {
+    /// The field to aggregate on.
+    pub field: String,
+    /// The interval to chunk your data range. The buckets span ranges of [0..interval).
+    /// Must be a positive value.
+    pub interval: f64,
+    /// The interval implicitly defines an absolute grid of buckets
+    /// `[interval * k, interval * (k + 1))`.
+    ///
+    /// Offset makes it possible to shift this grid into
+    /// `[offset + interval * k, offset + interval * (k + 1))`.
+    /// Offset has to be in the range [0, interval).
+    ///
+    /// For example, two documents with the values 8 and 12 and an interval of 10.0 fall into
+    /// the buckets with the keys 0 and 10. With an offset of 5 and an interval of 10, both
+    /// fall into the bucket with the key 5 and the range [5..15).
+    pub offset: Option<f64>,
+    /// The minimum number of documents in a bucket to be returned. Defaults to 0.
+    pub min_doc_count: Option<u64>,
+    /// Limits the data range to the `[min, max]` closed interval.
+    ///
+    /// This can be used to filter values if they are not in the data range.
+    ///
+    /// hard_bounds only limits the buckets; to force a fixed range, set both extended_bounds
+    /// and hard_bounds to the same range.
+    pub hard_bounds: Option<HistogramBounds>,
+    /// Can be set to extend your bounds. The range of the buckets is by default defined by the
+    /// data range of the values of the documents. As the name suggests, this can only be used to
+    /// extend the value range. If the bounds for min or max are not extending the range, the
+    /// value has no effect on the returned buckets.
+    ///
+    /// Cannot be set in conjunction with min_doc_count > 0, since the empty buckets from
+    /// extended bounds would not be returned.
+    pub extended_bounds: Option<HistogramBounds>,
+}
+
+impl HistogramAggregation {
+    fn validate(&self) -> crate::Result<()> {
+        if self.interval <= 0.0f64 {
+            return Err(TantivyError::InvalidArgument(
+                "interval must be a positive value".to_string(),
+            ));
+        }
+
+        if self.min_doc_count.unwrap_or(0) > 0 && self.extended_bounds.is_some() {
+            return Err(TantivyError::InvalidArgument(
+                "Cannot set min_doc_count and extended_bounds at the same time".to_string(),
+            ));
+        }
+
+        if let (Some(hard_bounds), Some(extended_bounds)) = (self.hard_bounds, self.extended_bounds)
+        {
+            if extended_bounds.min < hard_bounds.min || extended_bounds.max > hard_bounds.max {
+                return Err(TantivyError::InvalidArgument(format!(
+                    "extended_bounds have to be inside hard_bounds, extended_bounds: {}, \
+                     hard_bounds {}",
+                    extended_bounds, hard_bounds
+                )));
+            }
+        }
+
+        Ok(())
+    }
+
+    /// Returns the minimum number of documents required for a bucket to be returned.
+    pub fn min_doc_count(&self) -> u64 {
+        self.min_doc_count.unwrap_or(0)
+    }
+}
+
+/// Used to set extended or hard bounds on the histogram.
+#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
+pub struct HistogramBounds {
+    /// The lower bound.
+    pub min: f64,
+    /// The upper bound.
+    pub max: f64,
+}
+
+impl Display for HistogramBounds {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.write_fmt(format_args!("[{},{}]", self.min, self.max))
+    }
+}
+
+impl HistogramBounds {
+    fn contains(&self, val: f64) -> bool {
+        val >= self.min && val <= self.max
+    }
+}
+
+/// The collector puts values from the fast field into the correct buckets and does a conversion
+/// to the correct datatype.
+#[derive(Clone, Debug, PartialEq)]
+pub struct SegmentHistogramCollector {
+    /// The buckets containing the aggregation data.
+    buckets: Vec<SegmentHistogramBucketEntry>,
+    sub_aggregations: Option<Vec<SegmentAggregationResultsCollector>>,
+    field_type: Type,
+    interval: f64,
+    offset: f64,
+    first_bucket_num: i64,
+    bounds: HistogramBounds,
+}
+
+impl SegmentHistogramCollector {
+    pub fn into_intermediate_bucket_result(self) -> IntermediateBucketResult {
+        let mut buckets = Vec::with_capacity(
+            self.buckets
+                .iter()
+                .filter(|bucket| bucket.doc_count != 0)
+                .count(),
+        );
+
+        // Below we remove empty buckets for two reasons
+        // 1. To reduce the size of the intermediate result, which may be passed on the wire.
+        // 2. To mimic elasticsearch, there are no empty buckets at the start and end.
+        //
+        // Empty buckets may be added later again in the final result, depending on the request.
+        if let Some(sub_aggregations) = self.sub_aggregations {
+            buckets.extend(
+                self.buckets
+                    .into_iter()
+                    .zip(sub_aggregations.into_iter())
+                    .filter(|(bucket, _sub_aggregation)| bucket.doc_count != 0)
+                    .map(|(bucket, sub_aggregation)| (bucket, sub_aggregation).into()),
+            )
+        } else {
+            buckets.extend(
+                self.buckets
+                    .into_iter()
+                    .filter(|bucket| bucket.doc_count != 0)
+                    .map(|bucket| bucket.into()),
+            );
+        };
+
+        IntermediateBucketResult::Histogram { buckets }
+    }
+
+    pub(crate) fn from_req_and_validate(
+        req: &HistogramAggregation,
+        sub_aggregation: &AggregationsWithAccessor,
+        field_type: Type,
+        accessor: &DynamicFastFieldReader<u64>,
+    ) -> crate::Result<Self> {
+        req.validate()?;
+        let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
+        let max = f64_from_fastfield_u64(accessor.max_value(), &field_type);
+
+        let (min, max) = get_req_min_max(req, Some((min, max)));
+
+        // We compute and generate the buckets range (min, max) based on the request and the
+        // min/max in the fast field, but this is likely not ideal when this is a sub-bucket,
+        // where many unnecessary buckets may be generated.
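        // (Editor's illustration, not part of the patch): with `interval: 2.0` and no
        // offset, a fast field min/max of (2.5, 5.5) makes the call below return the
        // bucket keys [2.0, 4.0]; see `generate_buckets_test` further down.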
+ let buckets = generate_buckets(req, min, max); + + let sub_aggregations = if sub_aggregation.is_empty() { + None + } else { + let sub_aggregation = + SegmentAggregationResultsCollector::from_req_and_validate(sub_aggregation)?; + Some(buckets.iter().map(|_| sub_aggregation.clone()).collect()) + }; + + let buckets = buckets + .iter() + .map(|bucket| SegmentHistogramBucketEntry { + key: *bucket, + doc_count: 0, + }) + .collect(); + + let first_bucket_num = + get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64; + + let bounds = req.hard_bounds.unwrap_or(HistogramBounds { + min: f64::MIN, + max: f64::MAX, + }); + + Ok(Self { + buckets, + field_type, + interval: req.interval, + offset: req.offset.unwrap_or(0.0), + first_bucket_num, + bounds, + sub_aggregations, + }) + } + + #[inline] + pub(crate) fn collect_block( + &mut self, + doc: &[DocId], + bucket_with_accessor: &BucketAggregationWithAccessor, + force_flush: bool, + ) { + let bounds = self.bounds; + let interval = self.interval; + let offset = self.offset; + let first_bucket_num = self.first_bucket_num; + let get_bucket_num = + |val| (get_bucket_num_f64(val, interval, offset) as i64 - first_bucket_num) as usize; + + let mut iter = doc.chunks_exact(4); + for docs in iter.by_ref() { + let val0 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[0])); + let val1 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[1])); + let val2 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[2])); + let val3 = self.f64_from_fastfield_u64(bucket_with_accessor.accessor.get(docs[3])); + + let bucket_pos0 = get_bucket_num(val0); + let bucket_pos1 = get_bucket_num(val1); + let bucket_pos2 = get_bucket_num(val2); + let bucket_pos3 = get_bucket_num(val3); + + self.increment_bucket_if_in_bounds( + val0, + &bounds, + bucket_pos0, + docs[0], + &bucket_with_accessor.sub_aggregation, + ); + self.increment_bucket_if_in_bounds( + val1, + &bounds, + bucket_pos1, + docs[1], + &bucket_with_accessor.sub_aggregation, + ); + self.increment_bucket_if_in_bounds( + val2, + &bounds, + bucket_pos2, + docs[2], + &bucket_with_accessor.sub_aggregation, + ); + self.increment_bucket_if_in_bounds( + val3, + &bounds, + bucket_pos3, + docs[3], + &bucket_with_accessor.sub_aggregation, + ); + } + for doc in iter.remainder() { + let val = + f64_from_fastfield_u64(bucket_with_accessor.accessor.get(*doc), &self.field_type); + if !bounds.contains(val) { + continue; + } + let bucket_pos = (get_bucket_num_f64(val, self.interval, self.offset) as i64 + - self.first_bucket_num) as usize; + + debug_assert_eq!( + self.buckets[bucket_pos].key, + get_bucket_val(val, self.interval, self.offset) as f64 + ); + self.increment_bucket(bucket_pos, *doc, &bucket_with_accessor.sub_aggregation); + } + if force_flush { + if let Some(sub_aggregations) = self.sub_aggregations.as_mut() { + for sub_aggregation in sub_aggregations { + sub_aggregation + .flush_staged_docs(&bucket_with_accessor.sub_aggregation, force_flush); + } + } + } + } + + #[inline] + fn increment_bucket_if_in_bounds( + &mut self, + val: f64, + bounds: &HistogramBounds, + bucket_pos: usize, + doc: DocId, + bucket_with_accessor: &AggregationsWithAccessor, + ) { + if bounds.contains(val) { + debug_assert_eq!( + self.buckets[bucket_pos].key, + get_bucket_val(val, self.interval, self.offset) as f64 + ); + + self.increment_bucket(bucket_pos, doc, bucket_with_accessor); + } + } + + #[inline] + fn increment_bucket( + &mut self, + bucket_pos: usize, + doc: DocId, + 
        bucket_with_accessor: &AggregationsWithAccessor,
+    ) {
+        let bucket = &mut self.buckets[bucket_pos];
+        bucket.doc_count += 1;
+        if let Some(sub_aggregation) = self.sub_aggregations.as_mut() {
+            (&mut sub_aggregation[bucket_pos]).collect(doc, bucket_with_accessor);
+        }
+    }
+
+    fn f64_from_fastfield_u64(&self, val: u64) -> f64 {
+        f64_from_fastfield_u64(val, &self.field_type)
+    }
+}
+
+#[inline]
+fn get_bucket_num_f64(val: f64, interval: f64, offset: f64) -> f64 {
+    ((val - offset) / interval).floor()
+}
+
+#[inline]
+fn get_bucket_val(val: f64, interval: f64, offset: f64) -> f64 {
+    let bucket_pos = get_bucket_num_f64(val, interval, offset);
+    bucket_pos * interval + offset
+}
+
+// Convert to BucketEntry and fill gaps
+fn intermediate_buckets_to_final_buckets_fill_gaps(
+    buckets: Vec<IntermediateHistogramBucketEntry>,
+    histogram_req: &HistogramAggregation,
+    sub_aggregation: &AggregationsInternal,
+) -> Vec<BucketEntry> {
+    // Generate the full list of buckets without gaps.
+    //
+    // The bounds are the min/max from the current buckets, optionally extended by
+    // extended_bounds from the request.
+    let min_max = if buckets.is_empty() {
+        None
+    } else {
+        let min = buckets[0].key;
+        let max = buckets[buckets.len() - 1].key;
+        Some((min, max))
+    };
+    let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);
+
+    let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(sub_aggregation);
+
+    // Use merge_join_by to fill in the gaps, since the buckets are sorted.
+    buckets
+        .into_iter()
+        .merge_join_by(
+            fill_gaps_buckets.into_iter(),
+            |existing_bucket, fill_gaps_bucket| {
+                existing_bucket
+                    .key
+                    .partial_cmp(fill_gaps_bucket)
+                    .unwrap_or(Ordering::Equal)
+            },
+        )
+        .map(|either| match either {
+            // Ignore the generated bucket
+            itertools::EitherOrBoth::Both(existing, _) => existing,
+            itertools::EitherOrBoth::Left(existing) => existing,
+            // Add missing bucket
+            itertools::EitherOrBoth::Right(missing_bucket) => IntermediateHistogramBucketEntry {
+                key: missing_bucket,
+                doc_count: 0,
+                sub_aggregation: empty_sub_aggregation.clone(),
+            },
+        })
+        .map(|intermediate_bucket| {
+            BucketEntry::from_intermediate_and_req(intermediate_bucket, sub_aggregation)
+        })
+        .collect_vec()
+}
+
+// Convert to BucketEntry
+pub(crate) fn intermediate_buckets_to_final_buckets(
+    buckets: Vec<IntermediateHistogramBucketEntry>,
+    histogram_req: &HistogramAggregation,
+    sub_aggregation: &AggregationsInternal,
+) -> Vec<BucketEntry> {
+    if histogram_req.min_doc_count() == 0 {
+        // With min_doc_count == 0, we need to add empty buckets, so that there are no gaps,
+        // since the intermediate result does not contain empty buckets (they are filtered out
+        // to reduce the serialization size).
+        intermediate_buckets_to_final_buckets_fill_gaps(buckets, histogram_req, sub_aggregation)
+    } else {
+        buckets
+            .into_iter()
+            .filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
+            .map(|bucket| BucketEntry::from_intermediate_and_req(bucket, sub_aggregation))
+            .collect_vec()
+    }
+}
+
+/// Applies the request's extended_bounds/hard_bounds to the min_max value.
+///
+/// May return (f64::MAX, f64::MIN), if there is no range.
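/// (Editor's illustration, not part of the patch): for `min_max = Some((2.5, 5.5))`
/// and `hard_bounds = [3,5]` the result is `(3.0, 5.0)`; `extended_bounds` can only
/// widen the range, while `hard_bounds` can only narrow it.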
+fn get_req_min_max(req: &HistogramAggregation, min_max: Option<(f64, f64)>) -> (f64, f64) {
+    let (mut min, mut max) = min_max.unwrap_or((f64::MAX, f64::MIN));
+
+    if let Some(extended_bounds) = &req.extended_bounds {
+        min = min.min(extended_bounds.min);
+        max = max.max(extended_bounds.max);
+    }
+
+    if let Some(hard_bounds) = &req.hard_bounds {
+        min = min.max(hard_bounds.min);
+        max = max.min(hard_bounds.max);
+    }
+
+    (min, max)
+}
+
+/// Generates buckets with req.interval.
+/// The range is computed for the provided min_max and the request's
+/// extended_bounds/hard_bounds.
+pub(crate) fn generate_buckets(req: &HistogramAggregation, min: f64, max: f64) -> Vec<f64> {
+    generate_buckets_with_opt_minmax(req, Some((min, max)))
+}
+
+/// Generates buckets with req.interval.
+/// The range is computed for the provided min_max and the request's
+/// extended_bounds/hard_bounds.
+/// Returns an empty vec when there is no range to span.
+pub(crate) fn generate_buckets_with_opt_minmax(
+    req: &HistogramAggregation,
+    min_max: Option<(f64, f64)>,
+) -> Vec<f64> {
+    let (min, max) = get_req_min_max(req, min_max);
+
+    let offset = req.offset.unwrap_or(0.0);
+    let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
+    let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64;
+    let mut buckets = vec![];
+    for bucket_pos in first_bucket_num..=last_bucket_num {
+        let bucket_key = bucket_pos as f64 * req.interval + offset;
+        buckets.push(bucket_key);
+    }
+
+    buckets
+}
+
+#[test]
+fn generate_buckets_test() {
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
+    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
+
+    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
+    assert_eq!(buckets, vec![2.0, 4.0]);
+
+    // Single bucket
+    let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
+    assert_eq!(buckets, vec![0.0]);
+
+    // With offset
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        offset: Some(0.5),
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
+    assert_eq!(buckets, vec![-1.5, 0.5, 2.5, 4.5, 6.5, 8.5]);
+
+    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
+    assert_eq!(buckets, vec![2.5, 4.5]);
+
+    // Single bucket
+    let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
+    assert_eq!(buckets, vec![0.5]);
+
+    // no bucket
+    let buckets = generate_buckets(&histogram_req, f64::MAX, f64::MIN);
+    assert_eq!(buckets, vec![] as Vec<f64>);
+
+    // With extended_bounds
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        extended_bounds: Some(HistogramBounds {
+            min: 0.0,
+            max: 10.0,
+        }),
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
+    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
+
+    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
+    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
+
+    // Single bucket, but extended_bounds
+    let buckets = generate_buckets(&histogram_req, 0.5, 0.75);
+    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
+
+    // no bucket, but extended_bounds
+    let buckets = generate_buckets(&histogram_req, f64::MAX, f64::MIN);
+    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
+
+    // With invalid extended_bounds
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        extended_bounds: Some(HistogramBounds { min: 3.0, max: 5.0 }),
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
+    assert_eq!(buckets, vec![0.0, 2.0, 4.0, 6.0, 8.0, 10.0]);
+
+    // With hard_bounds reducing
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        hard_bounds: Some(HistogramBounds { min: 3.0, max: 5.0 }),
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 0.0, 10.0);
+    assert_eq!(buckets, vec![2.0, 4.0]);
+
+    // With hard_bounds, extending has no effect
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        hard_bounds: Some(HistogramBounds {
+            min: 0.0,
+            max: 10.0,
+        }),
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 2.5, 5.5);
+    assert_eq!(buckets, vec![2.0, 4.0]);
+
+    // Buckets aligned to the provided min/max range
+    let histogram_req = HistogramAggregation {
+        field: "dummy".to_string(),
+        interval: 2.0,
+        ..Default::default()
+    };
+
+    let buckets = generate_buckets(&histogram_req, 4.0, 10.0);
+    assert_eq!(buckets, vec![4.0, 6.0, 8.0, 10.0]);
+}
+
+#[cfg(test)]
+mod tests {
+
+    use pretty_assertions::assert_eq;
+    use serde_json::{json, Value};
+
+    use super::*;
+    use crate::aggregation::agg_req::{
+        Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
+    };
+    use crate::aggregation::metric::{AverageAggregation, StatsAggregation};
+    use crate::aggregation::tests::{
+        get_test_index_2_segments, get_test_index_from_values, get_test_index_with_num_docs,
+    };
+    use crate::aggregation::AggregationCollector;
+    use crate::query::{AllQuery, TermQuery};
+    use crate::schema::IndexRecordOption;
+    use crate::{Index, Term};
+
+    fn exec_request(agg_req: Aggregations, index: &Index) -> crate::Result<Value> {
+        exec_request_with_query(agg_req, index, None)
+    }
+    fn exec_request_with_query(
+        agg_req: Aggregations,
+        index: &Index,
+        query: Option<(&str, &str)>,
+    ) -> crate::Result<Value> {
+        let collector = AggregationCollector::from_aggs(agg_req);
+
+        let reader = index.reader()?;
+        let searcher = reader.searcher();
+        let agg_res = if let Some((field, term)) = query {
+            let text_field = reader.searcher().schema().get_field(field).unwrap();
+
+            let term_query = TermQuery::new(
+                Term::from_field_text(text_field, term),
+                IndexRecordOption::Basic,
+            );
+
+            searcher.search(&term_query, &collector)?
+        } else {
+            searcher.search(&AllQuery, &collector)?
+ }; + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + Ok(res) + } + + #[test] + fn histogram_test_crooked_values() -> crate::Result<()> { + let values = vec![-12.0, 12.31, 14.33, 16.23]; + + let index = get_test_index_from_values(false, &values)?; + + let agg_req: Aggregations = vec![( + "my_interval".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 3.5, + offset: Some(0.0), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["my_interval"]["buckets"][0]["key"], -14.0); + assert_eq!(res["my_interval"]["buckets"][0]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][7]["key"], 10.5); + assert_eq!(res["my_interval"]["buckets"][7]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][8]["key"], 14.0); + assert_eq!(res["my_interval"]["buckets"][8]["doc_count"], 2); + assert_eq!(res["my_interval"]["buckets"][9], Value::Null); + + // With offset + let agg_req: Aggregations = vec![( + "my_interval".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 3.5, + offset: Some(1.2), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["my_interval"]["buckets"][0]["key"], -12.8); + assert_eq!(res["my_interval"]["buckets"][0]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][1]["key"], -9.3); + assert_eq!(res["my_interval"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["my_interval"]["buckets"][2]["key"], -5.8); + assert_eq!(res["my_interval"]["buckets"][2]["doc_count"], 0); + assert_eq!(res["my_interval"]["buckets"][3]["key"], -2.3); + assert_eq!(res["my_interval"]["buckets"][3]["doc_count"], 0); + + assert_eq!(res["my_interval"]["buckets"][7]["key"], 11.7); + assert_eq!(res["my_interval"]["buckets"][7]["doc_count"], 2); + assert_eq!(res["my_interval"]["buckets"][8]["key"], 15.2); + assert_eq!(res["my_interval"]["buckets"][8]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][9], Value::Null); + + Ok(()) + } + + #[test] + fn histogram_test_min_value_positive_force_merge_segments() -> crate::Result<()> { + histogram_test_min_value_positive_merge_segments(true) + } + + #[test] + fn histogram_test_min_value_positive() -> crate::Result<()> { + histogram_test_min_value_positive_merge_segments(false) + } + fn histogram_test_min_value_positive_merge_segments(merge_segments: bool) -> crate::Result<()> { + let values = vec![10.0, 12.0, 14.0, 16.23]; + + let index = get_test_index_from_values(merge_segments, &values)?; + + let agg_req: Aggregations = vec![( + "my_interval".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["my_interval"]["buckets"][0]["key"], 10.0); + assert_eq!(res["my_interval"]["buckets"][0]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][1]["key"], 11.0); + assert_eq!(res["my_interval"]["buckets"][1]["doc_count"], 0); + 
assert_eq!(res["my_interval"]["buckets"][2]["key"], 12.0); + assert_eq!(res["my_interval"]["buckets"][2]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][3]["key"], 13.0); + assert_eq!(res["my_interval"]["buckets"][3]["doc_count"], 0); + assert_eq!(res["my_interval"]["buckets"][6]["key"], 16.0); + assert_eq!(res["my_interval"]["buckets"][6]["doc_count"], 1); + assert_eq!(res["my_interval"]["buckets"][7], Value::Null); + + Ok(()) + } + + #[test] + fn histogram_simple_test() -> crate::Result<()> { + let index = get_test_index_with_num_docs(false, 100)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 0.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 1); + assert_eq!(res["histogram"]["buckets"][1]["key"], 1.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 1); + assert_eq!(res["histogram"]["buckets"][99]["key"], 99.0); + assert_eq!(res["histogram"]["buckets"][99]["doc_count"], 1); + assert_eq!(res["histogram"]["buckets"][100], Value::Null); + Ok(()) + } + + #[test] + fn histogram_merge_test() -> crate::Result<()> { + // Merge buckets counts from different segments + let values = vec![10.0, 12.0, 14.0, 16.23, 10.0, 13.0, 10.0, 12.0]; + + let index = get_test_index_from_values(false, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 10.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 3); + assert_eq!(res["histogram"]["buckets"][1]["key"], 11.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 2); + assert_eq!(res["histogram"]["buckets"][3]["key"], 13.0); + assert_eq!(res["histogram"]["buckets"][3]["doc_count"], 1); + + Ok(()) + } + #[test] + fn histogram_min_doc_test_multi_segments() -> crate::Result<()> { + histogram_min_doc_test_with_opt(false) + } + #[test] + fn histogram_min_doc_test_single_segments() -> crate::Result<()> { + histogram_min_doc_test_with_opt(true) + } + fn histogram_min_doc_test_with_opt(merge_segments: bool) -> crate::Result<()> { + let values = vec![10.0, 12.0, 14.0, 16.23, 10.0, 13.0, 10.0, 12.0]; + + let index = get_test_index_from_values(merge_segments, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + min_doc_count: Some(2), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 10.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 3); + 
assert_eq!(res["histogram"]["buckets"][1]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 2); + assert_eq!(res["histogram"]["buckets"][2], Value::Null); + + Ok(()) + } + + #[test] + fn histogram_extended_bounds_test_multi_segment() -> crate::Result<()> { + histogram_extended_bounds_test_with_opt(false) + } + #[test] + fn histogram_extended_bounds_test_single_segment() -> crate::Result<()> { + histogram_extended_bounds_test_with_opt(true) + } + fn histogram_extended_bounds_test_with_opt(merge_segments: bool) -> crate::Result<()> { + let values = vec![5.0]; + let index = get_test_index_from_values(merge_segments, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + extended_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 0); + + // 2 hits + let values = vec![5.0, 5.5]; + let index = get_test_index_from_values(merge_segments, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + extended_bounds: Some(HistogramBounds { min: 3.0, max: 6.0 }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 4.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["key"], 5.0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 2); + assert_eq!(res["histogram"]["buckets"][3]["key"], 6.0); + assert_eq!(res["histogram"]["buckets"][3]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][4], Value::Null); + + // 1 hit outside bounds + let values = vec![15.0]; + let index = get_test_index_from_values(merge_segments, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + extended_bounds: Some(HistogramBounds { min: 3.0, max: 6.0 }), + hard_bounds: Some(HistogramBounds { min: 3.0, max: 6.0 }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 4.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + 
assert_eq!(res["histogram"]["buckets"][2]["key"], 5.0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][3]["key"], 6.0); + assert_eq!(res["histogram"]["buckets"][3]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][4], Value::Null); + + Ok(()) + } + + #[test] + fn histogram_hard_bounds_test_multi_segment() -> crate::Result<()> { + histogram_hard_bounds_test_with_opt(false) + } + #[test] + fn histogram_hard_bounds_test_single_segment() -> crate::Result<()> { + histogram_hard_bounds_test_with_opt(true) + } + fn histogram_hard_bounds_test_with_opt(merge_segments: bool) -> crate::Result<()> { + let values = vec![10.0, 12.0, 14.0, 16.23, 10.0, 13.0, 10.0, 12.0]; + + let index = get_test_index_from_values(merge_segments, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + hard_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 10.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 3); + assert_eq!(res["histogram"]["buckets"][1]["key"], 11.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 2); + + assert_eq!(res["histogram"]["buckets"][3], Value::Null); + + // hard_bounds and extended_bounds will act like a force bounds + // + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + hard_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + extended_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 2); + + assert_eq!(res["histogram"]["buckets"][11], Value::Null); + + // Invalid request + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + hard_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + extended_bounds: Some(HistogramBounds { + min: 1.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index).unwrap_err(); + assert_eq!( + res.to_string(), + "An invalid argument was passed: 'extended_bounds have to be inside hard_bounds, \ + extended_bounds: [1,12], hard_bounds [2,12]'" + ); + + Ok(()) + } + + #[test] + fn histogram_empty_result_behaviour_test_single_segment() -> crate::Result<()> { + 
histogram_empty_result_behaviour_test_with_opt(true) + } + + #[test] + fn histogram_empty_result_behaviour_test_multi_segment() -> crate::Result<()> { + histogram_empty_result_behaviour_test_with_opt(false) + } + + fn histogram_empty_result_behaviour_test_with_opt(merge_segments: bool) -> crate::Result<()> { + let index = get_test_index_2_segments(merge_segments)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request_with_query(agg_req.clone(), &index, Some(("text", "blubberasdf")))?; + + assert_eq!( + res, + json!({ + "histogram": { + "buckets": [] + } + }) + ); + + // test index without segments + let values = vec![]; + + // Don't merge empty segments + let index = get_test_index_from_values(false, &values)?; + + let res = exec_request_with_query(agg_req, &index, Some(("text", "blubberasdf")))?; + + assert_eq!( + res, + json!({ + "histogram": { + "buckets": [] + } + }) + ); + + // test index without segments + let values = vec![]; + + // Don't merge empty segments + let index = get_test_index_from_values(false, &values)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + extended_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 0); + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + extended_bounds: Some(HistogramBounds { min: 2.0, max: 5.0 }), + hard_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][10], Value::Null); + + // hard_bounds will not extend the result + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + hard_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + 
.collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!( + res, + json!({ + "histogram": { + "buckets": [] + } + }) + ); + + let agg_req: Aggregations = vec![ + ( + "stats".to_string(), + Aggregation::Metric(MetricAggregation::Stats(StatsAggregation { + field: "score_f64".to_string(), + })), + ), + ( + "avg".to_string(), + Aggregation::Metric(MetricAggregation::Average(AverageAggregation { + field: "score_f64".to_string(), + })), + ), + ] + .into_iter() + .collect(); + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + extended_bounds: Some(HistogramBounds { + min: 2.0, + max: 12.0, + }), + ..Default::default() + }), + sub_aggregation: agg_req, + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index)?; + + assert_eq!( + res["histogram"]["buckets"][0], + json!({ + "avg": { + "value": Value::Null + }, + "doc_count": 0, + "key": 2.0, + "stats": { + "sum": 0.0, + "count": 0, + "min": Value::Null, + "max": Value::Null, + "avg": Value::Null, + "standard_deviation": Value::Null, + } + }) + ); + assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0); + assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0); + assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0); + assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 0); + + Ok(()) + } + + #[test] + fn histogram_single_bucket_test_single_segment() -> crate::Result<()> { + histogram_single_bucket_test_with_opt(true) + } + + #[test] + fn histogram_single_bucket_test_multi_segment() -> crate::Result<()> { + histogram_single_bucket_test_with_opt(false) + } + + fn histogram_single_bucket_test_with_opt(merge_segments: bool) -> crate::Result<()> { + let index = get_test_index_2_segments(merge_segments)?; + + let agg_req: Aggregations = vec![( + "histogram".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100000.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let agg_res = exec_request(agg_req, &index)?; + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + + assert_eq!(res["histogram"]["buckets"][0]["key"], 0.0); + assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 9); + assert_eq!(res["histogram"]["buckets"][1], Value::Null); + + Ok(()) + } +} diff --git a/src/aggregation/bucket/histogram/mod.rs b/src/aggregation/bucket/histogram/mod.rs new file mode 100644 index 0000000000..77b526147c --- /dev/null +++ b/src/aggregation/bucket/histogram/mod.rs @@ -0,0 +1,2 @@ +mod histogram; +pub use histogram::*; diff --git a/src/aggregation/bucket/mod.rs b/src/aggregation/bucket/mod.rs index e69d95be9f..0a9991fce7 100644 --- a/src/aggregation/bucket/mod.rs +++ b/src/aggregation/bucket/mod.rs @@ -7,7 +7,10 @@ //! Results of intermediate buckets are //! 
[IntermediateBucketResult](super::intermediate_agg_result::IntermediateBucketResult) +mod histogram; mod range; +pub(crate) use histogram::SegmentHistogramCollector; +pub use histogram::*; pub(crate) use range::SegmentRangeCollector; pub use range::*; diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index 4650221717..2715993b30 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -30,6 +30,11 @@ use crate::{DocId, TantivyError}; /// [crate::aggregation::intermediate_agg_result::IntermediateRangeBucketEntry] on the /// DistributedAggregationCollector. /// +/// # Limitations/Compatibility +/// Overlapping ranges are not yet supported. +/// +/// The keyed parameter (elasticsearch) is not yet supported. +/// /// # Request JSON Format /// ```json /// { @@ -42,8 +47,8 @@ use crate::{DocId, TantivyError}; /// { "from": 20.0 } /// ] /// } -/// } -/// ``` +/// } +/// ``` #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct RangeAggregation { /// The field to aggregate on. diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index ec581d6a70..2278361518 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -5,7 +5,7 @@ use super::intermediate_agg_result::IntermediateAggregationResults; use super::segment_agg_result::SegmentAggregationResultsCollector; use crate::aggregation::agg_req_with_accessor::get_aggs_with_accessor_and_validate; use crate::collector::{Collector, SegmentCollector}; -use crate::{SegmentReader, TantivyError}; +use crate::SegmentReader; /// Collector for aggregations. /// @@ -75,13 +75,7 @@ impl Collector for AggregationCollector { _segment_local_id: crate::SegmentOrdinal, reader: &crate::SegmentReader, ) -> crate::Result { - let aggs_with_accessor = get_aggs_with_accessor_and_validate(&self.agg, reader)?; - let result = - SegmentAggregationResultsCollector::from_req_and_validate(&aggs_with_accessor)?; - Ok(AggregationSegmentCollector { - aggs: aggs_with_accessor, - result, - }) + AggregationSegmentCollector::from_agg_req_and_reader(&self.agg, reader) } fn requires_scoring(&self) -> bool { @@ -92,7 +86,8 @@ impl Collector for AggregationCollector { &self, segment_fruits: Vec<::Fruit>, ) -> crate::Result { - merge_fruits(segment_fruits).map(|res| res.into()) + merge_fruits(segment_fruits) + .map(|res| AggregationResults::from_intermediate_and_req(res, self.agg.clone())) } } @@ -101,13 +96,11 @@ fn merge_fruits( ) -> crate::Result { if let Some(mut fruit) = segment_fruits.pop() { for next_fruit in segment_fruits { - fruit.merge_fruits(&next_fruit); + fruit.merge_fruits(next_fruit); } Ok(fruit) } else { - Err(TantivyError::InvalidArgument( - "no fruits provided in merge_fruits".to_string(), - )) + Ok(IntermediateAggregationResults::default()) } } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 7a205fb698..6c577944b9 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -2,43 +2,93 @@ //! Intermediate aggregation results can be used to merge results between segments or between //! indices. 
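Because intermediate results derive `Serialize`/`Deserialize` (see below), they can be shipped between nodes and merged before the final conversion. A rough sketch (editor's addition; `local_fruit`, `remote_bytes` and `agg_req` are hypothetical inputs):

```rust
use std::error::Error;

use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;

fn merge_remote(
    local_fruit: IntermediateAggregationResults,
    remote_bytes: &[u8],
    agg_req: Aggregations,
) -> Result<AggregationResults, Box<dyn Error>> {
    // Deserialize the intermediate result received from another node ...
    let mut merged: IntermediateAggregationResults = serde_json::from_slice(remote_bytes)?;
    // ... fold the local fruit into it (merge_fruits consumes the other result) ...
    merged.merge_fruits(local_fruit);
    // ... and only then convert to the final, user-facing result.
    Ok(AggregationResults::from_intermediate_and_req(merged, agg_req))
}
```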
-use std::collections::HashMap;
+use std::cmp::Ordering;
 
+use fnv::FnvHashMap;
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
+use super::agg_req::{AggregationsInternal, BucketAggregationType, MetricAggregation};
 use super::metric::{IntermediateAverage, IntermediateStats};
 use super::segment_agg_result::{
-    SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentMetricResultCollector,
-    SegmentRangeBucketEntry,
+    SegmentAggregationResultsCollector, SegmentBucketResultCollector, SegmentHistogramBucketEntry,
+    SegmentMetricResultCollector, SegmentRangeBucketEntry,
 };
 use super::{Key, SerializedKey, VecWithNames};
 
 /// Contains the intermediate aggregation result, which is optimized to be merged with other
 /// intermediate results.
 #[derive(Default, Clone, Debug, PartialEq, Serialize, Deserialize)]
-pub struct IntermediateAggregationResults(pub(crate) VecWithNames<IntermediateAggregationResult>);
+pub struct IntermediateAggregationResults {
+    pub(crate) metrics: Option<VecWithNames<IntermediateMetricResult>>,
+    pub(crate) buckets: Option<VecWithNames<IntermediateBucketResult>>,
+}
 
 impl From<SegmentAggregationResultsCollector> for IntermediateAggregationResults {
     fn from(tree: SegmentAggregationResultsCollector) -> Self {
-        let mut data = vec![];
-        for (key, bucket) in tree.buckets.into_iter() {
-            data.push((key, IntermediateAggregationResult::Bucket(bucket.into())));
-        }
-        for (key, metric) in tree.metrics.into_iter() {
-            data.push((key, IntermediateAggregationResult::Metric(metric.into())));
-        }
-        Self(VecWithNames::from_entries(data))
+        let metrics = tree.metrics.map(VecWithNames::from_other);
+        let buckets = tree.buckets.map(VecWithNames::from_other);
+
+        Self { metrics, buckets }
     }
 }
 
 impl IntermediateAggregationResults {
+    pub(crate) fn empty_from_req(req: &AggregationsInternal) -> Self {
+        let metrics = if req.metrics.is_empty() {
+            None
+        } else {
+            let metrics = req
+                .metrics
+                .iter()
+                .map(|(key, req)| {
+                    (
+                        key.to_string(),
+                        IntermediateMetricResult::empty_from_req(req),
+                    )
+                })
+                .collect();
+            Some(VecWithNames::from_entries(metrics))
+        };
+
+        let buckets = if req.buckets.is_empty() {
+            None
+        } else {
+            let buckets = req
+                .buckets
+                .iter()
+                .map(|(key, req)| {
+                    (
+                        key.to_string(),
+                        IntermediateBucketResult::empty_from_req(&req.bucket_agg),
+                    )
+                })
+                .collect();
+            Some(VecWithNames::from_entries(buckets))
+        };
+
+        Self { metrics, buckets }
+    }
+
     /// Merge another intermediate aggregation result into this result.
     ///
     /// The order of the values needs to be the same on both results. This is ensured when the
     /// same keys are present on the underlying VecWithNames struct.
- pub fn merge_fruits(&mut self, other: &IntermediateAggregationResults) { - for (tree_left, tree_right) in self.0.values_mut().zip(other.0.values()) { - tree_left.merge_fruits(tree_right); + pub fn merge_fruits(&mut self, other: IntermediateAggregationResults) { + if let (Some(buckets_left), Some(buckets_right)) = (&mut self.buckets, other.buckets) { + for (bucket_left, bucket_right) in + buckets_left.values_mut().zip(buckets_right.into_values()) + { + bucket_left.merge_fruits(bucket_right); + } + } + + if let (Some(metrics_left), Some(metrics_right)) = (&mut self.metrics, other.metrics) { + for (metric_left, metric_right) in + metrics_left.values_mut().zip(metrics_right.into_values()) + { + metric_left.merge_fruits(metric_right); + } } } } @@ -52,28 +102,6 @@ pub enum IntermediateAggregationResult { Metric(IntermediateMetricResult), } -impl IntermediateAggregationResult { - fn merge_fruits(&mut self, other: &IntermediateAggregationResult) { - match (self, other) { - ( - IntermediateAggregationResult::Bucket(res_left), - IntermediateAggregationResult::Bucket(res_right), - ) => { - res_left.merge_fruits(res_right); - } - ( - IntermediateAggregationResult::Metric(res_left), - IntermediateAggregationResult::Metric(res_right), - ) => { - res_left.merge_fruits(res_right); - } - _ => { - panic!("incompatible types in aggregation tree on merge fruits"); - } - } - } -} - /// Holds the intermediate data for metric results #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum IntermediateMetricResult { @@ -97,7 +125,17 @@ impl From<SegmentMetricResultCollector> for IntermediateMetricResult { } impl IntermediateMetricResult { - fn merge_fruits(&mut self, other: &IntermediateMetricResult) { + pub(crate) fn empty_from_req(req: &MetricAggregation) -> Self { + match req { + MetricAggregation::Average(_) => { + IntermediateMetricResult::Average(IntermediateAverage::default()) + } + MetricAggregation::Stats(_) => { + IntermediateMetricResult::Stats(IntermediateStats::default()) + } + } + } + fn merge_fruits(&mut self, other: IntermediateMetricResult) { match (self, other) { ( IntermediateMetricResult::Average(avg_data_left), @@ -112,7 +150,7 @@ impl IntermediateMetricResult { stats_left.merge_fruits(stats_right); } _ => { - panic!("incompatible fruit types in tree {:?}", other); + panic!("incompatible fruit types in tree"); } } } @@ -124,36 +162,137 @@ impl IntermediateMetricResult { pub enum IntermediateBucketResult { /// This is the range entry for a bucket, which contains a key, count, from, to, and optionally /// sub_aggregations. - Range(HashMap<SerializedKey, IntermediateRangeBucketEntry>), + Range(FnvHashMap<SerializedKey, IntermediateRangeBucketEntry>), + /// This is the histogram entry for a bucket, which contains a key, count, and optionally + /// sub_aggregations.
+ Histogram { + /// The buckets + buckets: Vec<IntermediateHistogramBucketEntry>, + }, } impl From<SegmentBucketResultCollector> for IntermediateBucketResult { fn from(collector: SegmentBucketResultCollector) -> Self { match collector { SegmentBucketResultCollector::Range(range) => range.into_intermediate_bucket_result(), + SegmentBucketResultCollector::Histogram(histogram) => { + histogram.into_intermediate_bucket_result() + } } } } impl IntermediateBucketResult { + pub(crate) fn empty_from_req(req: &BucketAggregationType) -> Self { + match req { + BucketAggregationType::Range(_) => IntermediateBucketResult::Range(Default::default()), + BucketAggregationType::Histogram(_) => { + IntermediateBucketResult::Histogram { buckets: vec![] } + } + } + } + fn merge_fruits(&mut self, other: IntermediateBucketResult) { match (self, other) { ( IntermediateBucketResult::Range(entries_left), IntermediateBucketResult::Range(entries_right), ) => { - for (name, entry_left) in entries_left.iter_mut() { - if let Some(entry_right) = entries_right.get(name) { - entry_left.merge_fruits(entry_right); - } - } - - for (key, res) in entries_right.iter() { - if !entries_left.contains_key(key) { - entries_left.insert(key.clone(), res.clone()); - } - } + merge_maps(entries_left, entries_right); + } + ( + IntermediateBucketResult::Histogram { + buckets: entries_left, + .. + }, + IntermediateBucketResult::Histogram { + buckets: entries_right, + .. + }, + ) => { + let mut buckets = entries_left + .drain(..) + .merge_join_by(entries_right.into_iter(), |left, right| { + left.key.partial_cmp(&right.key).unwrap_or(Ordering::Equal) + }) + .map(|either| match either { + itertools::EitherOrBoth::Both(mut left, right) => { + left.merge_fruits(right); + left + } + itertools::EitherOrBoth::Left(left) => left, + itertools::EitherOrBoth::Right(right) => right, + }) + .collect(); + + std::mem::swap(entries_left, &mut buckets); + } + (IntermediateBucketResult::Range(_), _) => { + panic!("try merge on different types") + } + (IntermediateBucketResult::Histogram { .. }, _) => { + panic!("try merge on different types") + } + } + } +} + +trait MergeFruits { + fn merge_fruits(&mut self, other: Self); +} + +fn merge_maps<T: MergeFruits>( + entries_left: &mut FnvHashMap<SerializedKey, T>, + mut entries_right: FnvHashMap<SerializedKey, T>, +) { + for (name, entry_left) in entries_left.iter_mut() { + if let Some(entry_right) = entries_right.remove(name) { + entry_left.merge_fruits(entry_right); + } + } + + for (key, res) in entries_right.into_iter() { + entries_left.entry(key).or_insert(res); + } +} + +/// This is the histogram entry for a bucket, which contains a key, count, and optionally +/// sub_aggregations. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct IntermediateHistogramBucketEntry { + /// The unique key by which the bucket is identified. + pub key: f64, + /// The number of documents in the bucket. + pub doc_count: u64, + /// The sub_aggregation in this bucket.
+ pub sub_aggregation: IntermediateAggregationResults, +} + +impl From<SegmentHistogramBucketEntry> for IntermediateHistogramBucketEntry { + fn from(entry: SegmentHistogramBucketEntry) -> Self { + IntermediateHistogramBucketEntry { + key: entry.key, + doc_count: entry.doc_count, + sub_aggregation: Default::default(), + } + } +} + +impl + From<( + SegmentHistogramBucketEntry, + SegmentAggregationResultsCollector, + )> for IntermediateHistogramBucketEntry +{ + fn from( + entry: ( + SegmentHistogramBucketEntry, + SegmentAggregationResultsCollector, + ), + ) -> Self { + IntermediateHistogramBucketEntry { + key: entry.0.key, + doc_count: entry.0.doc_count, + sub_aggregation: entry.1.into(), } } } @@ -184,7 +323,6 @@ impl From<SegmentRangeBucketEntry> for IntermediateRangeBucketEntry { } else { Default::default() }; - // let sub_aggregation = entry.sub_aggregation.into(); IntermediateRangeBucketEntry { key: entry.key, @@ -197,22 +335,31 @@ impl From<SegmentRangeBucketEntry> for IntermediateRangeBucketEntry { } } -impl IntermediateRangeBucketEntry { - fn merge_fruits(&mut self, other: &IntermediateRangeBucketEntry) { +impl MergeFruits for IntermediateRangeBucketEntry { + fn merge_fruits(&mut self, other: IntermediateRangeBucketEntry) { self.doc_count += other.doc_count; - self.sub_aggregation.merge_fruits(&other.sub_aggregation); + self.sub_aggregation.merge_fruits(other.sub_aggregation); + } +} + +impl MergeFruits for IntermediateHistogramBucketEntry { + fn merge_fruits(&mut self, other: IntermediateHistogramBucketEntry) { + self.doc_count += other.doc_count; + self.sub_aggregation.merge_fruits(other.sub_aggregation); } } #[cfg(test)] mod tests { + use std::collections::HashMap; + use pretty_assertions::assert_eq; use super::*; fn get_sub_test_tree(data: &[(String, u64)]) -> IntermediateAggregationResults { let mut map = HashMap::new(); - let mut buckets = HashMap::new(); + let mut buckets = FnvHashMap::default(); for (key, doc_count) in data { buckets.insert( key.to_string(), @@ -228,14 +375,19 @@ mod tests { } map.insert( "my_agg_level2".to_string(), - IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)), + IntermediateBucketResult::Range(buckets), ); - IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect())) + IntermediateAggregationResults { + buckets: Some(VecWithNames::from_entries(map.into_iter().collect())), + metrics: Default::default(), + } } - fn get_test_tree(data: &[(String, u64, String, u64)]) -> IntermediateAggregationResults { + fn get_intermediate_tree_with_ranges( + data: &[(String, u64, String, u64)], + ) -> IntermediateAggregationResults { let mut map = HashMap::new(); - let mut buckets = HashMap::new(); + let mut buckets: FnvHashMap<_, _> = Default::default(); for (key, doc_count, sub_aggregation_key, sub_aggregation_count) in data { buckets.insert( key.to_string(), @@ -254,25 +406,28 @@ mod tests { } map.insert( "my_agg_level1".to_string(), - IntermediateAggregationResult::Bucket(IntermediateBucketResult::Range(buckets)), + IntermediateBucketResult::Range(buckets), ); - IntermediateAggregationResults(VecWithNames::from_entries(map.into_iter().collect())) + IntermediateAggregationResults { + buckets: Some(VecWithNames::from_entries(map.into_iter().collect())), + metrics: Default::default(), + } } #[test] fn test_merge_fruits_tree_1() { - let mut tree_left = get_test_tree(&[ + let mut tree_left = get_intermediate_tree_with_ranges(&[ ("red".to_string(), 50, "1900".to_string(), 25), ("blue".to_string(), 30, "1900".to_string(), 30), ]); - let tree_right = get_test_tree(&[ + let tree_right =
get_intermediate_tree_with_ranges(&[ ("red".to_string(), 60, "1900".to_string(), 30), ("blue".to_string(), 25, "1900".to_string(), 50), ]); - tree_left.merge_fruits(&tree_right); + tree_left.merge_fruits(tree_right); - let tree_expected = get_test_tree(&[ + let tree_expected = get_intermediate_tree_with_ranges(&[ ("red".to_string(), 110, "1900".to_string(), 55), ("blue".to_string(), 55, "1900".to_string(), 80), ]); @@ -282,18 +437,18 @@ #[test] fn test_merge_fruits_tree_2() { - let mut tree_left = get_test_tree(&[ + let mut tree_left = get_intermediate_tree_with_ranges(&[ ("red".to_string(), 50, "1900".to_string(), 25), ("blue".to_string(), 30, "1900".to_string(), 30), ]); - let tree_right = get_test_tree(&[ + let tree_right = get_intermediate_tree_with_ranges(&[ ("red".to_string(), 60, "1900".to_string(), 30), ("green".to_string(), 25, "1900".to_string(), 50), ]); - tree_left.merge_fruits(&tree_right); + tree_left.merge_fruits(tree_right); - let tree_expected = get_test_tree(&[ + let tree_expected = get_intermediate_tree_with_ranges(&[ ("red".to_string(), 110, "1900".to_string(), 55), ("blue".to_string(), 30, "1900".to_string(), 30), ("green".to_string(), 25, "1900".to_string(), 50), @@ -301,4 +456,18 @@ assert_eq!(tree_left, tree_expected); } + + #[test] + fn test_merge_fruits_tree_empty() { + let mut tree_left = get_intermediate_tree_with_ranges(&[ + ("red".to_string(), 50, "1900".to_string(), 25), + ("blue".to_string(), 30, "1900".to_string(), 30), + ]); + + let orig = tree_left.clone(); + + tree_left.merge_fruits(IntermediateAggregationResults::default()); + + assert_eq!(tree_left, orig); + } } diff --git a/src/aggregation/metric/average.rs b/src/aggregation/metric/average.rs index e582947eb7..69f353824a 100644 --- a/src/aggregation/metric/average.rs +++ b/src/aggregation/metric/average.rs @@ -20,7 +20,7 @@ use crate::DocId; /// "field": "score", /// } /// } -/// ``` +/// ``` pub struct AverageAggregation { /// The field name to compute the stats on. pub field: String, @@ -94,7 +94,7 @@ impl IntermediateAverage { } /// Merge average data into this instance. - pub fn merge_fruits(&mut self, other: &IntermediateAverage) { + pub fn merge_fruits(&mut self, other: IntermediateAverage) { self.sum += other.sum; self.doc_count += other.doc_count; } diff --git a/src/aggregation/metric/stats.rs b/src/aggregation/metric/stats.rs index b596cd7a5e..d4c95c09b7 100644 --- a/src/aggregation/metric/stats.rs +++ b/src/aggregation/metric/stats.rs @@ -17,7 +17,7 @@ use crate::DocId; /// "field": "score", /// } /// } -/// ``` +/// ``` #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct StatsAggregation { @@ -62,9 +62,8 @@ pub struct IntermediateStats { min: f64, max: f64, } - -impl IntermediateStats { - fn new() -> Self { +impl Default for IntermediateStats { + fn default() -> Self { Self { count: 0, sum: 0.0, @@ -73,7 +72,9 @@ max: f64::MIN, } } +} +impl IntermediateStats { pub(crate) fn avg(&self) -> Option<f64> { if self.count == 0 { None @@ -92,7 +93,7 @@ } /// Merge data from other stats into this instance.
- pub fn merge_fruits(&mut self, other: &IntermediateStats) { + pub fn merge_fruits(&mut self, other: IntermediateStats) { self.count += other.count; self.sum += other.sum; self.squared_sum += other.squared_sum; @@ -142,7 +143,7 @@ impl SegmentStatsCollector { pub fn from_req(field_type: Type) -> Self { Self { field_type, - stats: IntermediateStats::new(), + stats: IntermediateStats::default(), } } pub(crate) fn collect_block(&mut self, doc: &[DocId], field: &DynamicFastFieldReader<u64>) { @@ -182,12 +183,50 @@ mod tests { }; use crate::aggregation::agg_result::AggregationResults; use crate::aggregation::metric::StatsAggregation; - use crate::aggregation::tests::get_test_index_2_segments; + use crate::aggregation::tests::{get_test_index_2_segments, get_test_index_from_values}; use crate::aggregation::AggregationCollector; - use crate::query::TermQuery; + use crate::query::{AllQuery, TermQuery}; use crate::schema::IndexRecordOption; use crate::Term; + #[test] + fn test_aggregation_stats_empty_index() -> crate::Result<()> { + // test index without segments + let values = vec![]; + + let index = get_test_index_from_values(false, &values)?; + + let agg_req_1: Aggregations = vec![( + "stats".to_string(), + Aggregation::Metric(MetricAggregation::Stats(StatsAggregation::from_field_name( + "score".to_string(), + ))), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1); + + let reader = index.reader()?; + let searcher = reader.searcher(); + let agg_res: AggregationResults = searcher.search(&AllQuery, &collector).unwrap(); + + let res: Value = serde_json::from_str(&serde_json::to_string(&agg_res)?)?; + assert_eq!( res["stats"], json!({ "avg": Value::Null, "count": 0, "max": Value::Null, "min": Value::Null, "standard_deviation": Value::Null, "sum": 0.0 }) ); + + Ok(()) + } + #[test] fn test_aggregation_stats() -> crate::Result<()> { let index = get_test_index_2_segments(false)?; diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index d4f77fa946..697afc428e 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -18,6 +18,10 @@ //! Create an [AggregationCollector] from this request. AggregationCollector implements the //! `Collector` trait and can be passed as collector into `searcher.search()`. //! +//! #### Limitations +//! +//! Currently aggregations work only on single value fast fields of type u64, f64 and i64. +//! //! # JSON Format //! Aggregations request and result structures de/serialize into elasticsearch compatible JSON. //! @@ -28,9 +32,14 @@ //! let agg_res = searcher.search(&term_query, &collector).unwrap(); //! let json_response_string: String = serde_json::to_string(&agg_res)?; //! ``` -//! # Limitations //! -//! Currently aggregations work only on single value fast fields of type u64, f64 and i64. +//! # Supported Aggregations +//! - [Bucket](bucket) +//! - [Histogram](bucket::HistogramAggregation) +//! - [Range](bucket::RangeAggregation) +//! - [Metric](metric) +//! - [Average](metric::AverageAggregation) +//! - [Stats](metric::StatsAggregation) //! //! # Example //! Compute the average metric, by building [agg_req::Aggregations], which is built from a (String, //! [agg_req::Aggregation]) iterator. @@ -90,7 +99,8 @@ //! } //! } //! "#; -//! let agg_req: Aggregations = serde_json::from_str(elasticsearch_compatible_json_req).unwrap(); +//! let agg_req: Aggregations = +//! serde_json::from_str(elasticsearch_compatible_json_req).unwrap(); //! ``` //! # Code Organization //! @@ -101,8 +111,7 @@ //! Buckets can contain sub-aggregations.
In this example we create buckets with the range //! aggregation and then calculate the average on each bucket, as sketched after this hunk. //! ``` -//! use tantivy::aggregation::agg_req::{Aggregations, Aggregation, BucketAggregation, -//! MetricAggregation, BucketAggregationType}; +//! use tantivy::aggregation::agg_req::*; //! use tantivy::aggregation::metric::AverageAggregation; //! use tantivy::aggregation::bucket::RangeAggregation; //! let sub_agg_req_1: Aggregations = vec![( @@ -167,6 +176,7 @@ pub(crate) struct VecWithNames<T> { values: Vec<T>, keys: Vec<String>, } + impl<T> Default for VecWithNames<T> { fn default() -> Self { Self { @@ -189,6 +199,14 @@ impl<T> From<HashMap<String, T>> for VecWithNames<T> { } impl<T> VecWithNames<T> { + fn from_other<K: Into<T>>(entries: VecWithNames<K>) -> Self { + let values = entries.values.into_iter().map(Into::into).collect(); + Self { + keys: entries.keys, + values, + } + } + fn from_entries(mut entries: Vec<(String, T)>) -> Self { // Sort to ensure order of elements match across multiple instances entries.sort_by(|left, right| left.0.cmp(&right.0)); @@ -212,6 +230,9 @@ impl<T> VecWithNames<T> { fn keys(&self) -> impl Iterator<Item = &str> + '_ { self.keys.iter().map(|key| key.as_str()) } + fn into_values(self) -> impl Iterator<Item = T> { + self.values.into_iter() + } fn values(&self) -> impl Iterator<Item = &T> + '_ { self.values.iter() } @@ -310,6 +331,16 @@ mod tests { pub fn get_test_index_with_num_docs( merge_segments: bool, num_docs: usize, + ) -> crate::Result<Index> { + get_test_index_from_values( + merge_segments, + &(0..num_docs).map(|el| el as f64).collect::<Vec<f64>>(), + ) + } + + pub fn get_test_index_from_values( + merge_segments: bool, + values: &[f64], ) -> crate::Result<Index> { let mut schema_builder = Schema::builder(); let text_fieldtype = crate::schema::TextOptions::default() @@ -332,7 +363,7 @@ let index = Index::create_in_ram(schema_builder.build()); { let mut index_writer = index.writer_for_tests()?; - for i in 0..num_docs { + for &i in values { // writing the segment index_writer.add_document(doc!( text_field => "cool", score_field => i as u64, score_field_f64 => i as f64, score_field_i64 => i as i64, fraction_field => i as f64/100.0, ))?; + index_writer.commit()?; } - - index_writer.commit()?; } if merge_segments { let segment_ids = index .searchable_segment_ids() @@ -362,7 +392,7 @@ merge_segments: bool, use_distributed_collector: bool, ) -> crate::Result<()> { - let index = get_test_index_with_num_docs(merge_segments, 300)?; + let index = get_test_index_with_num_docs(merge_segments, 80)?; let reader = index.reader()?; let text_field = reader.searcher().schema().get_field("text").unwrap(); let term_query = TermQuery::new( Term::from_field_text(text_field, "cool"), IndexRecordOption::Basic, ); - assert_eq!(DOC_BLOCK_SIZE, 256); + assert_eq!(DOC_BLOCK_SIZE, 64); // In the tree we cache Documents of DOC_BLOCK_SIZE, before passing them down as one block. // // Build a request so that on the first level we have one full cache, which is then flushed. - // The same cache should have some residue docs at the end, which are flushed (Range 0-266) - // -> 266 docs + // The same cache should have some residue docs at the end, which are flushed (Range 0-70) + // -> 70 docs // // The second level should also have some residue docs in the cache that are flushed at the // end.
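The same request style carries over to the new histogram bucket type. A minimal sketch of building one programmatically with a nested average, mirroring the range example in the module docs above (the field names `score`/`avg_score` and the interval are placeholders; all other `HistogramAggregation` options are left at their defaults):

```rust
use tantivy::aggregation::agg_req::{
    Aggregation, Aggregations, BucketAggregation, BucketAggregationType, MetricAggregation,
};
use tantivy::aggregation::bucket::HistogramAggregation;
use tantivy::aggregation::metric::AverageAggregation;

// Sub-aggregation: average of `score` inside each histogram bucket.
let sub_agg: Aggregations = vec![(
    "avg_score".to_string(),
    Aggregation::Metric(MetricAggregation::Average(
        AverageAggregation::from_field_name("score".to_string()),
    )),
)]
.into_iter()
.collect();

// Histogram buckets of width 10.0 over the `score` fast field.
let agg_req: Aggregations = vec![(
    "score_histogram".to_string(),
    Aggregation::Bucket(BucketAggregation {
        bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
            field: "score".to_string(),
            interval: 10.0,
            ..Default::default()
        }),
        sub_aggregation: sub_agg,
    }),
)]
.into_iter()
.collect();
```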
@@ -385,33 +415,51 @@ mod tests { // A second bucket on the first level should have the cache unfilled // let elasticsearch_compatible_json_req = r#" - let elasticsearch_compatible_json_req = r#" + let elasticsearch_compatible_json = json!( { "bucketsL1": { "range": { "field": "score", - "ranges": [ { "to": 3.0 }, { "from": 3.0, "to": 266.0 }, { "from": 266.0 } ] + "ranges": [ { "to": 3.0f64 }, { "from": 3.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ] }, "aggs": { "bucketsL2": { "range": { "field": "score", - "ranges": [ { "to": 100.0 }, { "from": 100.0, "to": 266.0 }, { "from": 266.0 } ] + "ranges": [ { "to": 30.0f64 }, { "from": 30.0f64, "to": 70.0f64 }, { "from": 70.0f64 } ] + } + } + } + }, + "histogram_test":{ + "histogram": { + "field": "score", + "interval": 70.0, + "offset": 3.0, + }, + "aggs": { + "bucketsL2": { + "histogram": { + "field": "score", + "interval": 70.0 } } } } - } - "#; + }); let agg_req: Aggregations = - serde_json::from_str(elasticsearch_compatible_json_req).unwrap(); + serde_json::from_str(&serde_json::to_string(&elasticsearch_compatible_json).unwrap()) + .unwrap(); let agg_res: AggregationResults = if use_distributed_collector { - let collector = DistributedAggregationCollector::from_aggs(agg_req); + let collector = DistributedAggregationCollector::from_aggs(agg_req.clone()); let searcher = reader.searcher(); - searcher.search(&term_query, &collector).unwrap().into() + AggregationResults::from_intermediate_and_req( + searcher.search(&term_query, &collector).unwrap(), + agg_req, + ) } else { let collector = AggregationCollector::from_aggs(agg_req); @@ -426,15 +474,15 @@ mod tests { res["bucketsL1"]["buckets"][0]["bucketsL2"]["buckets"][0]["doc_count"], 3 ); - assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-266"); - assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 266 - 3); + assert_eq!(res["bucketsL1"]["buckets"][1]["key"], "3-70"); + assert_eq!(res["bucketsL1"]["buckets"][1]["doc_count"], 70 - 3); assert_eq!( res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][0]["doc_count"], - 97 + 27 ); assert_eq!( res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][1]["doc_count"], - 166 + 40 ); assert_eq!( res["bucketsL1"]["buckets"][1]["bucketsL2"]["buckets"][2]["doc_count"], @@ -442,9 +490,9 @@ mod tests { ); assert_eq!( res["bucketsL1"]["buckets"][2]["bucketsL2"]["buckets"][2]["doc_count"], - 300 - 266 + 80 - 70 ); - assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 300 - 266); + assert_eq!(res["bucketsL1"]["buckets"][2]["doc_count"], 80 - 70); Ok(()) } @@ -790,7 +838,7 @@ mod tests { // Test de/serialization roundtrip on intermediate_agg_result let res: IntermediateAggregationResults = serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap(); - res.into() + AggregationResults::from_intermediate_and_req(res, agg_req.clone()) } else { let collector = AggregationCollector::from_aggs(agg_req.clone()); @@ -950,6 +998,7 @@ mod tests { use test::{self, Bencher}; use super::*; + use crate::aggregation::bucket::{HistogramAggregation, HistogramBounds}; use crate::aggregation::metric::StatsAggregation; use crate::query::AllQuery; @@ -1165,6 +1214,110 @@ mod tests { }); } + // hard bounds has a different algorithm, because it actually limits collection range + #[bench] + fn bench_aggregation_histogram_only_hard_bounds(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { 
+ bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100f64, + hard_bounds: Some(HistogramBounds { + min: 1000.0, + max: 300_000.0, + }), + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1); + + let searcher = reader.searcher(); + let agg_res: AggregationResults = + searcher.search(&AllQuery, &collector).unwrap().into(); + + agg_res + }); + } + + #[bench] + fn bench_aggregation_histogram_with_avg(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let sub_agg_req: Aggregations = vec![( + "average_f64".to_string(), + Aggregation::Metric(MetricAggregation::Average( + AverageAggregation::from_field_name("score_f64".to_string()), + )), + )] + .into_iter() + .collect(); + + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100f64, // 1000 buckets + ..Default::default() + }), + sub_aggregation: sub_agg_req, + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1); + + let searcher = reader.searcher(); + let agg_res: AggregationResults = + searcher.search(&AllQuery, &collector).unwrap().into(); + + agg_res + }); + } + + #[bench] + fn bench_aggregation_histogram_only(b: &mut Bencher) { + let index = get_test_index_bench(false).unwrap(); + let reader = index.reader().unwrap(); + + b.iter(|| { + let agg_req_1: Aggregations = vec![( + "rangef64".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 100f64, // 1000 buckets + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let collector = AggregationCollector::from_aggs(agg_req_1); + + let searcher = reader.searcher(); + let agg_res: AggregationResults = + searcher.search(&AllQuery, &collector).unwrap().into(); + + agg_res + }); + } + #[bench] fn bench_aggregation_sub_tree(b: &mut Bencher) { let index = get_test_index_bench(false).unwrap(); diff --git a/src/aggregation/segment_agg_result.rs b/src/aggregation/segment_agg_result.rs index 3b17bfdbd9..0064546a38 100644 --- a/src/aggregation/segment_agg_result.rs +++ b/src/aggregation/segment_agg_result.rs @@ -9,7 +9,7 @@ use super::agg_req::MetricAggregation; use super::agg_req_with_accessor::{ AggregationsWithAccessor, BucketAggregationWithAccessor, MetricAggregationWithAccessor, }; -use super::bucket::SegmentRangeCollector; +use super::bucket::{SegmentHistogramCollector, SegmentRangeCollector}; use super::metric::{ AverageAggregation, SegmentAverageCollector, SegmentStatsCollector, StatsAggregation, }; @@ -17,13 +17,13 @@ use super::{Key, VecWithNames}; use crate::aggregation::agg_req::BucketAggregationType; use crate::DocId; -pub(crate) const DOC_BLOCK_SIZE: usize = 256; +pub(crate) const DOC_BLOCK_SIZE: usize = 64; pub(crate) type DocBlock = [DocId; DOC_BLOCK_SIZE]; #[derive(Clone, PartialEq)] pub(crate) struct SegmentAggregationResultsCollector { - pub(crate) metrics: VecWithNames<SegmentMetricResultCollector>, - pub(crate) buckets: VecWithNames<SegmentBucketResultCollector>, + pub(crate) metrics: Option<VecWithNames<SegmentMetricResultCollector>>, + pub(crate) buckets: Option<VecWithNames<SegmentBucketResultCollector>>, staged_docs: DocBlock, num_staged_docs: usize, } @@ -50,7 +50,7 @@ impl
SegmentAggregationResultsCollector { SegmentBucketResultCollector::from_req_and_validate(req)?, )) }) - .collect::<crate::Result<_>>()?; + .collect::<crate::Result<Vec<_>>>()?; let metrics = req .metrics .entries() .map(|(key, req)| { Ok(( key.to_string(), SegmentMetricResultCollector::from_req_and_validate(req)?, )) }) - .collect::<crate::Result<_>>()?; + .collect::<crate::Result<Vec<_>>>()?; + let metrics = if metrics.is_empty() { + None + } else { + Some(VecWithNames::from_entries(metrics)) + }; + let buckets = if buckets.is_empty() { + None + } else { + Some(VecWithNames::from_entries(buckets)) + }; Ok(SegmentAggregationResultsCollector { - metrics: VecWithNames::from_entries(metrics), - buckets: VecWithNames::from_entries(buckets), + metrics, + buckets, staged_docs: [0; DOC_BLOCK_SIZE], num_staged_docs: 0, }) @@ -82,29 +92,30 @@ impl SegmentAggregationResultsCollector { } } - #[inline(never)] pub(crate) fn flush_staged_docs( &mut self, agg_with_accessor: &AggregationsWithAccessor, force_flush: bool, ) { - for (agg_with_accessor, collector) in agg_with_accessor .metrics .values() .zip(self.metrics.values_mut()) - { - collector.collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor); + if let Some(metrics) = &mut self.metrics { + for (collector, agg_with_accessor) in + metrics.values_mut().zip(agg_with_accessor.metrics.values()) + { + collector + .collect_block(&self.staged_docs[..self.num_staged_docs], agg_with_accessor); + } } - for (agg_with_accessor, collector) in agg_with_accessor .buckets .values() .zip(self.buckets.values_mut()) - { - collector.collect_block( - &self.staged_docs[..self.num_staged_docs], - agg_with_accessor, - force_flush, - ); + + if let Some(buckets) = &mut self.buckets { + for (collector, agg_with_accessor) in + buckets.values_mut().zip(agg_with_accessor.buckets.values()) + { + collector.collect_block( + &self.staged_docs[..self.num_staged_docs], + agg_with_accessor, + force_flush, + ); + } } self.num_staged_docs = 0; @@ -151,6 +162,7 @@ impl SegmentMetricResultCollector { #[derive(Clone, Debug, PartialEq)] pub(crate) enum SegmentBucketResultCollector { Range(SegmentRangeCollector), + Histogram(SegmentHistogramCollector), } impl SegmentBucketResultCollector { @@ -163,6 +175,14 @@ impl SegmentBucketResultCollector { req.field_type, )?)) } + BucketAggregationType::Histogram(histogram) => Ok(Self::Histogram( + SegmentHistogramCollector::from_req_and_validate( + histogram, + &req.sub_aggregation, + req.field_type, + &req.accessor, + )?, + )), } } @@ -177,10 +197,19 @@ impl SegmentBucketResultCollector { SegmentBucketResultCollector::Range(range) => { range.collect_block(doc, bucket_with_accessor, force_flush); } + SegmentBucketResultCollector::Histogram(histogram) => { + histogram.collect_block(doc, bucket_with_accessor, force_flush) + } } } } +#[derive(Clone, Debug, PartialEq)] +pub(crate) struct SegmentHistogramBucketEntry { + pub key: f64, + pub doc_count: u64, +} + #[derive(Clone, PartialEq)] pub(crate) struct SegmentRangeBucketEntry { pub key: Key,
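For reference, the histogram merge in intermediate_agg_result.rs above relies on both bucket lists arriving sorted by key. A self-contained sketch of the same `merge_join_by` pattern, using a simplified `Bucket` type without sub-aggregations (the `itertools` dependency is assumed, as in the patch):

```rust
use std::cmp::Ordering;

use itertools::Itertools;

#[derive(Debug, PartialEq)]
struct Bucket {
    key: f64,
    doc_count: u64,
}

// Zip two key-sorted bucket lists: entries with equal keys are merged,
// unmatched entries from either side are kept as-is.
fn merge_buckets(left: Vec<Bucket>, right: Vec<Bucket>) -> Vec<Bucket> {
    left.into_iter()
        .merge_join_by(right.into_iter(), |l, r| {
            // f64 keys have no total order; fall back to Equal, as in the patch.
            l.key.partial_cmp(&r.key).unwrap_or(Ordering::Equal)
        })
        .map(|either| match either {
            itertools::EitherOrBoth::Both(mut l, r) => {
                l.doc_count += r.doc_count;
                l
            }
            itertools::EitherOrBoth::Left(l) => l,
            itertools::EitherOrBoth::Right(r) => r,
        })
        .collect()
}

fn main() {
    let left = vec![
        Bucket { key: 0.0, doc_count: 2 },
        Bucket { key: 10.0, doc_count: 1 },
    ];
    let right = vec![
        Bucket { key: 10.0, doc_count: 3 },
        Bucket { key: 20.0, doc_count: 4 },
    ];
    let merged = merge_buckets(left, right);
    assert_eq!(merged.len(), 3);
    assert_eq!(merged[1].doc_count, 4); // 1 + 3 for key 10.0
}
```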