From 38a4490465506f2f63c8efd3b57c364a57cf7eb2 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Wed, 14 Apr 2021 15:22:35 +0100 Subject: [PATCH 1/2] Don't expire Prometheus metrics that have been explicitly defined --- prometheus/prometheus.go | 34 ++++++++--------- prometheus/prometheus_test.go | 71 +++++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 18 deletions(-) diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index 1dcf530..4a7f516 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -5,7 +5,6 @@ package prometheus import ( "fmt" "log" - "math" "regexp" "strings" "sync" @@ -31,17 +30,16 @@ type PrometheusOpts struct { Expiration time.Duration Registerer prometheus.Registerer - // Gauges, Summaries, and Counters allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to - // the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be - // deleted when their expiry is reached. - // - Gauges and Summaries will be set to NaN when they expire. - // - Counters continue to Collect their last known value. - // Ex: - // PrometheusOpts{ + // Gauges, Summaries, and Counters allow us to pre-declare metrics by giving + // their Name, Help, and ConstLabels to the PrometheusSink when it is created. + // Metrics declared in this way will be initialized at zero and will not be + // deleted or altered when their expiry is reached. 
+ // + // Ex: PrometheusOpts{ // Expiration: 10 * time.Second, // Gauges: []GaugeDefinition{ // { - // Name: []string{ "application", "component", "measurement"}, + // Name: []string{ "application", "component", "measurement"}, // Help: "application_component_measurement provides an example of how to declare static metrics", // ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, }, // }, @@ -139,21 +137,24 @@ func (p *PrometheusSink) Describe(c chan<- *prometheus.Desc) { // logic to clean up ephemeral metrics if their value haven't been set for a // duration exceeding our allowed expiration time. func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) { + p.collectAtTime(c, time.Now()) +} + +// collectAtTime allows internal testing of the expiry based logic here without +// mocking clocks or making tests timing sensitive. +func (p *PrometheusSink) collectAtTime(c chan<- prometheus.Metric, t time.Time) { expire := p.expiration != 0 - now := time.Now() p.gauges.Range(func(k, v interface{}) bool { if v == nil { return true } g := v.(*gauge) lastUpdate := g.updatedAt - if expire && lastUpdate.Add(p.expiration).Before(now) { + if expire && lastUpdate.Add(p.expiration).Before(t) { if g.canDelete { p.gauges.Delete(k) return true } - // We have not observed the gauge this interval so we don't know its value. - g.Set(math.NaN()) } g.Collect(c) return true @@ -164,13 +165,11 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) { } s := v.(*summary) lastUpdate := s.updatedAt - if expire && lastUpdate.Add(p.expiration).Before(now) { + if expire && lastUpdate.Add(p.expiration).Before(t) { if s.canDelete { p.summaries.Delete(k) return true } - // We have observed nothing in this interval. 
- s.Observe(math.NaN()) } s.Collect(c) return true @@ -181,12 +180,11 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) { } count := v.(*counter) lastUpdate := count.updatedAt - if expire && lastUpdate.Add(p.expiration).Before(now) { + if expire && lastUpdate.Add(p.expiration).Before(t) { if count.canDelete { p.counters.Delete(k) return true } - // Counters remain at their previous value when not observed, so we do not set it to NaN. } count.Collect(c) return true diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index d0e223b..c670383 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -107,6 +107,77 @@ func TestDefinitions(t *testing.T) { } return true }) + + // Set a bunch of values + sink.SetGauge(gaugeDef.Name, 42) + sink.AddSample(summaryDef.Name, 42) + sink.IncrCounter(counterDef.Name, 1) + + // Test that the expiry behavior works as expected. First pick a time which + // is after all the actual updates above. + timeAfterUpdates := time.Now() + // Buffer the chan to make sure it doesn't block. We expect only 3 metrics to + // be produced but give some extra room as this will hand the test if we don't + // have a big enough buffer. + ch := make(chan prometheus.Metric, 10) + + // Collect the metrics as if it's some time in the future, way beyond the 5 + // second expiry. + sink.collectAtTime(ch, timeAfterUpdates.Add(10*time.Second)) + + // We should see all the metrics with the desired expiry behavior + expectedNum := 3 + for i := 0; i < expectedNum; i++ { + select { + case m := <-ch: + // m is a prometheus.Metric; the only thing we can do is Write it to a + // protobuf type and read from there. 
+ var pb dto.Metric + if err := m.Write(&pb); err != nil { + t.Fatalf("unexpected error reading metric: %s", err) + } + desc := m.Desc().String() + switch { + case pb.Counter != nil: + if !strings.Contains(desc, counterDef.Help) { + t.Fatalf("expected counter to include correct help=%s, but was %s", counterDef.Help, m.Desc().String()) + } + // Counters should _not_ reset. We could assert not nil too but that + // would be a bug in prometheus client code so assume it's never nil... + if *pb.Counter.Value != float64(1) { + t.Fatalf("expected defined counter to have value 42 after expiring, got %f", *pb.Counter.Value) + } + case pb.Gauge != nil: + if !strings.Contains(desc, gaugeDef.Help) { + t.Fatalf("expected gauge to include correct help=%s, but was %s", gaugeDef.Help, m.Desc().String()) + } + // Gauges should _not_ reset. We could assert not nil too but that + // would be a bug in prometheus client code so assume it's never nil... + if *pb.Gauge.Value != float64(42) { + t.Fatalf("expected defined gauge to have value 42 after expiring, got %f", *pb.Gauge.Value) + } + case pb.Summary != nil: + if !strings.Contains(desc, summaryDef.Help) { + t.Fatalf("expected summary to include correct help=%s, but was %s", summaryDef.Help, m.Desc().String()) + } + // Summaries should not be reset. Previous behavior here did attempt to + // reset them by calling Observe(NaN) which results in all values being + // set to NaN but doesn't actually clear the time window of data + // predictably so future observations could also end up as NaN until the + // NaN sample has aged out of the window. Since the summary is already + // aging out a fixed time window (we fix it at 10 seconds currently for + // all summaries and it's not affected by Expiration option), there's no + // point in trying to reset it after "expiry". 
+ if *pb.Summary.SampleSum != float64(42) { + t.Fatalf("expected defined summary sum to have value 42 after expiring, got %f", *pb.Summary.SampleSum) + } + default: + t.Fatalf("unexpected metric type %v", pb) + } + case <-time.After(100 * time.Millisecond): + t.Fatalf("Timed out waiting to collect expected metric. Got %d, want %d", i, expectedNum) + } + } } func MockGetHostname() string { From 60a4d2a90acc890f65958589bdad46526f037457 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 20 Apr 2021 14:54:16 +0100 Subject: [PATCH 2/2] Update prometheus/prometheus_test.go Co-authored-by: Nick Cabatoff --- prometheus/prometheus_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index c670383..a629a40 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -117,7 +117,7 @@ func TestDefinitions(t *testing.T) { // is after all the actual updates above. timeAfterUpdates := time.Now() // Buffer the chan to make sure it doesn't block. We expect only 3 metrics to - // be produced but give some extra room as this will hand the test if we don't + // be produced but give some extra room as this will hang the test if we don't // have a big enough buffer. ch := make(chan prometheus.Metric, 10)