diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs index 3dba73d1a6..fc614990b4 100644 --- a/src/aggregation/agg_result.rs +++ b/src/aggregation/agg_result.rs @@ -4,21 +4,15 @@ //! intermediate average results, which is the sum and the number of values. The actual average is //! calculated on the step from intermediate to final aggregation result tree. -use std::cmp::Ordering; use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use super::agg_req::{ - Aggregations, AggregationsInternal, BucketAggregationInternal, MetricAggregation, -}; -use super::bucket::{intermediate_buckets_to_final_buckets, GetDocCount}; -use super::intermediate_agg_result::{ - IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry, - IntermediateMetricResult, IntermediateRangeBucketEntry, -}; +use super::agg_req::BucketAggregationInternal; +use super::bucket::GetDocCount; +use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult}; use super::metric::{SingleMetricResult, Stats}; -use super::{Key, VecWithNames}; +use super::Key; use crate::TantivyError; #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] @@ -41,98 +35,6 @@ impl AggregationResults { ))) } } - - /// Convert and intermediate result and its aggregation request to the final result - pub fn from_intermediate_and_req( - results: IntermediateAggregationResults, - agg: Aggregations, - ) -> crate::Result { - AggregationResults::from_intermediate_and_req_internal(results, &(agg.into())) - } - - /// Convert and intermediate result and its aggregation request to the final result - /// - /// Internal function, CollectorAggregations is used instead Aggregations, which is optimized - /// for internal processing, by splitting metric and buckets into seperate groups. - pub(crate) fn from_intermediate_and_req_internal( - intermediate_results: IntermediateAggregationResults, - req: &AggregationsInternal, - ) -> crate::Result { - // Important assumption: - // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the - // request - let mut results: HashMap = HashMap::new(); - - if let Some(buckets) = intermediate_results.buckets { - add_coverted_final_buckets_to_result(&mut results, buckets, &req.buckets)? - } else { - // When there are no buckets, we create empty buckets, so that the serialized json - // format is constant - add_empty_final_buckets_to_result(&mut results, &req.buckets)? - }; - - if let Some(metrics) = intermediate_results.metrics { - add_converted_final_metrics_to_result(&mut results, metrics); - } else { - // When there are no metrics, we create empty metric results, so that the serialized - // json format is constant - add_empty_final_metrics_to_result(&mut results, &req.metrics)?; - } - Ok(Self(results)) - } -} - -fn add_converted_final_metrics_to_result( - results: &mut HashMap, - metrics: VecWithNames, -) { - results.extend( - metrics - .into_iter() - .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))), - ); -} - -fn add_empty_final_metrics_to_result( - results: &mut HashMap, - req_metrics: &VecWithNames, -) -> crate::Result<()> { - results.extend(req_metrics.iter().map(|(key, req)| { - let empty_bucket = IntermediateMetricResult::empty_from_req(req); - ( - key.to_string(), - AggregationResult::MetricResult(empty_bucket.into()), - ) - })); - Ok(()) -} - -fn add_empty_final_buckets_to_result( - results: &mut HashMap, - req_buckets: &VecWithNames, -) -> crate::Result<()> { - let requested_buckets = req_buckets.iter(); - for (key, req) in requested_buckets { - let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?); - results.insert(key.to_string(), empty_bucket); - } - Ok(()) -} - -fn add_coverted_final_buckets_to_result( - results: &mut HashMap, - buckets: VecWithNames, - req_buckets: &VecWithNames, -) -> crate::Result<()> { - assert_eq!(buckets.len(), req_buckets.len()); - - let buckets_with_request = buckets.into_iter().zip(req_buckets.values()); - for ((key, bucket), req) in buckets_with_request { - let result = - AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(bucket, req)?); - results.insert(key, result); - } - Ok(()) } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] @@ -154,7 +56,8 @@ impl AggregationResult { match self { AggregationResult::BucketResult(_bucket) => Err(TantivyError::InternalError( "Tried to retrieve value from bucket aggregation. This is not supported and \ - should not happen during collection, but should be catched during validation" + should not happen during collection phase, but should be catched during \ + validation" .to_string(), )), AggregationResult::MetricResult(metric) => metric.get_value(agg_property), @@ -230,48 +133,7 @@ pub enum BucketResult { impl BucketResult { pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result { let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg); - BucketResult::from_intermediate_and_req(empty_bucket, req) - } - - fn from_intermediate_and_req( - bucket_result: IntermediateBucketResult, - req: &BucketAggregationInternal, - ) -> crate::Result { - match bucket_result { - IntermediateBucketResult::Range(range_res) => { - let mut buckets: Vec = range_res - .buckets - .into_iter() - .map(|(_, bucket)| { - RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation) - }) - .collect::>>()?; - - buckets.sort_by(|left, right| { - // TODO use total_cmp next stable rust release - left.from - .unwrap_or(f64::MIN) - .partial_cmp(&right.from.unwrap_or(f64::MIN)) - .unwrap_or(Ordering::Equal) - }); - Ok(BucketResult::Range { buckets }) - } - IntermediateBucketResult::Histogram { buckets } => { - let buckets = intermediate_buckets_to_final_buckets( - buckets, - req.as_histogram() - .expect("unexpected aggregation, expected histogram aggregation"), - &req.sub_aggregation, - )?; - - Ok(BucketResult::Histogram { buckets }) - } - IntermediateBucketResult::Terms(terms) => terms.into_final_result( - req.as_term() - .expect("unexpected aggregation, expected term aggregation"), - &req.sub_aggregation, - ), - } + empty_bucket.into_final_bucket_result(req) } } @@ -311,22 +173,6 @@ pub struct BucketEntry { /// Sub-aggregations in this bucket. pub sub_aggregation: AggregationResults, } - -impl BucketEntry { - pub(crate) fn from_intermediate_and_req( - entry: IntermediateHistogramBucketEntry, - req: &AggregationsInternal, - ) -> crate::Result { - Ok(BucketEntry { - key: Key::F64(entry.key), - doc_count: entry.doc_count, - sub_aggregation: AggregationResults::from_intermediate_and_req_internal( - entry.sub_aggregation, - req, - )?, - }) - } -} impl GetDocCount for &BucketEntry { fn doc_count(&self) -> u64 { self.doc_count @@ -384,21 +230,3 @@ pub struct RangeBucketEntry { #[serde(skip_serializing_if = "Option::is_none")] pub to: Option, } - -impl RangeBucketEntry { - fn from_intermediate_and_req( - entry: IntermediateRangeBucketEntry, - req: &AggregationsInternal, - ) -> crate::Result { - Ok(RangeBucketEntry { - key: entry.key, - doc_count: entry.doc_count, - sub_aggregation: AggregationResults::from_intermediate_and_req_internal( - entry.sub_aggregation, - req, - )?, - to: entry.to, - from: entry.from, - }) - } -} diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index a2a4a87e59..0d5f5574c1 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -482,14 +482,12 @@ fn intermediate_buckets_to_final_buckets_fill_gaps( sub_aggregation: empty_sub_aggregation.clone(), }, }) - .map(|intermediate_bucket| { - BucketEntry::from_intermediate_and_req(intermediate_bucket, sub_aggregation) - }) + .map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation)) .collect::>>() } // Convert to BucketEntry -pub(crate) fn intermediate_buckets_to_final_buckets( +pub(crate) fn intermediate_histogram_buckets_to_final_buckets( buckets: Vec, histogram_req: &HistogramAggregation, sub_aggregation: &AggregationsInternal, @@ -503,8 +501,8 @@ pub(crate) fn intermediate_buckets_to_final_buckets( } else { buckets .into_iter() - .filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count()) - .map(|bucket| BucketEntry::from_intermediate_and_req(bucket, sub_aggregation)) + .filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count()) + .map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation)) .collect::>>() } } @@ -546,7 +544,7 @@ pub(crate) fn generate_buckets_with_opt_minmax( let offset = req.offset.unwrap_or(0.0); let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64; let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64; - let mut buckets = vec![]; + let mut buckets = Vec::with_capacity((first_bucket_num..=last_bucket_num).count()); for bucket_pos in first_bucket_num..=last_bucket_num { let bucket_key = bucket_pos as f64 * req.interval + offset; buckets.push(bucket_key); diff --git a/src/aggregation/bucket/range.rs b/src/aggregation/bucket/range.rs index d570e96a37..69206b1100 100644 --- a/src/aggregation/bucket/range.rs +++ b/src/aggregation/bucket/range.rs @@ -317,7 +317,7 @@ fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> crate::Resu } /// Extends the provided buckets to contain the whole value range, by inserting buckets at the -/// beginning and end. +/// beginning and end and filling gaps. fn extend_validate_ranges( buckets: &[RangeAggregationRange], field_type: &Type, diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs index f35a5e3e17..47cd94e6c9 100644 --- a/src/aggregation/collector.rs +++ b/src/aggregation/collector.rs @@ -87,7 +87,7 @@ impl Collector for AggregationCollector { segment_fruits: Vec<::Fruit>, ) -> crate::Result { let res = merge_fruits(segment_fruits)?; - AggregationResults::from_intermediate_and_req(res, self.agg.clone()) + res.into_final_bucket_result(self.agg.clone()) } } diff --git a/src/aggregation/intermediate_agg_result.rs b/src/aggregation/intermediate_agg_result.rs index 9bde00707e..cb2f9f416c 100644 --- a/src/aggregation/intermediate_agg_result.rs +++ b/src/aggregation/intermediate_agg_result.rs @@ -3,16 +3,20 @@ //! indices. use std::cmp::Ordering; +use std::collections::HashMap; use fnv::FnvHashMap; use itertools::Itertools; use serde::{Deserialize, Serialize}; -use super::agg_req::{AggregationsInternal, BucketAggregationType, MetricAggregation}; -use super::agg_result::BucketResult; +use super::agg_req::{ + Aggregations, AggregationsInternal, BucketAggregationInternal, BucketAggregationType, + MetricAggregation, +}; +use super::agg_result::{AggregationResult, BucketResult, RangeBucketEntry}; use super::bucket::{ - cut_off_buckets, get_agg_name_and_property, GetDocCount, Order, OrderTarget, - SegmentHistogramBucketEntry, TermsAggregation, + cut_off_buckets, get_agg_name_and_property, intermediate_histogram_buckets_to_final_buckets, + GetDocCount, Order, OrderTarget, SegmentHistogramBucketEntry, TermsAggregation, }; use super::metric::{IntermediateAverage, IntermediateStats}; use super::segment_agg_result::SegmentMetricResultCollector; @@ -31,6 +35,46 @@ pub struct IntermediateAggregationResults { } impl IntermediateAggregationResults { + /// Convert and intermediate result and its aggregation request to the final result + pub(crate) fn into_final_bucket_result( + self, + req: Aggregations, + ) -> crate::Result { + self.into_final_bucket_result_internal(&(req.into())) + } + + /// Convert and intermediate result and its aggregation request to the final result + /// + /// Internal function, AggregationsInternal is used instead Aggregations, which is optimized + /// for internal processing, by splitting metric and buckets into seperate groups. + pub(crate) fn into_final_bucket_result_internal( + self, + req: &AggregationsInternal, + ) -> crate::Result { + // Important assumption: + // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the + // request + let mut results: HashMap = HashMap::new(); + + if let Some(buckets) = self.buckets { + convert_and_add_final_buckets_to_result(&mut results, buckets, &req.buckets)? + } else { + // When there are no buckets, we create empty buckets, so that the serialized json + // format is constant + add_empty_final_buckets_to_result(&mut results, &req.buckets)? + }; + + if let Some(metrics) = self.metrics { + convert_and_add_final_metrics_to_result(&mut results, metrics); + } else { + // When there are no metrics, we create empty metric results, so that the serialized + // json format is constant + add_empty_final_metrics_to_result(&mut results, &req.metrics)?; + } + + Ok(AggregationResults(results)) + } + pub(crate) fn empty_from_req(req: &AggregationsInternal) -> Self { let metrics = if req.metrics.is_empty() { None @@ -90,6 +134,58 @@ impl IntermediateAggregationResults { } } +fn convert_and_add_final_metrics_to_result( + results: &mut HashMap, + metrics: VecWithNames, +) { + results.extend( + metrics + .into_iter() + .map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))), + ); +} + +fn add_empty_final_metrics_to_result( + results: &mut HashMap, + req_metrics: &VecWithNames, +) -> crate::Result<()> { + results.extend(req_metrics.iter().map(|(key, req)| { + let empty_bucket = IntermediateMetricResult::empty_from_req(req); + ( + key.to_string(), + AggregationResult::MetricResult(empty_bucket.into()), + ) + })); + Ok(()) +} + +fn add_empty_final_buckets_to_result( + results: &mut HashMap, + req_buckets: &VecWithNames, +) -> crate::Result<()> { + let requested_buckets = req_buckets.iter(); + for (key, req) in requested_buckets { + let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?); + results.insert(key.to_string(), empty_bucket); + } + Ok(()) +} + +fn convert_and_add_final_buckets_to_result( + results: &mut HashMap, + buckets: VecWithNames, + req_buckets: &VecWithNames, +) -> crate::Result<()> { + assert_eq!(buckets.len(), req_buckets.len()); + + let buckets_with_request = buckets.into_iter().zip(req_buckets.values()); + for ((key, bucket), req) in buckets_with_request { + let result = AggregationResult::BucketResult(bucket.into_final_bucket_result(req)?); + results.insert(key, result); + } + Ok(()) +} + /// An aggregation is either a bucket or a metric. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum IntermediateAggregationResult { @@ -171,6 +267,45 @@ pub enum IntermediateBucketResult { } impl IntermediateBucketResult { + pub(crate) fn into_final_bucket_result( + self, + req: &BucketAggregationInternal, + ) -> crate::Result { + match self { + IntermediateBucketResult::Range(range_res) => { + let mut buckets: Vec = range_res + .buckets + .into_iter() + .map(|(_, bucket)| bucket.into_final_bucket_entry(&req.sub_aggregation)) + .collect::>>()?; + + buckets.sort_by(|left, right| { + // TODO use total_cmp next stable rust release + left.from + .unwrap_or(f64::MIN) + .partial_cmp(&right.from.unwrap_or(f64::MIN)) + .unwrap_or(Ordering::Equal) + }); + Ok(BucketResult::Range { buckets }) + } + IntermediateBucketResult::Histogram { buckets } => { + let buckets = intermediate_histogram_buckets_to_final_buckets( + buckets, + req.as_histogram() + .expect("unexpected aggregation, expected histogram aggregation"), + &req.sub_aggregation, + )?; + + Ok(BucketResult::Histogram { buckets }) + } + IntermediateBucketResult::Terms(terms) => terms.into_final_result( + req.as_term() + .expect("unexpected aggregation, expected term aggregation"), + &req.sub_aggregation, + ), + } + } + pub(crate) fn empty_from_req(req: &BucketAggregationType) -> Self { match req { BucketAggregationType::Terms(_) => IntermediateBucketResult::Terms(Default::default()), @@ -267,10 +402,9 @@ impl IntermediateTermBucketResult { Ok(BucketEntry { key: Key::Str(key), doc_count: entry.doc_count, - sub_aggregation: AggregationResults::from_intermediate_and_req_internal( - entry.sub_aggregation, - sub_aggregation_req, - )?, + sub_aggregation: entry + .sub_aggregation + .into_final_bucket_result_internal(sub_aggregation_req)?, }) }) .collect::>()?; @@ -374,6 +508,21 @@ pub struct IntermediateHistogramBucketEntry { pub sub_aggregation: IntermediateAggregationResults, } +impl IntermediateHistogramBucketEntry { + pub(crate) fn into_final_bucket_entry( + self, + req: &AggregationsInternal, + ) -> crate::Result { + Ok(BucketEntry { + key: Key::F64(self.key), + doc_count: self.doc_count, + sub_aggregation: self + .sub_aggregation + .into_final_bucket_result_internal(req)?, + }) + } +} + impl From for IntermediateHistogramBucketEntry { fn from(entry: SegmentHistogramBucketEntry) -> Self { IntermediateHistogramBucketEntry { @@ -402,6 +551,23 @@ pub struct IntermediateRangeBucketEntry { pub to: Option, } +impl IntermediateRangeBucketEntry { + pub(crate) fn into_final_bucket_entry( + self, + req: &AggregationsInternal, + ) -> crate::Result { + Ok(RangeBucketEntry { + key: self.key, + doc_count: self.doc_count, + sub_aggregation: self + .sub_aggregation + .into_final_bucket_result_internal(req)?, + to: self.to, + from: self.from, + }) + } +} + /// This is the term entry for a bucket, which contains a count, and optionally /// sub_aggregations. #[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)] diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs index 37fa05c0ff..dfaaf3265a 100644 --- a/src/aggregation/mod.rs +++ b/src/aggregation/mod.rs @@ -545,11 +545,10 @@ mod tests { let collector = DistributedAggregationCollector::from_aggs(agg_req.clone()); let searcher = reader.searcher(); - AggregationResults::from_intermediate_and_req( - searcher.search(&AllQuery, &collector).unwrap(), - agg_req, - ) - .unwrap() + let intermediate_agg_result = searcher.search(&AllQuery, &collector).unwrap(); + intermediate_agg_result + .into_final_bucket_result(agg_req) + .unwrap() } else { let collector = AggregationCollector::from_aggs(agg_req); @@ -985,7 +984,7 @@ mod tests { // Test de/serialization roundtrip on intermediate_agg_result let res: IntermediateAggregationResults = serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap(); - AggregationResults::from_intermediate_and_req(res, agg_req.clone()).unwrap() + res.into_final_bucket_result(agg_req.clone()).unwrap() } else { let collector = AggregationCollector::from_aggs(agg_req.clone());