Skip to content

Commit

Permalink
refactor aggregations
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed May 11, 2022
1 parent 7f45a6a commit 7c8e1e3
Show file tree
Hide file tree
Showing 6 changed files with 193 additions and 202 deletions.
185 changes: 6 additions & 179 deletions src/aggregation/agg_result.rs
Expand Up @@ -4,21 +4,15 @@
//! intermediate average results, which is the sum and the number of values. The actual average is
//! calculated on the step from intermediate to final aggregation result tree.

use std::cmp::Ordering;
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use super::agg_req::{
Aggregations, AggregationsInternal, BucketAggregationInternal, MetricAggregation,
};
use super::bucket::{intermediate_buckets_to_final_buckets, GetDocCount};
use super::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::agg_req::BucketAggregationInternal;
use super::bucket::GetDocCount;
use super::intermediate_agg_result::{IntermediateBucketResult, IntermediateMetricResult};
use super::metric::{SingleMetricResult, Stats};
use super::{Key, VecWithNames};
use super::Key;
use crate::TantivyError;

#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -41,98 +35,6 @@ impl AggregationResults {
)))
}
}

/// Convert and intermediate result and its aggregation request to the final result
pub fn from_intermediate_and_req(
results: IntermediateAggregationResults,
agg: Aggregations,
) -> crate::Result<Self> {
AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
}

/// Convert and intermediate result and its aggregation request to the final result
///
/// Internal function, CollectorAggregations is used instead Aggregations, which is optimized
/// for internal processing, by splitting metric and buckets into seperate groups.
pub(crate) fn from_intermediate_and_req_internal(
intermediate_results: IntermediateAggregationResults,
req: &AggregationsInternal,
) -> crate::Result<Self> {
// Important assumption:
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
// request
let mut results: HashMap<String, AggregationResult> = HashMap::new();

if let Some(buckets) = intermediate_results.buckets {
add_coverted_final_buckets_to_result(&mut results, buckets, &req.buckets)?
} else {
// When there are no buckets, we create empty buckets, so that the serialized json
// format is constant
add_empty_final_buckets_to_result(&mut results, &req.buckets)?
};

if let Some(metrics) = intermediate_results.metrics {
add_converted_final_metrics_to_result(&mut results, metrics);
} else {
// When there are no metrics, we create empty metric results, so that the serialized
// json format is constant
add_empty_final_metrics_to_result(&mut results, &req.metrics)?;
}
Ok(Self(results))
}
}

fn add_converted_final_metrics_to_result(
results: &mut HashMap<String, AggregationResult>,
metrics: VecWithNames<IntermediateMetricResult>,
) {
results.extend(
metrics
.into_iter()
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
);
}

fn add_empty_final_metrics_to_result(
results: &mut HashMap<String, AggregationResult>,
req_metrics: &VecWithNames<MetricAggregation>,
) -> crate::Result<()> {
results.extend(req_metrics.iter().map(|(key, req)| {
let empty_bucket = IntermediateMetricResult::empty_from_req(req);
(
key.to_string(),
AggregationResult::MetricResult(empty_bucket.into()),
)
}));
Ok(())
}

fn add_empty_final_buckets_to_result(
results: &mut HashMap<String, AggregationResult>,
req_buckets: &VecWithNames<BucketAggregationInternal>,
) -> crate::Result<()> {
let requested_buckets = req_buckets.iter();
for (key, req) in requested_buckets {
let empty_bucket = AggregationResult::BucketResult(BucketResult::empty_from_req(req)?);
results.insert(key.to_string(), empty_bucket);
}
Ok(())
}

fn add_coverted_final_buckets_to_result(
results: &mut HashMap<String, AggregationResult>,
buckets: VecWithNames<IntermediateBucketResult>,
req_buckets: &VecWithNames<BucketAggregationInternal>,
) -> crate::Result<()> {
assert_eq!(buckets.len(), req_buckets.len());

let buckets_with_request = buckets.into_iter().zip(req_buckets.values());
for ((key, bucket), req) in buckets_with_request {
let result =
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(bucket, req)?);
results.insert(key, result);
}
Ok(())
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
Expand All @@ -154,7 +56,7 @@ impl AggregationResult {
match self {
AggregationResult::BucketResult(_bucket) => Err(TantivyError::InternalError(
"Tried to retrieve value from bucket aggregation. This is not supported and \
should not happen during collection, but should be catched during validation"
should not happen during collection phase, but should be catched during validation"
.to_string(),
)),
AggregationResult::MetricResult(metric) => metric.get_value(agg_property),
Expand Down Expand Up @@ -230,48 +132,7 @@ pub enum BucketResult {
impl BucketResult {
pub(crate) fn empty_from_req(req: &BucketAggregationInternal) -> crate::Result<Self> {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
BucketResult::from_intermediate_and_req(empty_bucket, req)
}

fn from_intermediate_and_req(
bucket_result: IntermediateBucketResult,
req: &BucketAggregationInternal,
) -> crate::Result<Self> {
match bucket_result {
IntermediateBucketResult::Range(range_res) => {
let mut buckets: Vec<RangeBucketEntry> = range_res
.buckets
.into_iter()
.map(|(_, bucket)| {
RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
})
.collect::<crate::Result<Vec<_>>>()?;

buckets.sort_by(|left, right| {
// TODO use total_cmp next stable rust release
left.from
.unwrap_or(f64::MIN)
.partial_cmp(&right.from.unwrap_or(f64::MIN))
.unwrap_or(Ordering::Equal)
});
Ok(BucketResult::Range { buckets })
}
IntermediateBucketResult::Histogram { buckets } => {
let buckets = intermediate_buckets_to_final_buckets(
buckets,
req.as_histogram()
.expect("unexpected aggregation, expected histogram aggregation"),
&req.sub_aggregation,
)?;

Ok(BucketResult::Histogram { buckets })
}
IntermediateBucketResult::Terms(terms) => terms.into_final_result(
req.as_term()
.expect("unexpected aggregation, expected term aggregation"),
&req.sub_aggregation,
),
}
empty_bucket.into_final_bucket_result(req)
}
}

Expand Down Expand Up @@ -311,22 +172,6 @@ pub struct BucketEntry {
/// Sub-aggregations in this bucket.
pub sub_aggregation: AggregationResults,
}

impl BucketEntry {
pub(crate) fn from_intermediate_and_req(
entry: IntermediateHistogramBucketEntry,
req: &AggregationsInternal,
) -> crate::Result<Self> {
Ok(BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
)?,
})
}
}
impl GetDocCount for &BucketEntry {
fn doc_count(&self) -> u64 {
self.doc_count
Expand Down Expand Up @@ -384,21 +229,3 @@ pub struct RangeBucketEntry {
#[serde(skip_serializing_if = "Option::is_none")]
pub to: Option<f64>,
}

impl RangeBucketEntry {
fn from_intermediate_and_req(
entry: IntermediateRangeBucketEntry,
req: &AggregationsInternal,
) -> crate::Result<Self> {
Ok(RangeBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
)?,
to: entry.to,
from: entry.from,
})
}
}
12 changes: 5 additions & 7 deletions src/aggregation/bucket/histogram/histogram.rs
Expand Up @@ -482,14 +482,12 @@ fn intermediate_buckets_to_final_buckets_fill_gaps(
sub_aggregation: empty_sub_aggregation.clone(),
},
})
.map(|intermediate_bucket| {
BucketEntry::from_intermediate_and_req(intermediate_bucket, sub_aggregation)
})
.map(|intermediate_bucket| intermediate_bucket.into_final_bucket_entry(sub_aggregation))
.collect::<crate::Result<Vec<_>>>()
}

// Convert to BucketEntry
pub(crate) fn intermediate_buckets_to_final_buckets(
pub(crate) fn intermediate_histogram_buckets_to_final_buckets(
buckets: Vec<IntermediateHistogramBucketEntry>,
histogram_req: &HistogramAggregation,
sub_aggregation: &AggregationsInternal,
Expand All @@ -503,8 +501,8 @@ pub(crate) fn intermediate_buckets_to_final_buckets(
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
.map(|bucket| BucketEntry::from_intermediate_and_req(bucket, sub_aggregation))
.filter(|histogram_bucket| histogram_bucket.doc_count >= histogram_req.min_doc_count())
.map(|histogram_bucket| histogram_bucket.into_final_bucket_entry(sub_aggregation))
.collect::<crate::Result<Vec<_>>>()
}
}
Expand Down Expand Up @@ -546,7 +544,7 @@ pub(crate) fn generate_buckets_with_opt_minmax(
let offset = req.offset.unwrap_or(0.0);
let first_bucket_num = get_bucket_num_f64(min, req.interval, offset) as i64;
let last_bucket_num = get_bucket_num_f64(max, req.interval, offset) as i64;
let mut buckets = vec![];
let mut buckets = Vec::with_capacity((first_bucket_num..=last_bucket_num).count());
for bucket_pos in first_bucket_num..=last_bucket_num {
let bucket_key = bucket_pos as f64 * req.interval + offset;
buckets.push(bucket_key);
Expand Down
2 changes: 1 addition & 1 deletion src/aggregation/bucket/range.rs
Expand Up @@ -317,7 +317,7 @@ fn to_u64_range(range: &RangeAggregationRange, field_type: &Type) -> crate::Resu
}

/// Extends the provided buckets to contain the whole value range, by inserting buckets at the
/// beginning and end.
/// beginning and end and filling gaps.
fn extend_validate_ranges(
buckets: &[RangeAggregationRange],
field_type: &Type,
Expand Down
2 changes: 1 addition & 1 deletion src/aggregation/collector.rs
Expand Up @@ -87,7 +87,7 @@ impl Collector for AggregationCollector {
segment_fruits: Vec<<Self::Child as SegmentCollector>::Fruit>,
) -> crate::Result<Self::Fruit> {
let res = merge_fruits(segment_fruits)?;
AggregationResults::from_intermediate_and_req(res, self.agg.clone())
res.into_final_bucket_result(self.agg.clone())
}
}

Expand Down

0 comments on commit 7c8e1e3

Please sign in to comment.