From aa391bf8430d91a0a355ba00dcb7f95f406fbf9c Mon Sep 17 00:00:00 2001
From: Pascal Seitz
Date: Thu, 17 Mar 2022 14:04:15 +0800
Subject: [PATCH] refactor parameters

---
 src/aggregation/agg_result.rs                 | 144 +++++++--------
 src/aggregation/bucket/histogram/histogram.rs | 172 +++++++++++++++++-
 src/aggregation/collector.rs                  |   3 +-
 src/aggregation/mod.rs                        |   7 +-
 4 files changed, 232 insertions(+), 94 deletions(-)

diff --git a/src/aggregation/agg_result.rs b/src/aggregation/agg_result.rs
index 5918389a55..05c08aad1d 100644
--- a/src/aggregation/agg_result.rs
+++ b/src/aggregation/agg_result.rs
@@ -11,7 +11,7 @@ use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
 use super::agg_req::{Aggregations, CollectorAggregations, CollectorBucketAggregation};
-use super::bucket::generate_buckets;
+use super::bucket::intermediate_buckets_to_final_buckets;
 use super::intermediate_agg_result::{
     IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
     IntermediateMetricResult, IntermediateRangeBucketEntry,
@@ -23,37 +23,52 @@ use super::Key;
 /// The final aggegation result.
 pub struct AggregationResults(pub HashMap<String, AggregationResult>);
 
-impl From<(IntermediateAggregationResults, Aggregations)> for AggregationResults {
-    fn from(tree_and_req: (IntermediateAggregationResults, Aggregations)) -> Self {
-        let agg: CollectorAggregations = tree_and_req.1.into();
-        (tree_and_req.0, &agg).into()
+impl AggregationResults {
+    /// Converts an intermediate result and its aggregation request to the final result.
+    pub fn from_intermediate_and_req(
+        results: IntermediateAggregationResults,
+        agg: Aggregations,
+    ) -> Self {
+        AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
     }
-}
 
-impl From<(IntermediateAggregationResults, &CollectorAggregations)> for AggregationResults {
-    fn from(data: (IntermediateAggregationResults, &CollectorAggregations)) -> Self {
-        let tree = data.0;
-        let req = data.1;
+    /// Converts an intermediate result and its aggregation request to the final result.
+    ///
+    /// Internal function: `CollectorAggregations` is used instead of `Aggregations`, since it is
+    /// optimized for internal processing.
+    fn from_intermediate_and_req_internal(
+        results: IntermediateAggregationResults,
+        req: &CollectorAggregations,
+    ) -> Self {
         let mut result = HashMap::default();
 
         // Important assumption:
         // When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
         // request
-        if let Some(buckets) = tree.buckets {
+        if let Some(buckets) = results.buckets {
             result.extend(buckets.into_iter().zip(req.buckets.values()).map(
-                |((key, bucket), req)| (key, AggregationResult::BucketResult((bucket, req).into())),
+                |((key, bucket), req)| {
+                    (
+                        key,
+                        AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
+                            bucket, req,
+                        )),
+                    )
+                },
             ));
         } else {
             result.extend(req.buckets.iter().map(|(key, req)| {
                 let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
                 (
                     key.to_string(),
-                    AggregationResult::BucketResult((empty_bucket, req).into()),
+                    AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
+                        empty_bucket,
+                        req,
+                    )),
                 )
             }));
         }
 
-        if let Some(metrics) = tree.metrics {
+        if let Some(metrics) = results.metrics {
             result.extend(
                 metrics
                     .into_iter()
@@ -127,15 +142,18 @@ pub enum BucketResult {
     },
 }
 
-impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketResult {
-    fn from(result_and_req: (IntermediateBucketResult, &CollectorBucketAggregation)) -> Self {
-        let bucket_result = result_and_req.0;
-        let req = result_and_req.1;
+impl BucketResult {
+    fn from_intermediate_and_req(
+        bucket_result: IntermediateBucketResult,
+        req: &CollectorBucketAggregation,
+    ) -> Self {
         match bucket_result {
             IntermediateBucketResult::Range(range_map) => {
                 let mut buckets: Vec<RangeBucketEntry> = range_map
                     .into_iter()
-                    .map(|(_, bucket)| (bucket, &req.sub_aggregation).into())
+                    .map(|(_, bucket)| {
+                        RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
+                    })
                     .collect_vec();
 
                 buckets.sort_by(|a, b| {
@@ -147,59 +165,11 @@ impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketRes
                 BucketResult::Range { buckets }
             }
             IntermediateBucketResult::Histogram { buckets } => {
-                let histogram_req = req.as_histogram();
-                let buckets = if histogram_req.min_doc_count() == 0 {
-                    // With min_doc_count != 0, we may need to add buckets, so that there are no
-                    // gaps, since intermediate result does not contain empty buckets (filtered to
-                    // reduce serialization size).
-
-                    let (min, max) = if buckets.is_empty() {
-                        (f64::MAX, f64::MIN)
-                    } else {
-                        let min = buckets[0].key;
-                        let max = buckets[buckets.len() - 1].key;
-                        (min, max)
-                    };
-
-                    let fill_gaps_buckets = generate_buckets(histogram_req, min, max);
-
-                    let sub_aggregation =
-                        IntermediateAggregationResults::empty_from_req(&req.sub_aggregation);
-
-                    buckets
-                        .into_iter()
-                        .merge_join_by(
-                            fill_gaps_buckets.into_iter(),
-                            |existing_bucket, fill_gaps_bucket| {
-                                existing_bucket
-                                    .key
-                                    .partial_cmp(fill_gaps_bucket)
-                                    .unwrap_or(Ordering::Equal)
-                            },
-                        )
-                        .map(|either| match either {
-                            itertools::EitherOrBoth::Both(existing, _) => {
-                                (existing, &req.sub_aggregation).into()
-                            }
-                            itertools::EitherOrBoth::Left(existing) => {
-                                (existing, &req.sub_aggregation).into()
-                            }
-                            // Add missing bucket
-                            itertools::EitherOrBoth::Right(bucket) => BucketEntry {
-                                key: Key::F64(bucket),
-                                doc_count: 0,
-                                sub_aggregation: (sub_aggregation.clone(), &req.sub_aggregation)
-                                    .into(),
-                            },
-                        })
-                        .collect_vec()
-                } else {
-                    buckets
-                        .into_iter()
-                        .filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
-                        .map(|bucket| (bucket, &req.sub_aggregation).into())
-                        .collect_vec()
-                };
+                let buckets = intermediate_buckets_to_final_buckets(
+                    buckets,
+                    req.as_histogram(),
+                    &req.sub_aggregation,
+                );
 
                 BucketResult::Histogram { buckets }
             }
@@ -244,14 +214,18 @@ pub struct BucketEntry {
     pub sub_aggregation: AggregationResults,
 }
 
-impl From<(IntermediateHistogramBucketEntry, &CollectorAggregations)> for BucketEntry {
-    fn from(entry_and_req: (IntermediateHistogramBucketEntry, &CollectorAggregations)) -> Self {
-        let entry = entry_and_req.0;
-        let req = entry_and_req.1;
+impl BucketEntry {
+    pub(crate) fn from_intermediate_and_req(
+        entry: IntermediateHistogramBucketEntry,
+        req: &CollectorAggregations,
+    ) -> Self {
         BucketEntry {
             key: Key::F64(entry.key),
             doc_count: entry.doc_count,
-            sub_aggregation: (entry.sub_aggregation, req).into(),
+            sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
+                entry.sub_aggregation,
+                req,
+            ),
         }
     }
 }
@@ -303,14 +277,18 @@ pub struct RangeBucketEntry {
     pub to: Option<f64>,
 }
 
-impl From<(IntermediateRangeBucketEntry, &CollectorAggregations)> for RangeBucketEntry {
-    fn from(entry_and_req: (IntermediateRangeBucketEntry, &CollectorAggregations)) -> Self {
-        let entry = entry_and_req.0;
-        let req = entry_and_req.1;
+impl RangeBucketEntry {
+    fn from_intermediate_and_req(
+        entry: IntermediateRangeBucketEntry,
+        req: &CollectorAggregations,
+    ) -> Self {
         RangeBucketEntry {
             key: entry.key,
             doc_count: entry.doc_count,
-            sub_aggregation: (entry.sub_aggregation, req).into(),
+            sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
+                entry.sub_aggregation,
+                req,
+            ),
             to: entry.to,
             from: entry.from,
         }

diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs
index c4ee8da081..4da8a434a7 100644
--- a/src/aggregation/bucket/histogram/histogram.rs
+++ b/src/aggregation/bucket/histogram/histogram.rs
@@ -1,12 +1,18 @@
+use std::cmp::Ordering;
 use std::fmt::Display;
 
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
 
+use crate::aggregation::agg_req::CollectorAggregations;
 use crate::aggregation::agg_req_with_accessor::{
     AggregationsWithAccessor, BucketAggregationWithAccessor,
 };
+use crate::aggregation::agg_result::BucketEntry;
 use crate::aggregation::f64_from_fastfield_u64;
-use crate::aggregation::intermediate_agg_result::IntermediateBucketResult;
+use crate::aggregation::intermediate_agg_result::{
+    IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
+};
 use crate::aggregation::segment_agg_result::{
     SegmentAggregationResultsCollector, SegmentHistogramBucketEntry,
 };
@@ -208,7 +214,7 @@ impl SegmentHistogramCollector {
         let min = f64_from_fastfield_u64(accessor.min_value(), &field_type);
         let max = f64_from_fastfield_u64(accessor.max_value(), &field_type);
 
-        let (min, max) = get_req_min_max(req, min, max);
+        let (min, max) = get_req_min_max(req, Some((min, max)));
 
         // We compute and generate the buckets range (min, max) based on the request and the min
         // max in the fast field, but this is likely not ideal when this is a subbucket, where many
@@ -231,8 +237,6 @@ impl SegmentHistogramCollector {
             })
             .collect();
 
-        let (min, _) = get_req_min_max(req, min, max);
-
         let first_bucket_num =
             get_bucket_num_f64(min, req.interval, req.offset.unwrap_or(0.0)) as i64;
 
@@ -245,7 +249,7 @@ impl SegmentHistogramCollector {
             buckets,
             field_type,
             interval: req.interval,
-            offset: req.offset.unwrap_or(0f64),
+            offset: req.offset.unwrap_or(0.0),
             first_bucket_num,
             bounds,
             sub_aggregations,
@@ -381,11 +385,94 @@ fn get_bucket_val(val: f64, interval: f64, offset: f64) -> f64 {
     bucket_pos * interval + offset
 }
 
-fn get_req_min_max(req: &HistogramAggregation, mut min: f64, mut max: f64) -> (f64, f64) {
+// Convert to BucketEntry and fill gaps
+fn intermediate_buckets_to_final_buckets_fill_gaps(
+    buckets: Vec<IntermediateHistogramBucketEntry>,
+    histogram_req: &HistogramAggregation,
+    sub_aggregation: &CollectorAggregations,
+) -> Vec<BucketEntry> {
+    // Generate the full list of buckets without gaps.
+    //
+    // The bounds are the min/max from the current buckets, optionally extended by
+    // extended_bounds from the request.
+    let min_max = if buckets.is_empty() {
+        None
+    } else {
+        let min = buckets[0].key;
+        let max = buckets[buckets.len() - 1].key;
+        Some((min, max))
+    };
+    let fill_gaps_buckets = generate_buckets_with_opt_minmax(histogram_req, min_max);
+
+    let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(sub_aggregation);
+
+    // Use merge_join_by to fill in gaps, since both bucket lists are sorted
+    buckets
+        .into_iter()
+        .merge_join_by(
+            fill_gaps_buckets.into_iter(),
+            |existing_bucket, fill_gaps_bucket| {
+                existing_bucket
+                    .key
+                    .partial_cmp(fill_gaps_bucket)
+                    .unwrap_or(Ordering::Equal)
+            },
+        )
+        .map(|either| match either {
+            // Ignore the generated bucket
+            itertools::EitherOrBoth::Both(existing, _) => existing,
+            itertools::EitherOrBoth::Left(existing) => existing,
+            // Add missing bucket
+            itertools::EitherOrBoth::Right(missing_bucket) => IntermediateHistogramBucketEntry {
+                key: missing_bucket,
+                doc_count: 0,
+                sub_aggregation: empty_sub_aggregation.clone(),
+            },
+        })
+        .map(|intermediate_bucket| {
+            BucketEntry::from_intermediate_and_req(intermediate_bucket, sub_aggregation)
+        })
+        .collect_vec()
+}
+
+// Convert to BucketEntry
+pub(crate) fn intermediate_buckets_to_final_buckets(
+    buckets: Vec<IntermediateHistogramBucketEntry>,
+    histogram_req: &HistogramAggregation,
+    sub_aggregation: &CollectorAggregations,
+) -> Vec<BucketEntry> {
+    if histogram_req.min_doc_count() == 0 {
+        // With min_doc_count == 0, we may need to add buckets, so that there are no
+        // gaps, since the intermediate result does not contain empty buckets (they are
+        // filtered out to reduce serialization size).
+        intermediate_buckets_to_final_buckets_fill_gaps(buckets, histogram_req, sub_aggregation)
+    } else {
+        buckets
+            .into_iter()
+            .filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
+            .map(|bucket| BucketEntry::from_intermediate_and_req(bucket, sub_aggregation))
+            .collect_vec()
+    }
+}
+
+/// Applies the request's extended_bounds/hard_bounds to the min/max values.
+///
+/// May return `(f64::MAX, f64::MIN)` if there is no range.
+fn get_req_min_max(req: &HistogramAggregation, min_max: Option<(f64, f64)>) -> (f64, f64) {
+    let (mut min, mut max) = min_max.unwrap_or((f64::MAX, f64::MIN));
+
     if let Some(extended_bounds) = &req.extended_bounds {
         min = min.min(extended_bounds.min);
         max = max.max(extended_bounds.max);
     }
+
     if let Some(hard_bounds) = &req.hard_bounds {
         min = min.max(hard_bounds.min);
         max = max.min(hard_bounds.max);
@@ -394,9 +481,20 @@ fn get_req_min_max(req: &HistogramAggregation, mut min: f64, mut max: f64) -> (f
     (min, max)
 }
 
-/// Generates buckets with req.interval, for given min, max
+/// Generates buckets with req.interval;
+/// the range is computed from the provided min/max and the request's extended_bounds/hard_bounds.
 pub(crate) fn generate_buckets(req: &HistogramAggregation, min: f64, max: f64) -> Vec<f64> {
-    let (min, max) = get_req_min_max(req, min, max);
+    generate_buckets_with_opt_minmax(req, Some((min, max)))
+}
+
+/// Generates buckets with req.interval;
+/// the range is computed from the provided min/max and the request's extended_bounds/hard_bounds.
+/// Returns an empty vec when there is no range to span.
+pub(crate) fn generate_buckets_with_opt_minmax(
+    req: &HistogramAggregation,
+    min_max: Option<(f64, f64)>,
+) -> Vec<f64> {
+    let (min, max) = get_req_min_max(req, min_max);
 
     let offset = req.offset.unwrap_or(0.0);
     let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
@@ -1090,6 +1188,64 @@ mod tests {
         assert_eq!(res["histogram"]["buckets"][10]["key"], 12.0);
         assert_eq!(res["histogram"]["buckets"][10]["doc_count"], 0);
 
+        let agg_req: Aggregations = vec![(
+            "histogram".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
+                    field: "score_f64".to_string(),
+                    interval: 1.0,
+                    extended_bounds: Some(HistogramBounds { min: 2.0, max: 5.0 }),
+                    hard_bounds: Some(HistogramBounds {
+                        min: 2.0,
+                        max: 12.0,
+                    }),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        let res = exec_request(agg_req, &index)?;
+
+        assert_eq!(res["histogram"]["buckets"][0]["key"], 2.0);
+        assert_eq!(res["histogram"]["buckets"][0]["doc_count"], 0);
+        assert_eq!(res["histogram"]["buckets"][1]["key"], 3.0);
+        assert_eq!(res["histogram"]["buckets"][1]["doc_count"], 0);
+        assert_eq!(res["histogram"]["buckets"][2]["doc_count"], 0);
+        assert_eq!(res["histogram"]["buckets"][10], Value::Null);
+
+        // hard_bounds will not extend the result
+        let agg_req: Aggregations = vec![(
+            "histogram".to_string(),
+            Aggregation::Bucket(BucketAggregation {
+                bucket_agg: BucketAggregationType::Histogram(HistogramAggregation {
+                    field: "score_f64".to_string(),
+                    interval: 1.0,
+                    hard_bounds: Some(HistogramBounds {
+                        min: 2.0,
+                        max: 12.0,
+                    }),
+                    ..Default::default()
+                }),
+                sub_aggregation: Default::default(),
+            }),
+        )]
+        .into_iter()
+        .collect();
+
+        let res = exec_request(agg_req, &index)?;
+
+        assert_eq!(
+            res,
+            json!({
+                "histogram": {
+                    "buckets": []
+                }
+            })
+        );
+
         let agg_req: Aggregations = vec![
             (
                 "stats".to_string(),
diff --git a/src/aggregation/collector.rs b/src/aggregation/collector.rs
index 0e8b06217c..2278361518 100644
--- a/src/aggregation/collector.rs
+++ b/src/aggregation/collector.rs
@@ -86,7 +86,8 @@ impl Collector for AggregationCollector {
         &self,
         segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
     ) -> crate::Result<Self::Fruit> {
-        merge_fruits(segment_fruits).map(|res| (res, self.agg.clone()).into())
+        merge_fruits(segment_fruits)
+            .map(|res| AggregationResults::from_intermediate_and_req(res, self.agg.clone()))
     }
 }
 
diff --git a/src/aggregation/mod.rs b/src/aggregation/mod.rs
index a76095ae17..697afc428e 100644
--- a/src/aggregation/mod.rs
+++ b/src/aggregation/mod.rs
@@ -456,7 +456,10 @@ mod tests {
         let collector = DistributedAggregationCollector::from_aggs(agg_req.clone());
 
         let searcher = reader.searcher();
-        (searcher.search(&term_query, &collector).unwrap(), agg_req).into()
+        AggregationResults::from_intermediate_and_req(
+            searcher.search(&term_query, &collector).unwrap(),
+            agg_req,
+        )
     } else {
         let collector = AggregationCollector::from_aggs(agg_req);
 
@@ -835,7 +838,7 @@ mod tests {
         // Test de/serialization roundtrip on intermediate_agg_result
         let res: IntermediateAggregationResults =
             serde_json::from_str(&serde_json::to_string(&res).unwrap()).unwrap();
-        (res, agg_req.clone()).into()
+        AggregationResults::from_intermediate_and_req(res, agg_req.clone())
     } else {
         let collector = AggregationCollector::from_aggs(agg_req.clone());
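
Note (not part of the patch): a minimal sketch of the call-site pattern this refactor introduces, replacing the tuple-based `From` conversions with the named constructor `AggregationResults::from_intermediate_and_req`. It assumes tantivy's public aggregation module paths as of this commit; `finalize_aggregations` is a hypothetical caller, for example a node that has already merged intermediate results from several segments or shards.

use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;

// Hypothetical caller: converts a merged intermediate result into the final,
// user-facing aggregation result exactly once, at the end of the search.
fn finalize_aggregations(
    merged: IntermediateAggregationResults,
    agg_req: Aggregations,
) -> AggregationResults {
    // Before this patch: `(merged, agg_req).into()` via an `impl From` on a tuple.
    // After this patch: a named constructor that makes the two parameters explicit.
    AggregationResults::from_intermediate_and_req(merged, agg_req)
}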