From 2c50b02eb3dcea0f8c6115c7d15cc73cfc5d5db7 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Mon, 12 Dec 2022 04:40:15 +0100 Subject: [PATCH] Fix max bucket limit in histogram (#1703) * Fix max bucket limit in histogram The max bucket limit in histogram was broken, since some code introduced temporary filtering of buckets, which then resulted in an incorrect increment on the bucket count. The provided solution covers more scenarios, but there are still some scenarios unhandled (See #1702). * Apply suggestions from code review Co-authored-by: Paul Masurel Co-authored-by: Paul Masurel --- src/aggregation/bucket/histogram/histogram.rs | 63 +++++++++++++++++-- 1 file changed, 58 insertions(+), 5 deletions(-) diff --git a/src/aggregation/bucket/histogram/histogram.rs b/src/aggregation/bucket/histogram/histogram.rs index fb59693b9c..8472628069 100644 --- a/src/aggregation/bucket/histogram/histogram.rs +++ b/src/aggregation/bucket/histogram/histogram.rs @@ -206,6 +206,7 @@ pub struct SegmentHistogramCollector { field_type: Type, interval: f64, offset: f64, + min_doc_count: u64, first_bucket_num: i64, bounds: HistogramBounds, } @@ -215,6 +216,30 @@ impl SegmentHistogramCollector { self, agg_with_accessor: &BucketAggregationWithAccessor, ) -> crate::Result { + // Compute the number of buckets to validate against max num buckets + // Note: We use min_doc_count here, but it's only a lower bound here, since we are on the + // intermediate level and after merging the number of documents of a bucket could exceed + // `min_doc_count`. + { + let cut_off_buckets_front = self + .buckets + .iter() + .take_while(|bucket| bucket.doc_count <= self.min_doc_count) + .count(); + let cut_off_buckets_back = self.buckets[cut_off_buckets_front..] 
+ .iter() + .rev() + .take_while(|bucket| bucket.doc_count <= self.min_doc_count) + .count(); + let estimate_num_buckets = + self.buckets.len() - cut_off_buckets_front - cut_off_buckets_back; + + agg_with_accessor + .bucket_count + .add_count(estimate_num_buckets as u32); + agg_with_accessor.bucket_count.validate_bucket_count()?; + } + let mut buckets = Vec::with_capacity( self.buckets .iter() @@ -251,11 +276,6 @@ impl SegmentHistogramCollector { ); }; - agg_with_accessor - .bucket_count - .add_count(buckets.len() as u32); - agg_with_accessor.bucket_count.validate_bucket_count()?; - Ok(IntermediateBucketResult::Histogram { buckets }) } @@ -308,6 +328,7 @@ impl SegmentHistogramCollector { first_bucket_num, bounds, sub_aggregations, + min_doc_count: req.min_doc_count(), }) } @@ -1521,4 +1542,36 @@ mod tests { Ok(()) } + + #[test] + fn histogram_test_max_buckets_segments() -> crate::Result<()> { + let values = vec![0.0, 70000.0]; + + let index = get_test_index_from_values(true, &values)?; + + let agg_req: Aggregations = vec![( + "my_interval".to_string(), + Aggregation::Bucket(BucketAggregation { + bucket_agg: BucketAggregationType::Histogram(HistogramAggregation { + field: "score_f64".to_string(), + interval: 1.0, + ..Default::default() + }), + sub_aggregation: Default::default(), + }), + )] + .into_iter() + .collect(); + + let res = exec_request(agg_req, &index); + + assert_eq!( + res.unwrap_err().to_string(), + "An invalid argument was passed: 'Aborting aggregation because too many buckets were \ + created'" + .to_string() + ); + + Ok(()) + } }