diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index fb59693b9c..8472628069 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -206,6 +206,7 @@ pub struct SegmentHistogramCollector { field_type: Type, interval: f64, offset: f64, + min_doc_count: u64, first_bucket_num: i64, bounds: HistogramBounds, } @@ -215,6 +216,30 @@ impl SegmentHistogramCollector { self, agg_with_accessor: &BucketAggregationWithAccessor, ) -> crate::Result { + // Compute the number of buckets to validate against max num buckets + // Note: We use min_doc_count here, but it's only an lowerbound here, since were are on the + // intermediate level and after merging the number of documents of a bucket could exceed + // `min_doc_count`. + { + let cut_off_buckets_front = self + .buckets + .iter() + .take_while(|bucket| bucket.doc_count <= self.min_doc_count) + .count(); + let cut_off_buckets_back = self.buckets[cut_off_buckets_front..] + .iter() + .rev() + .take_while(|bucket| bucket.doc_count <= self.min_doc_count) + .count(); + let estimate_num_buckets = + self.buckets.len() - cut_off_buckets_front - cut_off_buckets_back; + + agg_with_accessor + .bucket_count + .add_count(estimate_num_buckets as u32); + agg_with_accessor.bucket_count.validate_bucket_count()?; + } + let mut buckets = Vec::with_capacity( self.buckets .iter() @@ -251,11 +276,6 @@ impl SegmentHistogramCollector { ); }; - agg_with_accessor - .bucket_count - .add_count(buckets.len() as u32); - agg_with_accessor.bucket_count.validate_bucket_count()?; - Ok(IntermediateBucketResult::Histogram { buckets }) } @@ -308,6 +328,7 @@ impl SegmentHistogramCollector { first_bucket_num, bounds, sub_aggregations, + min_doc_count: req.min_doc_count(), }) } @@ -1521,4 +1542,36 @@ mod tests { Ok(()) } + + #[test] + fn histogram_test_max_buckets_segments() -> crate::Result<()> { + let values = vec![0.0, 70000.0]; + + let index = get_test_index_from_values(true, &values)?; + + let agg_req: Aggregations = vec![( + "my_interval".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index); + + assert_eq!( + res.unwrap_err().to_string(), + "An invalid argument was passed: 'Aborting aggregation because too many buckets were \ + created'" + .to_string() + ); + + Ok(()) + } }