Skip to content

Commit

Permalink
Merge pull request #1306 from PSeitz/histogram
Browse files Browse the repository at this point in the history
add Histogram aggregation
  • Loading branch information
PSeitz committed Mar 18, 2022
2 parents 958b2be + c5a6282 commit 141b9aa
Show file tree
Hide file tree
Showing 13 changed files with 2,113 additions and 180 deletions.
57 changes: 57 additions & 0 deletions src/aggregation/agg_req.rs
Expand Up @@ -48,15 +48,66 @@ use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use super::bucket::HistogramAggregation;
pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;

/// The top-level aggregation request structure, which contains [Aggregation] and their user defined
/// names. It is also used in [buckets](BucketAggregation) to define sub-aggregations.
///
/// The key is the user defined name of the aggregation.
pub type Aggregations = HashMap<String, Aggregation>;

/// Like Aggregations, but optimized to work with the aggregation result
#[derive(Clone, Debug)]
pub(crate) struct AggregationsInternal {
pub(crate) metrics: VecWithNames<MetricAggregation>,
pub(crate) buckets: VecWithNames<BucketAggregationInternal>,
}

impl From<Aggregations> for AggregationsInternal {
fn from(aggs: Aggregations) -> Self {
let mut metrics = vec![];
let mut buckets = vec![];
for (key, agg) in aggs {
match agg {
Aggregation::Bucket(bucket) => buckets.push((
key,
BucketAggregationInternal {
bucket_agg: bucket.bucket_agg,
sub_aggregation: bucket.sub_aggregation.into(),
},
)),
Aggregation::Metric(metric) => metrics.push((key, metric)),
}
}
Self {
metrics: VecWithNames::from_entries(metrics),
buckets: VecWithNames::from_entries(buckets),
}
}
}

#[derive(Clone, Debug)]
// Like BucketAggregation, but optimized to work with the result
pub(crate) struct BucketAggregationInternal {
/// Bucket aggregation strategy to group documents.
pub bucket_agg: BucketAggregationType,
/// The sub_aggregations in the buckets. Each bucket will aggregate on the document set in the
/// bucket.
pub sub_aggregation: AggregationsInternal,
}

impl BucketAggregationInternal {
pub(crate) fn as_histogram(&self) -> &HistogramAggregation {
match &self.bucket_agg {
BucketAggregationType::Range(_) => panic!("unexpected aggregation"),
BucketAggregationType::Histogram(histogram) => histogram,
}
}
}

/// Extract all fast field names used in the tree.
pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
let mut fast_field_names = Default::default();
Expand Down Expand Up @@ -123,12 +174,18 @@ pub enum BucketAggregationType {
/// Put data into buckets of user-defined ranges.
#[serde(rename = "range")]
Range(RangeAggregation),
/// Put data into buckets of user-defined ranges.
#[serde(rename = "histogram")]
Histogram(HistogramAggregation),
}

impl BucketAggregationType {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
BucketAggregationType::Histogram(histogram) => {
fast_field_names.insert(histogram.field.to_string())
}
};
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/aggregation/agg_req_with_accessor.rs
@@ -1,7 +1,7 @@
//! This will enhance the request tree with access to the fastfield and metadata.

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::RangeAggregation;
use super::bucket::{HistogramAggregation, RangeAggregation};
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
Expand Down Expand Up @@ -48,6 +48,9 @@ impl BucketAggregationWithAccessor {
field: field_name,
ranges: _,
}) => get_ff_reader_and_validate(reader, field_name)?,
BucketAggregationType::Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader_and_validate(reader, field_name)?,
};
let sub_aggregation = sub_aggregation.clone();
Ok(BucketAggregationWithAccessor {
Expand Down
188 changes: 157 additions & 31 deletions src/aggregation/agg_result.rs
Expand Up @@ -10,25 +10,80 @@ use std::collections::HashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::agg_req::{Aggregations, AggregationsInternal, BucketAggregationInternal};
use super::bucket::intermediate_buckets_to_final_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
/// The final aggegation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);

impl From<IntermediateAggregationResults> for AggregationResults {
fn from(tree: IntermediateAggregationResults) -> Self {
Self(
tree.0
.into_iter()
.map(|(key, agg)| (key, agg.into()))
.collect(),
)
impl AggregationResults {
/// Convert and intermediate result and its aggregation request to the final result
pub fn from_intermediate_and_req(
results: IntermediateAggregationResults,
agg: Aggregations,
) -> Self {
AggregationResults::from_intermediate_and_req_internal(results, &(agg.into()))
}
/// Convert and intermediate result and its aggregation request to the final result
///
/// Internal function, CollectorAggregations is used instead Aggregations, which is optimized
/// for internal processing
fn from_intermediate_and_req_internal(
results: IntermediateAggregationResults,
req: &AggregationsInternal,
) -> Self {
let mut result = HashMap::default();

// Important assumption:
// When the tree contains buckets/metric, we expect it to have all buckets/metrics from the
// request
if let Some(buckets) = results.buckets {
result.extend(buckets.into_iter().zip(req.buckets.values()).map(
|((key, bucket), req)| {
(
key,
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
bucket, req,
)),
)
},
));
} else {
result.extend(req.buckets.iter().map(|(key, req)| {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
(
key.to_string(),
AggregationResult::BucketResult(BucketResult::from_intermediate_and_req(
empty_bucket,
req,
)),
)
}));
}

if let Some(metrics) = results.metrics {
result.extend(
metrics
.into_iter()
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
);
} else {
result.extend(req.metrics.iter().map(|(key, req)| {
let empty_bucket = IntermediateMetricResult::empty_from_req(req);
(
key.to_string(),
AggregationResult::MetricResult(empty_bucket.into()),
)
}));
}
Self(result)
}
}

Expand All @@ -41,18 +96,6 @@ pub enum AggregationResult {
/// Metric result variant.
MetricResult(MetricResult),
}
impl From<IntermediateAggregationResult> for AggregationResult {
fn from(tree: IntermediateAggregationResult) -> Self {
match tree {
IntermediateAggregationResult::Bucket(bucket) => {
AggregationResult::BucketResult(bucket.into())
}
IntermediateAggregationResult::Metric(metric) => {
AggregationResult::MetricResult(metric.into())
}
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
Expand Down Expand Up @@ -81,21 +124,36 @@ impl From<IntermediateMetricResult> for MetricResult {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
/// This is the default entry for a bucket, which contains a key, count, and optionally
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range {
/// The range buckets sorted by range.
buckets: Vec<RangeBucketEntry>,
},
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The buckets.
///
/// If there are holes depends on the request, if min_doc_count is 0, then there are no
/// holes between the first and last bucket.
/// See [HistogramAggregation](super::bucket::HistogramAggregation)
buckets: Vec<BucketEntry>,
},
}

impl From<IntermediateBucketResult> for BucketResult {
fn from(result: IntermediateBucketResult) -> Self {
match result {
impl BucketResult {
fn from_intermediate_and_req(
bucket_result: IntermediateBucketResult,
req: &BucketAggregationInternal,
) -> Self {
match bucket_result {
IntermediateBucketResult::Range(range_map) => {
let mut buckets: Vec<RangeBucketEntry> = range_map
.into_iter()
.map(|(_, bucket)| bucket.into())
.map(|(_, bucket)| {
RangeBucketEntry::from_intermediate_and_req(bucket, &req.sub_aggregation)
})
.collect_vec();

buckets.sort_by(|a, b| {
Expand All @@ -106,6 +164,68 @@ impl From<IntermediateBucketResult> for BucketResult {
});
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets } => {
let buckets = intermediate_buckets_to_final_buckets(
buckets,
req.as_histogram(),
&req.sub_aggregation,
);

BucketResult::Histogram { buckets }
}
}
}
}

/// This is the default entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
///
/// # JSON Format
/// ```json
/// {
/// ...
/// "my_histogram": {
/// "buckets": [
/// {
/// "key": "2.0",
/// "doc_count": 5
/// },
/// {
/// "key": "4.0",
/// "doc_count": 2
/// },
/// {
/// "key": "6.0",
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketEntry {
/// The identifier of the bucket.
pub key: Key,
/// Number of documents in the bucket.
pub doc_count: u64,
#[serde(flatten)]
/// sub-aggregations in this bucket.
pub sub_aggregation: AggregationResults,
}

impl BucketEntry {
pub(crate) fn from_intermediate_and_req(
entry: IntermediateHistogramBucketEntry,
req: &AggregationsInternal,
) -> Self {
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
),
}
}
}
Expand Down Expand Up @@ -139,7 +259,7 @@ impl From<IntermediateBucketResult> for BucketResult {
/// }
/// ...
/// }
/// ```
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeBucketEntry {
/// The identifier of the bucket.
Expand All @@ -157,12 +277,18 @@ pub struct RangeBucketEntry {
pub to: Option<f64>,
}

impl From<IntermediateRangeBucketEntry> for RangeBucketEntry {
fn from(entry: IntermediateRangeBucketEntry) -> Self {
impl RangeBucketEntry {
fn from_intermediate_and_req(
entry: IntermediateRangeBucketEntry,
req: &AggregationsInternal,
) -> Self {
RangeBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation: entry.sub_aggregation.into(),
sub_aggregation: AggregationResults::from_intermediate_and_req_internal(
entry.sub_aggregation,
req,
),
to: entry.to,
from: entry.from,
}
Expand Down

0 comments on commit 141b9aa

Please sign in to comment.