diff --git a/prometheus/prometheus.go b/prometheus/prometheus.go index abb8c49..db67a4d 100644 --- a/prometheus/prometheus.go +++ b/prometheus/prometheus.go @@ -5,6 +5,7 @@ package prometheus import ( "fmt" "log" + "math" "regexp" "strings" "sync" @@ -29,6 +30,26 @@ type PrometheusOpts struct { // untracked. If the value is zero, a metric is never expired. Expiration time.Duration Registerer prometheus.Registerer + + // Gauges, Summaries, and Counters allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to + // the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be + // deleted when their expiry is reached. + // - Gauges and Summaries will be set to NaN when they expire. + // - Counters continue to Collect their last known value. + // Ex: + // PrometheusOpts{ + // Expiration: 10 * time.Second, + // Gauges: []GaugeDefinition{ + // { + // Name: []string{ "application", "component", "measurement"}, + // Help: "application_component_measurement provides an example of how to declare static metrics", + // ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, }, + // }, + // }, + // } + GaugeDefinitions []GaugeDefinition + SummaryDefinitions []SummaryDefinition + CounterDefinitions []CounterDefinition } type PrometheusSink struct { @@ -39,19 +60,44 @@ type PrometheusSink struct { expiration time.Duration } -type PrometheusGauge struct { +// GaugeDefinition can be provided to PrometheusOpts to declare a constant gauge that is not deleted on expiry. +type GaugeDefinition struct { + Name []string + ConstLabels []metrics.Label + Help string +} + +type gauge struct { prometheus.Gauge updatedAt time.Time + // canDelete is set if the metric is created during runtime so we know it's ephemeral and can delete it on expiry. + canDelete bool +} + +// SummaryDefinition can be provided to PrometheusOpts to declare a constant summary that is not deleted on expiry. +type SummaryDefinition struct { + Name []string + ConstLabels []metrics.Label + Help string } -type PrometheusSummary struct { +type summary struct { prometheus.Summary updatedAt time.Time + canDelete bool +} + +// CounterDefinition can be provided to PrometheusOpts to declare a constant counter that is not deleted on expiry. +type CounterDefinition struct { + Name []string + ConstLabels []metrics.Label + Help string } -type PrometheusCounter struct { +type counter struct { prometheus.Counter updatedAt time.Time + canDelete bool } // NewPrometheusSink creates a new PrometheusSink using the default options. @@ -68,6 +114,10 @@ func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) { expiration: opts.Expiration, } + initGauges(&sink.gauges, opts.GaugeDefinitions) + initSummaries(&sink.summaries, opts.SummaryDefinitions) + initCounters(&sink.counters, opts.CounterDefinitions) + reg := opts.Registerer if reg == nil { reg = prometheus.DefaultRegisterer @@ -90,43 +140,101 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) { expire := p.expiration != 0 now := time.Now() p.gauges.Range(func(k, v interface{}) bool { - if v != nil { - lastUpdate := v.(*PrometheusGauge).updatedAt - if expire && lastUpdate.Add(p.expiration).Before(now) { + if v == nil { + return true + } + g := v.(*gauge) + lastUpdate := g.updatedAt + if expire && lastUpdate.Add(p.expiration).Before(now) { + if g.canDelete { p.gauges.Delete(k) - } else { - v.(*PrometheusGauge).Collect(c) + return true } + // We have not observed the gauge this interval so we don't know its value. + g.Set(math.NaN()) } + g.Collect(c) return true }) p.summaries.Range(func(k, v interface{}) bool { - if v != nil { - lastUpdate := v.(*PrometheusSummary).updatedAt - if expire && lastUpdate.Add(p.expiration).Before(now) { + if v == nil { + return true + } + s := v.(*summary) + lastUpdate := s.updatedAt + if expire && lastUpdate.Add(p.expiration).Before(now) { + if s.canDelete { p.summaries.Delete(k) - } else { - v.(*PrometheusSummary).Collect(c) + return true } + // We have observed nothing in this interval. + s.Observe(math.NaN()) } + s.Collect(c) return true }) p.counters.Range(func(k, v interface{}) bool { - if v != nil { - lastUpdate := v.(*PrometheusCounter).updatedAt - if expire && lastUpdate.Add(p.expiration).Before(now) { + if v == nil { + return true + } + count := v.(*counter) + lastUpdate := count.updatedAt + if expire && lastUpdate.Add(p.expiration).Before(now) { + if count.canDelete { p.counters.Delete(k) - } else { - v.(*PrometheusCounter).Collect(c) + return true } + // Counters remain at their previous value when not observed, so we do not set it to NaN. } + count.Collect(c) return true }) } +func initGauges(m *sync.Map, gauges []GaugeDefinition) { + for _, g := range gauges { + key, hash := flattenKey(g.Name, g.ConstLabels) + pG := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: key, + Help: g.Help, + ConstLabels: prometheusLabels(g.ConstLabels), + }) + m.Store(hash, &gauge{ Gauge: pG }) + } + return +} + +func initSummaries(m *sync.Map, summaries []SummaryDefinition) { + for _, s := range summaries { + key, hash := flattenKey(s.Name, s.ConstLabels) + pS := prometheus.NewSummary(prometheus.SummaryOpts{ + Name: key, + Help: s.Help, + MaxAge: 10 * time.Second, + ConstLabels: prometheusLabels(s.ConstLabels), + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }) + m.Store(hash, &summary{ Summary: pS }) + } + return +} + +func initCounters(m *sync.Map, counters []CounterDefinition) { + for _, c := range counters { + key, hash := flattenKey(c.Name, c.ConstLabels) + pC := prometheus.NewCounter(prometheus.CounterOpts{ + Name: key, + Help: c.Help, + ConstLabels: prometheusLabels(c.ConstLabels), + }) + m.Store(hash, &counter{ Counter: pC }) + } + return +} + var forbiddenChars = regexp.MustCompile("[ .=\\-/]") -func (p *PrometheusSink) flattenKey(parts []string, labels []metrics.Label) (string, string) { +func flattenKey(parts []string, labels []metrics.Label) (string, string) { key := strings.Join(parts, "_") key = forbiddenChars.ReplaceAllString(key, "_") @@ -151,7 +259,7 @@ func (p *PrometheusSink) SetGauge(parts []string, val float32) { } func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels []metrics.Label) { - key, hash := p.flattenKey(parts, labels) + key, hash := flattenKey(parts, labels) pg, ok := p.gauges.Load(hash) // The sync.Map underlying gauges stores pointers to our structs. If we need to make updates, @@ -161,10 +269,12 @@ func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels // so there's no issues there. It's possible for racy updates to occur to the updatedAt // value, but since we're always setting it to time.Now(), it doesn't really matter. if ok { - localGauge := *pg.(*PrometheusGauge) + localGauge := *pg.(*gauge) localGauge.Set(float64(val)) localGauge.updatedAt = time.Now() p.gauges.Store(hash, &localGauge) + + // The gauge does not exist, create the gauge and allow it to be deleted } else { g := prometheus.NewGauge(prometheus.GaugeOpts{ Name: key, @@ -172,8 +282,10 @@ func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels ConstLabels: prometheusLabels(labels), }) g.Set(float64(val)) - pg = &PrometheusGauge{ - g, time.Now(), + pg = &gauge{ + Gauge: g, + updatedAt: time.Now(), + canDelete: true, } p.gauges.Store(hash, pg) } @@ -184,14 +296,17 @@ func (p *PrometheusSink) AddSample(parts []string, val float32) { } func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels []metrics.Label) { - key, hash := p.flattenKey(parts, labels) + key, hash := flattenKey(parts, labels) ps, ok := p.summaries.Load(hash) + // Does the summary already exist for this sample type? if ok { - localSummary := *ps.(*PrometheusSummary) + localSummary := *ps.(*summary) localSummary.Observe(float64(val)) localSummary.updatedAt = time.Now() p.summaries.Store(hash, &localSummary) + + // The summary does not exist, create the Summary and allow it to be deleted } else { s := prometheus.NewSummary(prometheus.SummaryOpts{ Name: key, @@ -201,8 +316,10 @@ func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }) s.Observe(float64(val)) - ps = &PrometheusSummary{ - s, time.Now(), + ps = &summary{ + Summary: s, + updatedAt: time.Now(), + canDelete: true, } p.summaries.Store(hash, ps) } @@ -219,14 +336,17 @@ func (p *PrometheusSink) IncrCounter(parts []string, val float32) { } func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labels []metrics.Label) { - key, hash := p.flattenKey(parts, labels) + key, hash := flattenKey(parts, labels) pc, ok := p.counters.Load(hash) + // Does the counter exist? if ok { - localCounter := *pc.(*PrometheusCounter) + localCounter := *pc.(*counter) localCounter.Add(float64(val)) localCounter.updatedAt = time.Now() p.counters.Store(hash, &localCounter) + + // The counter does not exist yet, create it and allow it to be deleted } else { c := prometheus.NewCounter(prometheus.CounterOpts{ Name: key, @@ -234,13 +354,17 @@ func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labe ConstLabels: prometheusLabels(labels), }) c.Add(float64(val)) - pc = &PrometheusCounter{ - c, time.Now(), + pc = &counter{ + Counter: c, + updatedAt: time.Now(), + canDelete: true, } p.counters.Store(hash, pc) } } +// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address +// on an interval. type PrometheusPushSink struct { *PrometheusSink pusher *push.Pusher @@ -249,7 +373,8 @@ type PrometheusPushSink struct { stopChan chan struct{} } -func NewPrometheusPushSink(address string, pushIterval time.Duration, name string) (*PrometheusPushSink, error) { +// NewPrometheusPushSink creates a PrometheusPushSink by taking an address, interval, and destination name. +func NewPrometheusPushSink(address string, pushInterval time.Duration, name string) (*PrometheusPushSink, error) { promSink := &PrometheusSink{ gauges: sync.Map{}, summaries: sync.Map{}, @@ -263,7 +388,7 @@ func NewPrometheusPushSink(address string, pushIterval time.Duration, name strin promSink, pusher, address, - pushIterval, + pushInterval, make(chan struct{}), } diff --git a/prometheus/prometheus_test.go b/prometheus/prometheus_test.go index a13fe42..b359ecb 100644 --- a/prometheus/prometheus_test.go +++ b/prometheus/prometheus_test.go @@ -54,6 +54,59 @@ func TestNewPrometheusSink(t *testing.T) { } } +func TestDefinitions(t *testing.T) { + gaugeDef := GaugeDefinition{ + Name: []string{"my", "test", "gauge"}, + Help: "A gauge for testing? How helpful!", + } + summaryDef := SummaryDefinition{ + Name: []string{"my", "test", "summary"}, + Help: "A summary for testing? How helpful!", + } + counterDef := CounterDefinition{ + Name: []string{"my", "test", "summary"}, + Help: "A counter for testing? How helpful!", + } + + // PrometheusSink config w/ definitions for each metric type + cfg := PrometheusOpts{ + Expiration: 5 * time.Second, + GaugeDefinitions: append([]GaugeDefinition{}, gaugeDef), + SummaryDefinitions: append([]SummaryDefinition{}, summaryDef), + CounterDefinitions: append([]CounterDefinition{}, counterDef), + } + sink, err := NewPrometheusSinkFrom(cfg) + if err != nil { + t.Fatalf("err = #{err}, want nil") + } + + // We can't just len(x) where x is a sync.Map, so we range over the single item and assert the name in our metric + // definition matches the key we have for the map entry. Should fail if any metrics exist that aren't defined, or if + // the defined metrics don't exist. + sink.gauges.Range(func(key, value interface{}) bool { + name, _ := flattenKey(gaugeDef.Name, gaugeDef.ConstLabels) + if name != key { + t.Fatalf("expected my_test_gauge, got #{name}") + } + return true + }) + sink.summaries.Range(func(key, value interface{}) bool { + name, _ := flattenKey(summaryDef.Name, summaryDef.ConstLabels) + fmt.Printf("k: %+v, v: %+v", key, value) + if name != key { + t.Fatalf("expected my_test_summary, got #{name}") + } + return true + }) + sink.counters.Range(func(key, value interface{}) bool { + name, _ := flattenKey(counterDef.Name, counterDef.ConstLabels) + if name != key { + t.Fatalf("expected my_test_counter, got #{name}") + } + return true + }) +} + func MockGetHostname() string { return TestHostname }