Skip to content

Commit

Permalink
Merge pull request #120 from mkcp/mkcp/add-static-metrics
Browse files Browse the repository at this point in the history
Add {Gauge,Summary,Counter}Definitions to PrometheusOpts to define non-expiring metrics
  • Loading branch information
cgbaker committed Nov 4, 2020
2 parents 2bc64eb + af95c47 commit 6fd5a4d
Show file tree
Hide file tree
Showing 2 changed files with 211 additions and 33 deletions.
191 changes: 158 additions & 33 deletions prometheus/prometheus.go
Expand Up @@ -5,6 +5,7 @@ package prometheus
import (
"fmt"
"log"
"math"
"regexp"
"strings"
"sync"
Expand All @@ -29,6 +30,26 @@ type PrometheusOpts struct {
// untracked. If the value is zero, a metric is never expired.
Expiration time.Duration
Registerer prometheus.Registerer

// GaugeDefinitions, SummaryDefinitions, and CounterDefinitions allow us to pre-declare metrics by giving their Name, Help, and ConstLabels to
// the PrometheusSink when it is created. Metrics declared in this way will be initialized at zero and will not be
// deleted when their expiry is reached.
// - Gauges and Summaries will be set to NaN when they expire.
// - Counters continue to Collect their last known value.
// Ex:
// PrometheusOpts{
// Expiration: 10 * time.Second,
// GaugeDefinitions: []GaugeDefinition{
// {
// Name: []string{ "application", "component", "measurement"},
// Help: "application_component_measurement provides an example of how to declare static metrics",
// ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, },
// },
// },
// }
GaugeDefinitions []GaugeDefinition
SummaryDefinitions []SummaryDefinition
CounterDefinitions []CounterDefinition
}

type PrometheusSink struct {
Expand All @@ -39,19 +60,44 @@ type PrometheusSink struct {
expiration time.Duration
}

type PrometheusGauge struct {
// GaugeDefinition can be provided to PrometheusOpts to declare a constant gauge that is not deleted on expiry.
type GaugeDefinition struct {
// Name holds the key parts of the metric; initGauges hands them to flattenKey, which joins them with "_".
Name []string
// ConstLabels are the fixed labels attached to the gauge. They are also passed to flattenKey when the
// storage key for the gauge is computed.
ConstLabels []metrics.Label
// Help is used as the help text of the created Prometheus gauge.
Help string
}

// gauge pairs a prometheus.Gauge with the bookkeeping the sink uses to decide whether it has expired.
type gauge struct {
prometheus.Gauge
// updatedAt is refreshed with time.Now() whenever SetGaugeWithLabels writes the gauge; Collect compares it
// against the sink's expiration. Gauges created from a GaugeDefinition start with the zero time.
updatedAt time.Time
// canDelete is set if the metric is created during runtime so we know it's ephemeral and can delete it on expiry.
canDelete bool
}

// SummaryDefinition can be provided to PrometheusOpts to declare a constant summary that is not deleted on expiry.
type SummaryDefinition struct {
// Name holds the key parts of the metric; initSummaries hands them to flattenKey, which joins them with "_".
Name []string
// ConstLabels are the fixed labels attached to the summary. They are also passed to flattenKey when the
// storage key for the summary is computed.
ConstLabels []metrics.Label
// Help is used as the help text of the created Prometheus summary.
Help string
}

type PrometheusSummary struct {
// summary pairs a prometheus.Summary with the bookkeeping the sink uses to decide whether it has expired.
type summary struct {
prometheus.Summary
// updatedAt is refreshed with time.Now() whenever AddSampleWithLabels observes a value; Collect compares it
// against the sink's expiration. Summaries created from a SummaryDefinition start with the zero time.
updatedAt time.Time
// canDelete is true only for summaries created at runtime by AddSampleWithLabels; Collect deletes an expired
// summary only when it is set.
canDelete bool
}

// CounterDefinition can be provided to PrometheusOpts to declare a constant counter that is not deleted on expiry.
type CounterDefinition struct {
// Name holds the key parts of the metric; initCounters hands them to flattenKey, which joins them with "_".
Name []string
// ConstLabels are the fixed labels attached to the counter. They are also passed to flattenKey when the
// storage key for the counter is computed.
ConstLabels []metrics.Label
// Help is used as the help text of the created Prometheus counter.
Help string
}

type PrometheusCounter struct {
// counter pairs a prometheus.Counter with the bookkeeping the sink uses to decide whether it has expired.
type counter struct {
prometheus.Counter
// updatedAt is refreshed with time.Now() whenever IncrCounterWithLabels adds to the counter; Collect compares
// it against the sink's expiration. Counters created from a CounterDefinition start with the zero time.
updatedAt time.Time
// canDelete is true only for counters created at runtime by IncrCounterWithLabels; Collect deletes an expired
// counter only when it is set.
canDelete bool
}

// NewPrometheusSink creates a new PrometheusSink using the default options.
Expand All @@ -68,6 +114,10 @@ func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) {
expiration: opts.Expiration,
}

initGauges(&sink.gauges, opts.GaugeDefinitions)
initSummaries(&sink.summaries, opts.SummaryDefinitions)
initCounters(&sink.counters, opts.CounterDefinitions)

reg := opts.Registerer
if reg == nil {
reg = prometheus.DefaultRegisterer
Expand All @@ -90,43 +140,101 @@ func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
expire := p.expiration != 0
now := time.Now()
p.gauges.Range(func(k, v interface{}) bool {
if v != nil {
lastUpdate := v.(*PrometheusGauge).updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if v == nil {
return true
}
g := v.(*gauge)
lastUpdate := g.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if g.canDelete {
p.gauges.Delete(k)
} else {
v.(*PrometheusGauge).Collect(c)
return true
}
// We have not observed the gauge this interval so we don't know its value.
g.Set(math.NaN())
}
g.Collect(c)
return true
})
p.summaries.Range(func(k, v interface{}) bool {
if v != nil {
lastUpdate := v.(*PrometheusSummary).updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if v == nil {
return true
}
s := v.(*summary)
lastUpdate := s.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if s.canDelete {
p.summaries.Delete(k)
} else {
v.(*PrometheusSummary).Collect(c)
return true
}
// We have observed nothing in this interval.
s.Observe(math.NaN())
}
s.Collect(c)
return true
})
p.counters.Range(func(k, v interface{}) bool {
if v != nil {
lastUpdate := v.(*PrometheusCounter).updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if v == nil {
return true
}
count := v.(*counter)
lastUpdate := count.updatedAt
if expire && lastUpdate.Add(p.expiration).Before(now) {
if count.canDelete {
p.counters.Delete(k)
} else {
v.(*PrometheusCounter).Collect(c)
return true
}
// Counters remain at their previous value when not observed, so we do not set it to NaN.
}
count.Collect(c)
return true
})
}

// initGauges creates one Prometheus gauge per definition and stores it in m
// under the hash returned by flattenKey. The stored wrapper keeps the zero
// values for updatedAt and canDelete, so Collect never deletes these entries.
func initGauges(m *sync.Map, gauges []GaugeDefinition) {
	for i := range gauges {
		def := gauges[i]
		name, mapKey := flattenKey(def.Name, def.ConstLabels)
		opts := prometheus.GaugeOpts{
			Name:        name,
			Help:        def.Help,
			ConstLabels: prometheusLabels(def.ConstLabels),
		}
		m.Store(mapKey, &gauge{Gauge: prometheus.NewGauge(opts)})
	}
}

// initSummaries creates one Prometheus summary per definition and stores it in
// m under the hash returned by flattenKey. The stored wrapper keeps the zero
// values for updatedAt and canDelete, so Collect never deletes these entries.
func initSummaries(m *sync.Map, summaries []SummaryDefinition) {
	for i := range summaries {
		def := summaries[i]
		name, mapKey := flattenKey(def.Name, def.ConstLabels)
		opts := prometheus.SummaryOpts{
			Name:        name,
			Help:        def.Help,
			MaxAge:      10 * time.Second,
			ConstLabels: prometheusLabels(def.ConstLabels),
			Objectives:  map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		}
		m.Store(mapKey, &summary{Summary: prometheus.NewSummary(opts)})
	}
}

// initCounters creates one Prometheus counter per definition and stores it in
// m under the hash returned by flattenKey. The stored wrapper keeps the zero
// values for updatedAt and canDelete, so Collect never deletes these entries.
func initCounters(m *sync.Map, counters []CounterDefinition) {
	for i := range counters {
		def := counters[i]
		name, mapKey := flattenKey(def.Name, def.ConstLabels)
		opts := prometheus.CounterOpts{
			Name:        name,
			Help:        def.Help,
			ConstLabels: prometheusLabels(def.ConstLabels),
		}
		m.Store(mapKey, &counter{Counter: prometheus.NewCounter(opts)})
	}
}

var forbiddenChars = regexp.MustCompile("[ .=\\-/]")

func (p *PrometheusSink) flattenKey(parts []string, labels []metrics.Label) (string, string) {
func flattenKey(parts []string, labels []metrics.Label) (string, string) {
key := strings.Join(parts, "_")
key = forbiddenChars.ReplaceAllString(key, "_")

Expand All @@ -151,7 +259,7 @@ func (p *PrometheusSink) SetGauge(parts []string, val float32) {
}

func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels []metrics.Label) {
key, hash := p.flattenKey(parts, labels)
key, hash := flattenKey(parts, labels)
pg, ok := p.gauges.Load(hash)

// The sync.Map underlying gauges stores pointers to our structs. If we need to make updates,
Expand All @@ -161,19 +269,23 @@ func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels
// so there's no issues there. It's possible for racy updates to occur to the updatedAt
// value, but since we're always setting it to time.Now(), it doesn't really matter.
if ok {
localGauge := *pg.(*PrometheusGauge)
localGauge := *pg.(*gauge)
localGauge.Set(float64(val))
localGauge.updatedAt = time.Now()
p.gauges.Store(hash, &localGauge)

// The gauge does not exist, create the gauge and allow it to be deleted
} else {
g := prometheus.NewGauge(prometheus.GaugeOpts{
Name: key,
Help: key,
ConstLabels: prometheusLabels(labels),
})
g.Set(float64(val))
pg = &PrometheusGauge{
g, time.Now(),
pg = &gauge{
Gauge: g,
updatedAt: time.Now(),
canDelete: true,
}
p.gauges.Store(hash, pg)
}
Expand All @@ -184,14 +296,17 @@ func (p *PrometheusSink) AddSample(parts []string, val float32) {
}

func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels []metrics.Label) {
key, hash := p.flattenKey(parts, labels)
key, hash := flattenKey(parts, labels)
ps, ok := p.summaries.Load(hash)

// Does the summary already exist for this sample type?
if ok {
localSummary := *ps.(*PrometheusSummary)
localSummary := *ps.(*summary)
localSummary.Observe(float64(val))
localSummary.updatedAt = time.Now()
p.summaries.Store(hash, &localSummary)

// The summary does not exist, create the Summary and allow it to be deleted
} else {
s := prometheus.NewSummary(prometheus.SummaryOpts{
Name: key,
Expand All @@ -201,8 +316,10 @@ func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
})
s.Observe(float64(val))
ps = &PrometheusSummary{
s, time.Now(),
ps = &summary{
Summary: s,
updatedAt: time.Now(),
canDelete: true,
}
p.summaries.Store(hash, ps)
}
Expand All @@ -219,28 +336,35 @@ func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
}

func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labels []metrics.Label) {
key, hash := p.flattenKey(parts, labels)
key, hash := flattenKey(parts, labels)
pc, ok := p.counters.Load(hash)

// Does the counter exist?
if ok {
localCounter := *pc.(*PrometheusCounter)
localCounter := *pc.(*counter)
localCounter.Add(float64(val))
localCounter.updatedAt = time.Now()
p.counters.Store(hash, &localCounter)

// The counter does not exist yet, create it and allow it to be deleted
} else {
c := prometheus.NewCounter(prometheus.CounterOpts{
Name: key,
Help: key,
ConstLabels: prometheusLabels(labels),
})
c.Add(float64(val))
pc = &PrometheusCounter{
c, time.Now(),
pc = &counter{
Counter: c,
updatedAt: time.Now(),
canDelete: true,
}
p.counters.Store(hash, pc)
}
}

// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
*PrometheusSink
pusher *push.Pusher
Expand All @@ -249,7 +373,8 @@ type PrometheusPushSink struct {
stopChan chan struct{}
}

func NewPrometheusPushSink(address string, pushIterval time.Duration, name string) (*PrometheusPushSink, error) {
// NewPrometheusPushSink creates a PrometheusPushSink by taking an address, interval, and destination name.
func NewPrometheusPushSink(address string, pushInterval time.Duration, name string) (*PrometheusPushSink, error) {
promSink := &PrometheusSink{
gauges: sync.Map{},
summaries: sync.Map{},
Expand All @@ -263,7 +388,7 @@ func NewPrometheusPushSink(address string, pushIterval time.Duration, name strin
promSink,
pusher,
address,
pushIterval,
pushInterval,
make(chan struct{}),
}

Expand Down
53 changes: 53 additions & 0 deletions prometheus/prometheus_test.go
Expand Up @@ -54,6 +54,59 @@ func TestNewPrometheusSink(t *testing.T) {
}
}

// TestDefinitions verifies that metrics pre-declared through PrometheusOpts
// are stored in the sink when it is created, each under the hash that
// flattenKey derives from its definition, and that nothing else is stored.
func TestDefinitions(t *testing.T) {
	gaugeDef := GaugeDefinition{
		Name: []string{"my", "test", "gauge"},
		Help: "A gauge for testing? How helpful!",
	}
	summaryDef := SummaryDefinition{
		Name: []string{"my", "test", "summary"},
		Help: "A summary for testing? How helpful!",
	}
	counterDef := CounterDefinition{
		Name: []string{"my", "test", "counter"},
		Help: "A counter for testing? How helpful!",
	}

	// PrometheusSink config w/ definitions for each metric type
	cfg := PrometheusOpts{
		Expiration:         5 * time.Second,
		GaugeDefinitions:   []GaugeDefinition{gaugeDef},
		SummaryDefinitions: []SummaryDefinition{summaryDef},
		CounterDefinitions: []CounterDefinition{counterDef},
	}
	sink, err := NewPrometheusSinkFrom(cfg)
	if err != nil {
		t.Fatalf("err = %v, want nil", err)
	}

	// sync.Map has no length, so each map is ranged over: every entry must be
	// stored under the expected hash, and exactly one entry must exist. Counting
	// the entries is what makes a missing metric fail the test; Range alone
	// never invokes the callback on an empty map.
	assertOnly := func(kind, want string, rangeFn func(f func(key, value interface{}) bool)) {
		t.Helper()
		seen := 0
		rangeFn(func(key, _ interface{}) bool {
			seen++
			if key != want {
				t.Fatalf("%s stored under key %v, want %q", kind, key, want)
			}
			return true
		})
		if seen != 1 {
			t.Fatalf("found %d %s entries, want 1", seen, kind)
		}
	}

	// The init* helpers store entries under the second value returned by
	// flattenKey (the hash), so that is what the map keys are compared against.
	_, gaugeHash := flattenKey(gaugeDef.Name, gaugeDef.ConstLabels)
	assertOnly("gauge", gaugeHash, sink.gauges.Range)

	_, summaryHash := flattenKey(summaryDef.Name, summaryDef.ConstLabels)
	assertOnly("summary", summaryHash, sink.summaries.Range)

	_, counterHash := flattenKey(counterDef.Name, counterDef.ConstLabels)
	assertOnly("counter", counterHash, sink.counters.Range)
}

func MockGetHostname() string {
return TestHostname
}
Expand Down

0 comments on commit 6fd5a4d

Please sign in to comment.