Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add {Gauge,Summary,Counter}Definitions to PrometheusOpts to define non-expiring metrics #120

Merged
merged 9 commits into from Nov 4, 2020
194 changes: 161 additions & 33 deletions prometheus/prometheus.go
Expand Up @@ -5,6 +5,7 @@ package prometheus
import (
"fmt"
"log"
"math"
"regexp"
"strings"
"sync"
Expand All @@ -29,6 +30,26 @@ type PrometheusOpts struct {
// untracked. If the value is zero, a metric is never expired.
Expiration time.Duration
Registerer prometheus.Registerer

// Gauges, Summaries, and Counters allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to
// the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be
// deleted when their expiry is reached.
// - Gauges and Summaries will be set to NaN when they expire.
// - Counters continue to Collect their last known value.
// Ex:
// PrometheusOpts{
// Expiration: 10 * time.Second,
// Gauges: []GaugeDefinition{
// {
// Name: []string{ "application", "component", "measurement"},
// Help: "application_component_measurement provides an example of how to declare static metrics",
// ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, },
// },
// },
// }
GaugeDefinitions []GaugeDefinition
SummaryDefinitions []SummaryDefinition
CounterDefinitions []CounterDefinition
}

type PrometheusSink struct {
Expand All @@ -39,19 +60,44 @@ type PrometheusSink struct {
expiration time.Duration
}

type PrometheusGauge struct {
// GaugeDefinition can be provided to PrometheusOpts to declare a constant gauge that is not deleted on expiry.
type GaugeDefinition struct {
Name []string
ConstLabels []metrics.Label
Help string
}

type gauge struct {
prometheus.Gauge
updatedAt time.Time
// canDelete is set if the metric is created during runtime so we know it's ephemeral and can delete it on expiry.
canDelete bool
}

// SummaryDefinition can be provided to PrometheusOpts to declare a constant summary that is not deleted on expiry.
type SummaryDefinition struct {
Name []string
ConstLabels []metrics.Label
Help string
}

type PrometheusSummary struct {
type summary struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these classes were exported before (even though they didn't need to be), but i don't find any indication that they're used anywhere, so 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah we looked over it and it seemed pretty safe to unexport these because they're only used internally.

prometheus.Summary
updatedAt time.Time
canDelete bool
}

// CounterDefinition can be provided to PrometheusOpts to declare a constant counter that is not deleted on expiry.
type CounterDefinition struct {
Name []string
ConstLabels []metrics.Label
Help string
}

type PrometheusCounter struct {
type counter struct {
prometheus.Counter
updatedAt time.Time
canDelete bool
}

// NewPrometheusSink creates a new PrometheusSink using the default options.
Expand All @@ -68,6 +114,10 @@ func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) {
expiration: opts.Expiration,
}

initGauges(&sink.gauges, opts.GaugeDefinitions)
initSummaries(&sink.summaries, opts.SummaryDefinitions)
initCounters(&sink.counters, opts.CounterDefinitions)

reg := opts.Registerer
if reg == nil {
reg = prometheus.DefaultRegisterer
Expand All @@ -90,43 +140,104 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
expire := p.expiration != 0
now := time.Now()
p.gauges.Range(func(k, v interface{}) bool {
if v != nil {
lastUpdate := v.(*PrometheusGauge).updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if v == nil {
return true
}
g := v.(*gauge)
lastUpdate := g.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if g.canDelete {
p.gauges.Delete(k)
} else {
v.(*PrometheusGauge).Collect(c)
return true
}
// We have not observed the gauge this interval so we don't know its value.
g.Set(math.NaN())
}
g.Collect(c)
return true
})
p.summaries.Range(func(k, v interface{}) bool {
if v != nil {
lastUpdate := v.(*PrometheusSummary).updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if v == nil {
return true
}
s := v.(*summary)
lastUpdate := s.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if s.canDelete {
p.summaries.Delete(k)
} else {
v.(*PrometheusSummary).Collect(c)
return true
}
// We have observed nothing in this interval.
s.Observe(math.NaN())
}
s.Collect(c)
return true
})
p.counters.Range(func(k, v interface{}) bool {
if v != nil {
lastUpdate := v.(*PrometheusCounter).updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if v == nil {
return true
}
count := v.(*counter)
lastUpdate := count.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if count.canDelete {
p.counters.Delete(k)
} else {
v.(*PrometheusCounter).Collect(c)
return true
}
// Counters remain at their previous value when not observed, so we do not set it to NaN.
}
count.Collect(c)
return true
})
}

func initGauges(m *sync.Map, gauges []GaugeDefinition) {
for _, g := range gauges {
key, hash := flattenKey(g.Name, g.ConstLabels)
pG := prometheus.NewGauge(prometheus.GaugeOpts{
Name: key,
Help: g.Help,
ConstLabels: prometheusLabels(g.ConstLabels),
})
pG.Set(float64(0)) // Initialize at zero
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should gauges and summaries (which haven't been observed yet) be initialized to zero or NaN (as they do after expiration)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a good question - the initialization part here is a little redundant for counters and gauges because they start at zero in the go prometheus lib anyway. However, initializing summaries to 0 does create an artifact when the process starts and before they're emitted, where the line sits on the 0 like it's a counter or gauge. It's going to be better to delete all of these inits and just use the go prometheus client's defaults.

m.Store(hash, &gauge{ Gauge: pG })
}
return
}

func initSummaries(m *sync.Map, summaries []SummaryDefinition) {
for _, s := range summaries {
key, hash := flattenKey(s.Name, s.ConstLabels)
pS := prometheus.NewSummary(prometheus.SummaryOpts{
Name: key,
Help: s.Help,
MaxAge: 10 * time.Second,
ConstLabels: prometheusLabels(s.ConstLabels),
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
pS.Observe(float64(0)) // Initialize at zero
m.Store(hash, &summary{ Summary: pS })
}
return
}

func initCounters(m *sync.Map, counters []CounterDefinition) {
for _, c := range counters {
key, hash := flattenKey(c.Name, c.ConstLabels)
pC := prometheus.NewCounter(prometheus.CounterOpts{
Name: key,
Help: c.Help,
ConstLabels: prometheusLabels(c.ConstLabels),
})
pC.Add(float64(0)) // Initialize at zero
m.Store(hash, &counter{ Counter: pC })
}
return
}

var forbiddenChars = regexp.MustCompile("[ .=\\-/]")

func (p *PrometheusSink) flattenKey(parts []string, labels []metrics.Label) (string, string) {
func flattenKey(parts []string, labels []metrics.Label) (string, string) {
key := strings.Join(parts, "_")
key = forbiddenChars.ReplaceAllString(key, "_")

Expand All @@ -151,7 +262,7 @@ func (p *PrometheusSink) SetGauge(parts []string, val float32) {
}

func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels []metrics.Label) {
key, hash := p.flattenKey(parts, labels)
key, hash := flattenKey(parts, labels)
pg, ok := p.gauges.Load(hash)

// The sync.Map underlying gauges stores pointers to our structs. If we need to make updates,
Expand All @@ -161,19 +272,23 @@ func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels
// so there's no issues there. It's possible for racy updates to occur to the updatedAt
// value, but since we're always setting it to time.Now(), it doesn't really matter.
if ok {
localGauge := *pg.(*PrometheusGauge)
localGauge := *pg.(*gauge)
localGauge.Set(float64(val))
localGauge.updatedAt = time.Now()
p.gauges.Store(hash, &localGauge)

// The gauge does not exist, create the gauge and allow it to be deleted
} else {
g := prometheus.NewGauge(prometheus.GaugeOpts{
Name: key,
Help: key,
ConstLabels: prometheusLabels(labels),
})
g.Set(float64(val))
pg = &PrometheusGauge{
g, time.Now(),
pg = &gauge{
Gauge: g,
updatedAt: time.Now(),
canDelete: true,
}
p.gauges.Store(hash, pg)
}
Expand All @@ -184,14 +299,17 @@ func (p *PrometheusSink) AddSample(parts []string, val float32) {
}

func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels []metrics.Label) {
key, hash := p.flattenKey(parts, labels)
key, hash := flattenKey(parts, labels)
ps, ok := p.summaries.Load(hash)

// Does the summary already exist for this sample type?
if ok {
localSummary := *ps.(*PrometheusSummary)
localSummary := *ps.(*summary)
localSummary.Observe(float64(val))
localSummary.updatedAt = time.Now()
p.summaries.Store(hash, &localSummary)

// The summary does not exist, create the Summary and allow it to be deleted
} else {
s := prometheus.NewSummary(prometheus.SummaryOpts{
Name: key,
Expand All @@ -201,8 +319,10 @@ func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
s.Observe(float64(val))
ps = &PrometheusSummary{
s, time.Now(),
ps = &summary{
Summary: s,
updatedAt: time.Now(),
canDelete: true,
}
p.summaries.Store(hash, ps)
}
Expand All @@ -219,28 +339,35 @@ func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
}

func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labels []metrics.Label) {
key, hash := p.flattenKey(parts, labels)
key, hash := flattenKey(parts, labels)
pc, ok := p.counters.Load(hash)

// Does the counter exist?
if ok {
localCounter := *pc.(*PrometheusCounter)
localCounter := *pc.(*counter)
localCounter.Add(float64(val))
localCounter.updatedAt = time.Now()
p.counters.Store(hash, &localCounter)

// The counter does not exist yet, create it and allow it to be deleted
} else {
c := prometheus.NewCounter(prometheus.CounterOpts{
Name: key,
Help: key,
ConstLabels: prometheusLabels(labels),
})
c.Add(float64(val))
pc = &PrometheusCounter{
c, time.Now(),
pc = &counter{
Counter: c,
updatedAt: time.Now(),
canDelete: true,
}
p.counters.Store(hash, pc)
}
}

// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
*PrometheusSink
pusher *push.Pusher
Expand All @@ -249,7 +376,8 @@ type PrometheusPushSink struct {
stopChan chan struct{}
}

func NewPrometheusPushSink(address string, pushIterval time.Duration, name string) (*PrometheusPushSink, error) {
// NewPrometheusPushSink creates a PrometheusPushSink by taking an address, interval, and destination name.
func NewPrometheusPushSink(address string, pushInterval time.Duration, name string) (*PrometheusPushSink, error) {
promSink := &PrometheusSink{
gauges: sync.Map{},
summaries: sync.Map{},
Expand All @@ -263,7 +391,7 @@ func NewPrometheusPushSink(address string, pushIterval time.Duration, name strin
promSink,
pusher,
address,
pushIterval,
pushInterval,
make(chan struct{}),
}

Expand Down