Commit

Add Histogram aggregation

PSeitz committed Mar 11, 2022
1 parent 4b62f79 commit a88e260

Showing 10 changed files with 1,198 additions and 45 deletions.
7 changes: 7 additions & 0 deletions src/aggregation/agg_req.rs
@@ -48,6 +48,7 @@ use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use super::bucket::HistogramAggregation;
pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};

@@ -123,12 +124,18 @@ pub enum BucketAggregationType {
/// Put data into buckets of user-defined ranges.
#[serde(rename = "range")]
Range(RangeAggregation),
/// Put data into buckets of fixed intervals (histogram).
#[serde(rename = "histogram")]
Histogram(HistogramAggregation),
}

impl BucketAggregationType {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
BucketAggregationType::Histogram(histogram) => {
fast_field_names.insert(histogram.field.to_string())
}
};
}
}
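The new `Histogram` variant follows the same externally tagged serde layout as `Range`, so a histogram request is addressed by the `"histogram"` key in the aggregation JSON. A minimal, self-contained sketch of that mapping (the `interval` field on `HistogramAggregation` is assumed here, since the struct definition is not part of this diff):

```rust
// Standalone mirror of the serde layout above; not the real tantivy types.
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
struct HistogramAggregation {
    field: String,
    // Assumed field: the actual struct lives in `super::bucket` and is not shown here.
    interval: f64,
}

#[derive(Debug, Serialize, Deserialize)]
enum BucketAggregationType {
    #[serde(rename = "histogram")]
    Histogram(HistogramAggregation),
}

fn main() {
    // The rename attribute makes "histogram" the tag for this variant.
    let json = r#"{ "histogram": { "field": "price", "interval": 10.0 } }"#;
    let req: BucketAggregationType = serde_json::from_str(json).unwrap();
    println!("{:?}", req);
}
```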
5 changes: 4 additions & 1 deletion src/aggregation/agg_req_with_accessor.rs
@@ -1,7 +1,7 @@
//! This will enhance the request tree with access to the fastfield and metadata.

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::RangeAggregation;
use super::bucket::{HistogramAggregation, RangeAggregation};
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
@@ -48,6 +48,9 @@ impl BucketAggregationWithAccessor {
field: field_name,
ranges: _,
}) => get_ff_reader_and_validate(reader, field_name)?,
BucketAggregationType::Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader_and_validate(reader, field_name)?,
};
let sub_aggregation = sub_aggregation.clone();
Ok(BucketAggregationWithAccessor {
105 changes: 101 additions & 4 deletions src/aggregation/agg_result.rs
@@ -10,14 +10,15 @@ use std::collections::HashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::bucket::generate_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateMetricResult, IntermediateRangeBucketEntry,
IntermediateHistogramBucketEntry, IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
/// The final aggregation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);

@@ -81,12 +82,18 @@ impl From<IntermediateMetricResult> for MetricResult {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
/// This is the default entry for a bucket, which contains a key, count, and optionally
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range {
/// The range buckets sorted by range.
buckets: Vec<RangeBucketEntry>,
},
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The buckets.
buckets: Vec<BucketEntry>,
},
}

impl From<IntermediateBucketResult> for BucketResult {
@@ -106,6 +113,96 @@ impl From<IntermediateBucketResult> for BucketResult {
});
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets, req } => {
let buckets = if req.min_doc_count() == 0 {
// We need to fill in buckets over the whole observed range, so that there
// are no gaps
let max = buckets
.iter()
.map(|bucket| bucket.key)
.fold(f64::NEG_INFINITY, f64::max);
let min = buckets
.iter()
.map(|bucket| bucket.key)
.fold(f64::INFINITY, f64::min);
let all_buckets = generate_buckets(&req, min, max);

buckets
.into_iter()
.merge_join_by(all_buckets.into_iter(), |existing_bucket, all_bucket| {
existing_bucket
.key
.partial_cmp(all_bucket)
.unwrap_or(Ordering::Equal)
})
.map(|either| match either {
itertools::EitherOrBoth::Both(existing, _) => existing.into(),
itertools::EitherOrBoth::Left(existing) => existing.into(),
// Add missing bucket
itertools::EitherOrBoth::Right(bucket) => BucketEntry {
key: Key::F64(bucket),
doc_count: 0,
sub_aggregation: Default::default(),
},
})
.collect_vec()
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= req.min_doc_count())
.map(|bucket| bucket.into())
.collect_vec()
};

BucketResult::Histogram { buckets }
}
}
}
}

/// This is the default entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
///
/// # JSON Format
/// ```ignore
/// {
/// ...
/// "my_histogram": {
/// "buckets": [
/// {
/// "key": "2.0",
/// "doc_count": 5
/// },
/// {
/// "key": "4.0",
/// "doc_count": 2
/// },
/// {
/// "key": "6.0",
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketEntry {
/// The identifier of the bucket.
pub key: Key,
/// Number of documents in the bucket.
pub doc_count: u64,
#[serde(flatten)]
/// Sub-aggregations in this bucket.
pub sub_aggregation: AggregationResults,
}

impl From<IntermediateHistogramBucketEntry> for BucketEntry {
fn from(entry: IntermediateHistogramBucketEntry) -> Self {
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: entry.sub_aggregation.into(),
}
}
}
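When `min_doc_count()` is 0, the histogram has to report every bucket between the smallest and largest observed key, even when no document fell into it. The `merge_join_by` above walks the sparse, sorted result buckets in lockstep with the dense key list from `generate_buckets` and emits an empty entry wherever a key has no matching bucket. A self-contained sketch of that merge, using plain `f64` keys in place of the `generate_buckets` output and a stripped-down bucket type (the `itertools` crate is assumed):

```rust
use std::cmp::Ordering;

use itertools::Itertools;

#[derive(Debug)]
struct Bucket {
    key: f64,
    doc_count: u64,
}

/// Keep existing buckets and insert empty ones for keys without a match.
/// Both inputs must be sorted by key, as in the aggregation code above.
fn fill_gaps(existing: Vec<Bucket>, all_bucket_keys: Vec<f64>) -> Vec<Bucket> {
    existing
        .into_iter()
        .merge_join_by(all_bucket_keys.into_iter(), |bucket, all_key| {
            bucket.key.partial_cmp(all_key).unwrap_or(Ordering::Equal)
        })
        .map(|either| match either {
            itertools::EitherOrBoth::Both(bucket, _)
            | itertools::EitherOrBoth::Left(bucket) => bucket,
            // No document landed in this bucket: emit it with a count of 0.
            itertools::EitherOrBoth::Right(key) => Bucket { key, doc_count: 0 },
        })
        .collect_vec()
}

fn main() {
    // Documents only hit the 0.0 and 20.0 buckets; 10.0 is filled in empty.
    let existing = vec![
        Bucket { key: 0.0, doc_count: 3 },
        Bucket { key: 20.0, doc_count: 1 },
    ];
    let filled = fill_gaps(existing, vec![0.0, 10.0, 20.0]);
    println!("{:?}", filled); // keys 0.0, 10.0, 20.0 with counts 3, 0, 1
}
```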
Expand All @@ -114,7 +211,7 @@ impl From<IntermediateBucketResult> for BucketResult {
/// sub_aggregations.
///
/// # JSON Format
/// ```json
/// ```ignore
/// {
/// ...
/// "my_ranges": {
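One detail worth calling out in the new `BucketEntry` is the `#[serde(flatten)]` attribute on `sub_aggregation`: sub-aggregation results are merged directly into the bucket object instead of being nested under a `"sub_aggregation"` key. A standalone sketch of that behavior, with a plain map standing in for `AggregationResults` and a hypothetical `avg_price` sub-aggregation:

```rust
use std::collections::HashMap;

use serde::Serialize;

// Stripped-down stand-in for the real BucketEntry; the flattened map mimics
// AggregationResults, which also wraps a HashMap.
#[derive(Serialize)]
struct BucketEntry {
    key: f64,
    doc_count: u64,
    #[serde(flatten)]
    sub_aggregation: HashMap<String, f64>,
}

fn main() {
    let bucket = BucketEntry {
        key: 2.0,
        doc_count: 5,
        sub_aggregation: HashMap::from([("avg_price".to_string(), 13.5)]),
    };
    // The flattened entries appear next to "key" and "doc_count":
    // {"key":2.0,"doc_count":5,"avg_price":13.5}
    println!("{}", serde_json::to_string(&bucket).unwrap());
}
```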
