/
prometheus.go
420 lines (371 loc) · 11.5 KB
/
prometheus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
// +build go1.9
package prometheus
import (
"fmt"
"log"
"math"
"regexp"
"strings"
"sync"
"time"
"github.com/armon/go-metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/push"
)
// DefaultPrometheusOpts holds the options NewPrometheusSink passes along when
// the caller supplies none: metrics expire after 60 seconds without an update.
var DefaultPrometheusOpts = PrometheusOpts{
	Expiration: 60 * time.Second,
}
// PrometheusOpts is used to configure the Prometheus Sink.
type PrometheusOpts struct {
// Expiration is the duration a metric is valid for, after which it will be
// untracked. If the value is zero, a metric is never expired.
Expiration time.Duration
// Registerer is the Prometheus registerer the sink registers itself with.
// If it is nil, prometheus.DefaultRegisterer is used.
Registerer prometheus.Registerer
// GaugeDefinitions, SummaryDefinitions, and CounterDefinitions allow us to pre-declare metrics by giving their
// Name, Help, and ConstLabels to the PrometheusSink when it is created. Metrics declared in this way will be
// initialized at zero and will not be deleted when their expiry is reached.
//   - Gauges are set to NaN, and summaries observe a NaN sample, when they expire.
//   - Counters continue to Collect their last known value.
// Ex:
//   PrometheusOpts{
//       Expiration: 10 * time.Second,
//       GaugeDefinitions: []GaugeDefinition{
//           {
//               Name: []string{ "application", "component", "measurement"},
//               Help: "application_component_measurement provides an example of how to declare static metrics",
//               ConstLabels: []metrics.Label{ { Name: "my_label", Value: "does_not_change" }, },
//           },
//       },
//   }
GaugeDefinitions []GaugeDefinition
SummaryDefinitions []SummaryDefinition
CounterDefinitions []CounterDefinition
}
// PrometheusSink is a metrics sink that keeps gauges, summaries, and counters in
// memory and hands them to Prometheus through its Describe and Collect methods.
type PrometheusSink struct {
// Each map is keyed by the hash string produced by flattenKey and stores
// pointers to the matching wrapper type (*gauge, *summary, *counter).
// If these will ever be copied, they should be converted to *sync.Map values and initialized appropriately
gauges sync.Map
summaries sync.Map
counters sync.Map
// expiration is how long a metric may go without an update before Collect
// treats it as expired. Zero disables expiry.
expiration time.Duration
}
// GaugeDefinition can be provided to PrometheusOpts to declare a constant gauge that is not deleted on expiry.
type GaugeDefinition struct {
// Name holds the parts of the metric name; they are joined with "_" to form the Prometheus name.
Name []string
// ConstLabels are the fixed labels attached to the gauge.
ConstLabels []metrics.Label
// Help is the help text registered for the gauge.
Help string
}
// gauge pairs a Prometheus gauge with the bookkeeping the sink needs for expiry.
type gauge struct {
prometheus.Gauge
// updatedAt is the time the gauge was last set through the sink. It stays at
// the zero time for pre-declared gauges that have never been set.
updatedAt time.Time
// canDelete is set if the metric is created during runtime so we know it's ephemeral and can delete it on expiry.
canDelete bool
}
// SummaryDefinition can be provided to PrometheusOpts to declare a constant summary that is not deleted on expiry.
type SummaryDefinition struct {
// Name holds the parts of the metric name; they are joined with "_" to form the Prometheus name.
Name []string
// ConstLabels are the fixed labels attached to the summary.
ConstLabels []metrics.Label
// Help is the help text registered for the summary.
Help string
}
// summary pairs a Prometheus summary with the bookkeeping the sink needs for expiry.
type summary struct {
prometheus.Summary
// updatedAt is the time the summary last received a sample through the sink.
// It stays at the zero time for pre-declared summaries that have never been sampled.
updatedAt time.Time
// canDelete marks summaries created at runtime, which Collect removes once they expire.
canDelete bool
}
// CounterDefinition can be provided to PrometheusOpts to declare a constant counter that is not deleted on expiry.
type CounterDefinition struct {
// Name holds the parts of the metric name; they are joined with "_" to form the Prometheus name.
Name []string
// ConstLabels are the fixed labels attached to the counter.
ConstLabels []metrics.Label
// Help is the help text registered for the counter.
Help string
}
// counter pairs a Prometheus counter with the bookkeeping the sink needs for expiry.
type counter struct {
prometheus.Counter
// updatedAt is the time the counter was last incremented through the sink.
// It stays at the zero time for pre-declared counters that have never been incremented.
updatedAt time.Time
// canDelete marks counters created at runtime, which Collect removes once they expire.
canDelete bool
}
// NewPrometheusSink creates a new PrometheusSink using the default options
// (DefaultPrometheusOpts). It returns exactly what NewPrometheusSinkFrom
// returns for those options, including any registration error.
func NewPrometheusSink() (*PrometheusSink, error) {
return NewPrometheusSinkFrom(DefaultPrometheusOpts)
}
// NewPrometheusSinkFrom builds a PrometheusSink configured by opts, pre-populates
// it with the statically declared gauges, summaries, and counters, and registers
// it as a collector.
//
// Registration goes through opts.Registerer, falling back to
// prometheus.DefaultRegisterer when none is given. The sink is returned together
// with the result of that registration, so it is non-nil even when registration
// fails.
func NewPrometheusSinkFrom(opts PrometheusOpts) (*PrometheusSink, error) {
	// The zero value of sync.Map is ready to use, so only the expiration has to
	// be set explicitly.
	sink := &PrometheusSink{expiration: opts.Expiration}

	initGauges(&sink.gauges, opts.GaugeDefinitions)
	initSummaries(&sink.summaries, opts.SummaryDefinitions)
	initCounters(&sink.counters, opts.CounterDefinitions)

	registerer := opts.Registerer
	if registerer == nil {
		registerer = prometheus.DefaultRegisterer
	}

	err := registerer.Register(sink)
	return sink, err
}
// Describe is part of the Collector interface. The sink's metrics are created
// on the fly, so it only sends the descriptor of a placeholder gauge.
func (p *PrometheusSink) Describe(c chan<- *prometheus.Desc) {
	// Some description has to be emitted, otherwise an error is returned. The
	// placeholder is never shown to the user.
	placeholder := prometheus.NewGauge(prometheus.GaugeOpts{
		Name: "Dummy",
		Help: "Dummy",
	})
	placeholder.Describe(c)
}
// Collect is part of the Collector interface. It walks every stored gauge,
// summary, and counter, applies the expiration policy, and forwards the
// surviving metrics to c.
//
// A metric is expired when expiration is non-zero and its last update is older
// than the expiration. Expired metrics that were created at runtime (canDelete)
// are removed and not collected. Expired pre-declared metrics are kept: gauges
// are set to NaN, summaries observe a NaN sample, and counters are left at
// their last value.
func (p *PrometheusSink) Collect(c chan<- prometheus.Metric) {
	expiryEnabled := p.expiration != 0
	now := time.Now()

	// stale reports whether a metric last touched at ts has outlived the
	// configured expiration.
	stale := func(ts time.Time) bool {
		return expiryEnabled && ts.Add(p.expiration).Before(now)
	}

	p.gauges.Range(func(key, value interface{}) bool {
		if value == nil {
			return true
		}
		entry := value.(*gauge)
		switch {
		case !stale(entry.updatedAt):
			// Still fresh: collect as-is.
		case entry.canDelete:
			p.gauges.Delete(key)
			return true
		default:
			// Nothing was observed this interval, so the value is unknown.
			entry.Set(math.NaN())
		}
		entry.Collect(c)
		return true
	})

	p.summaries.Range(func(key, value interface{}) bool {
		if value == nil {
			return true
		}
		entry := value.(*summary)
		switch {
		case !stale(entry.updatedAt):
			// Still fresh: collect as-is.
		case entry.canDelete:
			p.summaries.Delete(key)
			return true
		default:
			// Nothing was observed this interval.
			entry.Observe(math.NaN())
		}
		entry.Collect(c)
		return true
	})

	p.counters.Range(func(key, value interface{}) bool {
		if value == nil {
			return true
		}
		entry := value.(*counter)
		// A counter keeps its previous value when it is not observed, so an
		// expired pre-declared counter is collected unchanged rather than set
		// to NaN.
		if stale(entry.updatedAt) && entry.canDelete {
			p.counters.Delete(key)
			return true
		}
		entry.Collect(c)
		return true
	})
}
// initGauges creates one Prometheus gauge per definition and stores it in m
// under the hash returned by flattenKey. The stored wrappers keep canDelete
// false, so Collect never removes them.
func initGauges(m *sync.Map, gauges []GaugeDefinition) {
	for i := range gauges {
		def := &gauges[i]
		name, hash := flattenKey(def.Name, def.ConstLabels)
		opts := prometheus.GaugeOpts{
			Name:        name,
			Help:        def.Help,
			ConstLabels: prometheusLabels(def.ConstLabels),
		}
		m.Store(hash, &gauge{Gauge: prometheus.NewGauge(opts)})
	}
}
// initSummaries creates one Prometheus summary per definition and stores it in
// m under the hash returned by flattenKey. Every summary uses a 10 second
// MaxAge and the 0.5, 0.9, and 0.99 objectives. The stored wrappers keep
// canDelete false, so Collect never removes them.
func initSummaries(m *sync.Map, summaries []SummaryDefinition) {
	for i := range summaries {
		def := &summaries[i]
		name, hash := flattenKey(def.Name, def.ConstLabels)
		opts := prometheus.SummaryOpts{
			Name:        name,
			Help:        def.Help,
			MaxAge:      10 * time.Second,
			ConstLabels: prometheusLabels(def.ConstLabels),
			Objectives:  map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		}
		m.Store(hash, &summary{Summary: prometheus.NewSummary(opts)})
	}
}
// initCounters creates one Prometheus counter per definition and stores it in
// m under the hash returned by flattenKey. The stored wrappers keep canDelete
// false, so Collect never removes them.
func initCounters(m *sync.Map, counters []CounterDefinition) {
	for i := range counters {
		def := &counters[i]
		name, hash := flattenKey(def.Name, def.ConstLabels)
		opts := prometheus.CounterOpts{
			Name:        name,
			Help:        def.Help,
			ConstLabels: prometheusLabels(def.ConstLabels),
		}
		m.Store(hash, &counter{Counter: prometheus.NewCounter(opts)})
	}
}
// forbiddenChars matches the characters (space, '.', '=', '-', and '/') that
// flattenKey replaces with '_' when building a metric name.
var forbiddenChars = regexp.MustCompile("[ .=\\-/]")
// flattenKey derives the two identifiers the sink uses for a metric.
//
// The first result is the metric name: parts joined with "_", with every
// character matched by forbiddenChars replaced by "_". The second result is the
// map key: that name followed by ";name=value" for each label, in the order the
// labels were given.
func flattenKey(parts []string, labels []metrics.Label) (string, string) {
	name := forbiddenChars.ReplaceAllString(strings.Join(parts, "_"), "_")

	id := name
	for i := range labels {
		id += fmt.Sprintf(";%s=%s", labels[i].Name, labels[i].Value)
	}
	return name, id
}
// prometheusLabels converts go-metrics labels into a prometheus.Labels map
// keyed by label name. The result is always non-nil, even for no labels; when
// a name repeats, the last value wins.
func prometheusLabels(labels []metrics.Label) prometheus.Labels {
	converted := make(prometheus.Labels, len(labels))
	for i := range labels {
		converted[labels[i].Name] = labels[i].Value
	}
	return converted
}
// SetGauge sets the gauge identified by parts to val. It is shorthand for
// SetGaugeWithLabels with no labels.
func (p *PrometheusSink) SetGauge(parts []string, val float32) {
p.SetGaugeWithLabels(parts, val, nil)
}
// SetGaugeWithLabels sets the gauge identified by parts and labels to val.
//
// If no gauge exists yet for that name/label combination, one is created with
// the labels as constant labels and marked deletable, so Collect removes it
// once it expires.
func (p *PrometheusSink) SetGaugeWithLabels(parts []string, val float32, labels []metrics.Label) {
	name, hash := flattenKey(parts, labels)

	if existing, found := p.gauges.Load(hash); found {
		// The map holds pointers to wrapper structs. Mutating the stored
		// struct in place would be racy, so a local copy is updated and a
		// pointer to that copy is stored instead. The wrapped Prometheus
		// metric is thread-safe. Concurrent writers may still race on
		// updatedAt, but every writer sets it to time.Now(), so the outcome
		// does not matter.
		refreshed := *existing.(*gauge)
		refreshed.Set(float64(val))
		refreshed.updatedAt = time.Now()
		p.gauges.Store(hash, &refreshed)
		return
	}

	// The gauge does not exist: create it and allow it to be deleted.
	created := prometheus.NewGauge(prometheus.GaugeOpts{
		Name:        name,
		Help:        name,
		ConstLabels: prometheusLabels(labels),
	})
	created.Set(float64(val))
	p.gauges.Store(hash, &gauge{
		Gauge:     created,
		updatedAt: time.Now(),
		canDelete: true,
	})
}
// AddSample records val in the summary identified by parts. It is shorthand
// for AddSampleWithLabels with no labels.
func (p *PrometheusSink) AddSample(parts []string, val float32) {
p.AddSampleWithLabels(parts, val, nil)
}
// AddSampleWithLabels records val in the summary identified by parts and
// labels.
//
// If no summary exists yet for that name/label combination, one is created
// with a 10 second MaxAge and the 0.5, 0.9, and 0.99 objectives. It is marked
// deletable, so Collect removes it once it expires.
func (p *PrometheusSink) AddSampleWithLabels(parts []string, val float32, labels []metrics.Label) {
	name, hash := flattenKey(parts, labels)

	if existing, found := p.summaries.Load(hash); found {
		// Update a local copy of the wrapper and store a pointer to it rather
		// than mutating the shared struct in place.
		refreshed := *existing.(*summary)
		refreshed.Observe(float64(val))
		refreshed.updatedAt = time.Now()
		p.summaries.Store(hash, &refreshed)
		return
	}

	// The summary does not exist: create it and allow it to be deleted.
	created := prometheus.NewSummary(prometheus.SummaryOpts{
		Name:        name,
		Help:        name,
		MaxAge:      10 * time.Second,
		ConstLabels: prometheusLabels(labels),
		Objectives:  map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
	})
	created.Observe(float64(val))
	p.summaries.Store(hash, &summary{
		Summary:   created,
		updatedAt: time.Now(),
		canDelete: true,
	})
}
// EmitKey is not implemented and does nothing. Prometheus doesn't offer a type
// for which an arbitrary number of values is retained, as Prometheus works with
// a pull model, rather than a push model.
func (p *PrometheusSink) EmitKey(key []string, val float32) {
}
// IncrCounter adds val to the counter identified by parts. It is shorthand for
// IncrCounterWithLabels with no labels.
func (p *PrometheusSink) IncrCounter(parts []string, val float32) {
p.IncrCounterWithLabels(parts, val, nil)
}
// IncrCounterWithLabels adds val to the counter identified by parts and labels.
//
// If no counter exists yet for that name/label combination, one is created
// with the labels as constant labels and marked deletable, so Collect removes
// it once it expires.
func (p *PrometheusSink) IncrCounterWithLabels(parts []string, val float32, labels []metrics.Label) {
	name, hash := flattenKey(parts, labels)

	if existing, found := p.counters.Load(hash); found {
		// Update a local copy of the wrapper and store a pointer to it rather
		// than mutating the shared struct in place.
		refreshed := *existing.(*counter)
		refreshed.Add(float64(val))
		refreshed.updatedAt = time.Now()
		p.counters.Store(hash, &refreshed)
		return
	}

	// The counter does not exist yet: create it and allow it to be deleted.
	created := prometheus.NewCounter(prometheus.CounterOpts{
		Name:        name,
		Help:        name,
		ConstLabels: prometheusLabels(labels),
	})
	created.Add(float64(val))
	p.counters.Store(hash, &counter{
		Counter:   created,
		updatedAt: time.Now(),
		canDelete: true,
	})
}
// PrometheusPushSink wraps a normal prometheus sink and provides an address and facilities to export it to an address
// on an interval.
type PrometheusPushSink struct {
*PrometheusSink
// pusher performs the pushes; the constructor builds it with the wrapped sink as its collector.
pusher *push.Pusher
// address is the push destination given to the constructor.
address string
// pushInterval is the time between two pushes.
pushInterval time.Duration
// stopChan is closed by Shutdown to stop the background push loop.
stopChan chan struct{}
}
// NewPrometheusPushSink creates a PrometheusPushSink that pushes the metrics of
// a fresh PrometheusSink to address every pushInterval. name is handed to
// push.New together with address to identify the pushed metrics.
//
// The wrapped sink uses a 60 second expiration. A background goroutine is
// started that pushes on every tick until Shutdown is called.
//
// pushInterval must be positive; otherwise an error is returned and no
// goroutine is started.
func NewPrometheusPushSink(address string, pushInterval time.Duration, name string) (*PrometheusPushSink, error) {
	// time.NewTicker panics on a non-positive interval. Since this constructor
	// already returns an error, report the bad argument that way instead of
	// crashing the caller.
	if pushInterval <= 0 {
		return nil, fmt.Errorf("prometheus push sink: push interval must be positive, got %s", pushInterval)
	}

	// The zero value of sync.Map is ready to use, so only the expiration has to
	// be set explicitly.
	promSink := &PrometheusSink{expiration: 60 * time.Second}

	pusher := push.New(address, name).Collector(promSink)

	sink := &PrometheusPushSink{
		PrometheusSink: promSink,
		pusher:         pusher,
		address:        address,
		pushInterval:   pushInterval,
		stopChan:       make(chan struct{}),
	}

	sink.flushMetrics()
	return sink, nil
}
// flushMetrics starts the background goroutine that pushes the sink's metrics
// once per pushInterval. Push errors are logged and otherwise ignored. The
// goroutine stops, and releases its ticker, when stopChan is closed.
func (s *PrometheusPushSink) flushMetrics() {
	ticker := time.NewTicker(s.pushInterval)

	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-s.stopChan:
				return
			case <-ticker.C:
				if err := s.pusher.Push(); err != nil {
					log.Printf("[ERR] Error pushing to Prometheus! Err: %s", err)
				}
			}
		}
	}()
}
// Shutdown stops the background push loop by closing stopChan. It does not
// wait for the goroutine to exit. It must be called at most once: closing an
// already closed channel panics.
func (s *PrometheusPushSink) Shutdown() {
close(s.stopChan)
}