Skip to content

Commit

Permalink
refactor parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
PSeitz committed Mar 17, 2022
1 parent 47dcbdb commit aa391bf
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 94 deletions.
144 changes: 61 additions & 83 deletions src/aggregation/agg_result.rs
Expand Up @@ -11,7 +11,7 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::agg_req::{Aggregations, CollectorAggregations, CollectorBucketAggregation};
use super::bucket::generate_buckets;
use super::bucket::intermediate_buckets_to_final_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
Expand All @@ -23,37 +23,52 @@ use super::Key;
/// The final aggegation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);

impl From<(IntermediateAggregationResults, Aggregations)> for AggregationResults {
fn from(tree_and_req: (IntermediateAggregationResults, Aggregations)) -> Self {
let agg: CollectorAggregations = tree_and_req.1.into();
(tree_and_req.0, &agg).into()
impl AggregationResults {
/// Convert and intermediate result and its aggregation request to the final result
pub fn from_intermediate_and_req(
results: IntermediateAggregationResults,
agg: Aggregations,
) -> Self {
AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
}
}

impl From<(IntermediateAggregationResults, &CollectorAggregations)> for AggregationResults {
fn from(data: (IntermediateAggregationResults, &CollectorAggregations)) -> Self {
let tree = data.0;
let req = data.1;
/// Convert and intermediate result and its aggregation request to the final result
///
/// Internal function, CollectorAggregations is used instead Aggregations, which is optimized
/// for internal processing
fn from_intermediate_and_req_internal(
results: IntermediateAggregationResults,
req: &CollectorAggregations,
) -> Self {
let mut result = HashMap::default();

// Important assumption:
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
// request
if let Some(buckets) = tree.buckets {
if let Some(buckets) = results.buckets {
result.extend(buckets.into_iter().zip(req.buckets.values()).map(
|((key, bucket), req)| (key, AggregationResult::BucketResult((bucket, req).into())),
|((key, bucket), req)| {
(
key,
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
bucket, req,
)),
)
},
));
} else {
result.extend(req.buckets.iter().map(|(key, req)| {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
(
key.to_string(),
AggregationResult::BucketResult((empty_bucket, req).into()),
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
empty_bucket,
req,
)),
)
}));
}

if let Some(metrics) = tree.metrics {
if let Some(metrics) = results.metrics {
result.extend(
metrics
.into_iter()
Expand Down Expand Up @@ -127,15 +142,18 @@ pub enum BucketResult {
},
}

impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketResult {
fn from(result_and_req: (IntermediateBucketResult, &CollectorBucketAggregation)) -> Self {
let bucket_result = result_and_req.0;
let req = result_and_req.1;
impl BucketResult {
fn from_intermediate_and_req(
bucket_result: IntermediateBucketResult,
req: &CollectorBucketAggregation,
) -> Self {
match bucket_result {
IntermediateBucketResult::Range(range_map) => {
let mut buckets: Vec<RangeBucketEntry> = range_map
.into_iter()
.map(|(_, bucket)| (bucket, &req.sub_aggregation).into())
.map(|(_, bucket)| {
RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
})
.collect_vec();

buckets.sort_by(|a, b| {
Expand All @@ -147,59 +165,11 @@ impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketRes
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets } => {
let histogram_req = req.as_histogram();
let buckets = if histogram_req.min_doc_count() == 0 {
// With min_doc_count != 0, we may need to add buckets, so that there are no
// gaps, since intermediate result does not contain empty buckets (filtered to
// reduce serialization size).

let (min, max) = if buckets.is_empty() {
(f64::MAX, f64::MIN)
} else {
let min = buckets[0].key;
let max = buckets[buckets.len() - 1].key;
(min, max)
};

let fill_gaps_buckets = generate_buckets(histogram_req, min, max);

let sub_aggregation =
IntermediateAggregationResults::empty_from_req(&req.sub_aggregation);

buckets
.into_iter()
.merge_join_by(
fill_gaps_buckets.into_iter(),
|existing_bucket, fill_gaps_bucket| {
existing_bucket
.key
.partial_cmp(fill_gaps_bucket)
.unwrap_or(Ordering::Equal)
},
)
.map(|either| match either {
itertools::EitherOrBoth::Both(existing, _) => {
(existing, &req.sub_aggregation).into()
}
itertools::EitherOrBoth::Left(existing) => {
(existing, &req.sub_aggregation).into()
}
// Add missing bucket
itertools::EitherOrBoth::Right(bucket) => BucketEntry {
key: Key::F64(bucket),
doc_count: 0,
sub_aggregation: (sub_aggregation.clone(), &req.sub_aggregation)
.into(),
},
})
.collect_vec()
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
.map(|bucket| (bucket, &req.sub_aggregation).into())
.collect_vec()
};
let buckets = intermediate_buckets_to_final_buckets(
buckets,
req.as_histogram(),
&req.sub_aggregation,
);

BucketResult::Histogram { buckets }
}
Expand Down Expand Up @@ -244,14 +214,18 @@ pub struct BucketEntry {
pub sub_aggregation: AggregationResults,
}

impl From<(IntermediateHistogramBucketEntry, &CollectorAggregations)> for BucketEntry {
fn from(entry_and_req: (IntermediateHistogramBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
impl BucketEntry {
pub(crate) fn from_intermediate_and_req(
entry: IntermediateHistogramBucketEntry,
req: &CollectorAggregations,
) -> Self {
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: (entry.sub_aggregation, req).into(),
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
),
}
}
}
Expand Down Expand Up @@ -303,14 +277,18 @@ pub struct RangeBucketEntry {
pub to: Option<f64>,
}

impl From<(IntermediateRangeBucketEntry, &CollectorAggregations)> for RangeBucketEntry {
fn from(entry_and_req: (IntermediateRangeBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
impl RangeBucketEntry {
fn from_intermediate_and_req(
entry: IntermediateRangeBucketEntry,
req: &CollectorAggregations,
) -> Self {
RangeBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation: (entry.sub_aggregation, req).into(),
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
),
to: entry.to,
from: entry.from,
}
Expand Down

0 comments on commit aa391bf

Please sign in to comment.