add Histogram aggregation #1306

Merged
merged 14 commits on Mar 18, 2022
56 changes: 56 additions & 0 deletions src/aggregation/agg_req.rs
@@ -48,15 +48,65 @@ use std::collections::{HashMap, HashSet};

use serde::{Deserialize, Serialize};

use super::bucket::HistogramAggregation;
pub use super::bucket::RangeAggregation;
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;

/// The top-level aggregation request structure, which contains [Aggregation]s and their
/// user-defined names. It is also used in [buckets](BucketAggregation) to define
/// sub-aggregations.
///
/// The key is the user-defined name of the aggregation.
pub type Aggregations = HashMap<String, Aggregation>;
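
// Illustrative sketch (not part of this diff): sibling aggregations are plain map
// entries keyed by a caller-chosen name; the `Average` metric constructor below is
// the one from the metric module.
//
// let mut aggs: Aggregations = HashMap::new();
// aggs.insert(
//     "average_price".to_string(),
//     Aggregation::Metric(MetricAggregation::Average(
//         AverageAggregation::from_field_name("price".to_string()),
//     )),
// );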

/// Like Aggregations, but optimized to work with the aggregation result.
#[derive(Clone, Debug)]
pub(crate) struct CollectorAggregations {
pub(crate) metrics: VecWithNames<MetricAggregation>,
pub(crate) buckets: VecWithNames<CollectorBucketAggregation>,
}

impl From<Aggregations> for CollectorAggregations {
fn from(aggs: Aggregations) -> Self {
let mut metrics = vec![];
let mut buckets = vec![];
for (key, agg) in aggs {
match agg {
Aggregation::Bucket(bucket) => buckets.push((
key,
CollectorBucketAggregation {
bucket_agg: bucket.bucket_agg,
sub_aggregation: bucket.sub_aggregation.into(),
},
)),
Aggregation::Metric(metric) => metrics.push((key, metric)),
}
}
Self {
metrics: VecWithNames::from_entries(metrics),
buckets: VecWithNames::from_entries(buckets),
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct CollectorBucketAggregation {
/// Bucket aggregation strategy to group documents.
pub bucket_agg: BucketAggregationType,
/// The sub_aggregations in the buckets. Each bucket will aggregate on the document set in the
/// bucket.
pub sub_aggregation: CollectorAggregations,
}

impl CollectorBucketAggregation {
pub(crate) fn as_histogram(&self) -> &HistogramAggregation {
match &self.bucket_agg {
BucketAggregationType::Range(_) => panic!("unexpected aggregation"),
BucketAggregationType::Histogram(histogram) => histogram,
}
}
}

/// Extract all fast field names used in the tree.
pub fn get_fast_field_names(aggs: &Aggregations) -> HashSet<String> {
let mut fast_field_names = Default::default();
@@ -123,12 +173,18 @@ pub enum BucketAggregationType {
/// Put data into buckets of user-defined ranges.
#[serde(rename = "range")]
Range(RangeAggregation),
/// Put data into fixed-size interval buckets (histogram).
#[serde(rename = "histogram")]
Histogram(HistogramAggregation),
}

impl BucketAggregationType {
fn get_fast_field_names(&self, fast_field_names: &mut HashSet<String>) {
match self {
BucketAggregationType::Range(range) => fast_field_names.insert(range.field.to_string()),
BucketAggregationType::Histogram(histogram) => {
fast_field_names.insert(histogram.field.to_string())
}
};
}
}
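To see the new variant in action, a request like the following would deserialize into BucketAggregationType::Histogram (a sketch: the "prices"/"price" names and the interval are illustrative, and serde_json is assumed to be available):

let agg_req: Aggregations = serde_json::from_str(
    r#"{ "prices": { "histogram": { "field": "price", "interval": 2.0 } } }"#,
)
.unwrap();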
5 changes: 4 additions & 1 deletion src/aggregation/agg_req_with_accessor.rs
@@ -1,7 +1,7 @@
//! This will enhance the request tree with access to the fastfield and metadata.

use super::agg_req::{Aggregation, Aggregations, BucketAggregationType, MetricAggregation};
use super::bucket::RangeAggregation;
use super::bucket::{HistogramAggregation, RangeAggregation};
use super::metric::{AverageAggregation, StatsAggregation};
use super::VecWithNames;
use crate::fastfield::{type_and_cardinality, DynamicFastFieldReader, FastType};
@@ -48,6 +48,9 @@ impl BucketAggregationWithAccessor {
field: field_name,
ranges: _,
}) => get_ff_reader_and_validate(reader, field_name)?,
BucketAggregationType::Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader_and_validate(reader, field_name)?,
};
let sub_aggregation = sub_aggregation.clone();
Ok(BucketAggregationWithAccessor {
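One practical implication of this hunk: histogram aggregations resolve their source column through the same fast field validation as range aggregations, so the target field must be declared FAST. A minimal schema sketch (the "price" field is illustrative):

use tantivy::schema::{Schema, FAST};

let mut schema_builder = Schema::builder();
let price_field = schema_builder.add_f64_field("price", FAST);
let schema = schema_builder.build();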
210 changes: 179 additions & 31 deletions src/aggregation/agg_result.rs
@@ -10,25 +10,65 @@ use std::collections::HashMap;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use super::agg_req::{Aggregations, CollectorAggregations, CollectorBucketAggregation};
use super::bucket::generate_buckets;
use super::intermediate_agg_result::{
IntermediateAggregationResult, IntermediateAggregationResults, IntermediateBucketResult,
IntermediateAggregationResults, IntermediateBucketResult, IntermediateHistogramBucketEntry,
IntermediateMetricResult, IntermediateRangeBucketEntry,
};
use super::metric::{SingleMetricResult, Stats};
use super::Key;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Default, Debug, PartialEq, Serialize, Deserialize)]
/// The final aggregation result.
pub struct AggregationResults(pub HashMap<String, AggregationResult>);

impl From<IntermediateAggregationResults> for AggregationResults {
fn from(tree: IntermediateAggregationResults) -> Self {
Self(
tree.0
.into_iter()
.map(|(key, agg)| (key, agg.into()))
.collect(),
)
impl From<(IntermediateAggregationResults, Aggregations)> for AggregationResults {
fn from(tree_and_req: (IntermediateAggregationResults, Aggregations)) -> Self {
let agg: CollectorAggregations = tree_and_req.1.into();
(tree_and_req.0, &agg).into()
}
}

impl From<(IntermediateAggregationResults, &CollectorAggregations)> for AggregationResults {
fn from(data: (IntermediateAggregationResults, &CollectorAggregations)) -> Self {
let tree = data.0;
let req = data.1;
let mut result = HashMap::default();

// Important assumption:
// When the tree contains buckets/metrics, we expect it to have all buckets/metrics from the
// request
if let Some(buckets) = tree.buckets {
result.extend(buckets.into_iter().zip(req.buckets.values()).map(
|((key, bucket), req)| (key, AggregationResult::BucketResult((bucket, req).into())),
));
} else {
result.extend(req.buckets.iter().map(|(key, req)| {
let empty_bucket = IntermediateBucketResult::empty_from_req(&req.bucket_agg);
(
key.to_string(),
AggregationResult::BucketResult((empty_bucket, req).into()),
)
}));
}

if let Some(metrics) = tree.metrics {
result.extend(
metrics
.into_iter()
.map(|(key, metric)| (key, AggregationResult::MetricResult(metric.into()))),
);
} else {
result.extend(req.metrics.iter().map(|(key, req)| {
let empty_bucket = IntermediateMetricResult::empty_from_req(req);
(
key.to_string(),
AggregationResult::MetricResult(empty_bucket.into()),
)
}));
}
Self(result)
}
}
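
// Illustrative consequence of the fallback branches above (names hypothetical,
// not part of this diff): converting an empty intermediate tree still yields one
// entry per requested aggregation, so the serialized result always mirrors the
// request shape.
//
// let res: AggregationResults =
//     (IntermediateAggregationResults::default(), &collector_aggs).into();
// assert_eq!(res.0.len(), collector_aggs.metrics.len() + collector_aggs.buckets.len());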

@@ -41,18 +81,6 @@ pub enum AggregationResult {
/// Metric result variant.
MetricResult(MetricResult),
}
impl From<IntermediateAggregationResult> for AggregationResult {
fn from(tree: IntermediateAggregationResult) -> Self {
match tree {
IntermediateAggregationResult::Bucket(bucket) => {
AggregationResult::BucketResult(bucket.into())
}
IntermediateAggregationResult::Metric(metric) => {
AggregationResult::MetricResult(metric.into())
}
}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
@@ -81,21 +109,33 @@ impl From<IntermediateMetricResult> for MetricResult {
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketResult {
/// This is the default entry for a bucket, which contains a key, count, and optionally
/// This is the range entry for a bucket, which contains a key, count, from, to, and optionally
/// sub_aggregations.
Range {
/// The range buckets sorted by range.
buckets: Vec<RangeBucketEntry>,
},
/// This is the histogram entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
Histogram {
/// The buckets.
///
/// Whether there are holes depends on the request: if min_doc_count is 0, there are
/// no holes between the first and last bucket.
/// See [HistogramAggregation](super::bucket::HistogramAggregation).
buckets: Vec<BucketEntry>,
},
}

impl From<IntermediateBucketResult> for BucketResult {
fn from(result: IntermediateBucketResult) -> Self {
match result {
impl From<(IntermediateBucketResult, &CollectorBucketAggregation)> for BucketResult {
fn from(result_and_req: (IntermediateBucketResult, &CollectorBucketAggregation)) -> Self {
let bucket_result = result_and_req.0;
let req = result_and_req.1;
match bucket_result {
IntermediateBucketResult::Range(range_map) => {
let mut buckets: Vec<RangeBucketEntry> = range_map
.into_iter()
.map(|(_, bucket)| bucket.into())
.map(|(_, bucket)| (bucket, &req.sub_aggregation).into())
.collect_vec();

buckets.sort_by(|a, b| {
@@ -106,6 +146,112 @@ impl From<IntermediateBucketResult> for BucketResult {
});
BucketResult::Range { buckets }
}
IntermediateBucketResult::Histogram { buckets } => {
let histogram_req = req.as_histogram();
let buckets = if histogram_req.min_doc_count() == 0 {
// With min_doc_count == 0, we may need to add buckets so that there are no
// gaps, since the intermediate result does not contain empty buckets (they are
// filtered out to reduce serialization size).

let (min, max) = if buckets.is_empty() {
(f64::MAX, f64::MIN)
} else {
let min = buckets[0].key;
let max = buckets[buckets.len() - 1].key;
(min, max)
};

let fill_gaps_buckets = generate_buckets(histogram_req, min, max);

let sub_aggregation =
IntermediateAggregationResults::empty_from_req(&req.sub_aggregation);

buckets
.into_iter()
.merge_join_by(
fill_gaps_buckets.into_iter(),
|existing_bucket, fill_gaps_bucket| {
existing_bucket
.key
.partial_cmp(fill_gaps_bucket)
.unwrap_or(Ordering::Equal)
},
)
.map(|either| match either {
itertools::EitherOrBoth::Both(existing, _) => {
(existing, &req.sub_aggregation).into()
}
itertools::EitherOrBoth::Left(existing) => {
(existing, &req.sub_aggregation).into()
}
// Add missing bucket
itertools::EitherOrBoth::Right(bucket) => BucketEntry {
key: Key::F64(bucket),
doc_count: 0,
sub_aggregation: (sub_aggregation.clone(), &req.sub_aggregation)
.into(),
},
})
.collect_vec()
} else {
buckets
.into_iter()
.filter(|bucket| bucket.doc_count >= histogram_req.min_doc_count())
.map(|bucket| (bucket, &req.sub_aggregation).into())
.collect_vec()
};

BucketResult::Histogram { buckets }
}
}
}
}
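
// Standalone sketch of the gap-filling merge above, with simplified (f64, u64)
// buckets; the keys and counts are made up:
//
// use itertools::Itertools;
// let existing: Vec<(f64, u64)> = vec![(0.0, 5), (4.0, 2)]; // (key, doc_count)
// let grid: Vec<f64> = vec![0.0, 2.0, 4.0]; // what generate_buckets would return
// let filled: Vec<(f64, u64)> = existing
//     .into_iter()
//     .merge_join_by(grid, |a, b| a.0.partial_cmp(b).unwrap())
//     .map(|either| match either {
//         itertools::EitherOrBoth::Both(e, _) | itertools::EitherOrBoth::Left(e) => e,
//         itertools::EitherOrBoth::Right(key) => (key, 0), // fill the missing bucket
//     })
//     .collect();
// assert_eq!(filled, vec![(0.0, 5), (2.0, 0), (4.0, 2)]);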

/// This is the default entry for a bucket, which contains a key, count, and optionally
/// sub_aggregations.
///
/// # JSON Format
/// ```json
/// {
/// ...
/// "my_histogram": {
/// "buckets": [
/// {
/// "key": "2.0",
/// "doc_count": 5
/// },
/// {
/// "key": "4.0",
/// "doc_count": 2
/// },
/// {
/// "key": "6.0",
/// "doc_count": 3
/// }
/// ]
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BucketEntry {
/// The identifier of the bucket.
pub key: Key,
/// Number of documents in the bucket.
pub doc_count: u64,
#[serde(flatten)]
/// Sub-aggregations in this bucket.
pub sub_aggregation: AggregationResults,
}

impl From<(IntermediateHistogramBucketEntry, &CollectorAggregations)> for BucketEntry {
fn from(entry_and_req: (IntermediateHistogramBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
BucketEntry {
key: Key::F64(entry.key),
doc_count: entry.doc_count,
sub_aggregation: (entry.sub_aggregation, req).into(),
}
}
}
@@ -139,7 +285,7 @@ impl From<IntermediateBucketResult> for BucketResult {
/// }
/// ...
/// }
/// ```
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct RangeBucketEntry {
/// The identifier of the bucket.
@@ -157,12 +303,14 @@ pub struct RangeBucketEntry {
pub to: Option<f64>,
}

impl From<IntermediateRangeBucketEntry> for RangeBucketEntry {
fn from(entry: IntermediateRangeBucketEntry) -> Self {
impl From<(IntermediateRangeBucketEntry, &CollectorAggregations)> for RangeBucketEntry {
fn from(entry_and_req: (IntermediateRangeBucketEntry, &CollectorAggregations)) -> Self {
let entry = entry_and_req.0;
let req = entry_and_req.1;
RangeBucketEntry {
key: entry.key,
doc_count: entry.doc_count,
sub_aggregation: entry.sub_aggregation.into(),
sub_aggregation: (entry.sub_aggregation, req).into(),
to: entry.to,
from: entry.from,
}
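
To close the loop, a rough usage sketch of the feature this PR adds (index and searcher setup elided; AggregationCollector::from_aggs is assumed to be the collector entry point, as in the crate's aggregation module):

use tantivy::aggregation::agg_req::Aggregations;
use tantivy::aggregation::AggregationCollector;
use tantivy::query::AllQuery;

let agg_req: Aggregations = serde_json::from_str(
    r#"{ "prices": { "histogram": { "field": "price", "interval": 2.0 } } }"#,
)?;
let collector = AggregationCollector::from_aggs(agg_req);
let agg_res = searcher.search(&AllQuery, &collector)?;
println!("{}", serde_json::to_string_pretty(&agg_res)?);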