/
sink.go
127 lines (105 loc) · 4.11 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package metrics
import (
"fmt"
"net/url"
)
// MetricSink is the interface used to transmit metrics information to an
// external system. Every method identifies the metric by a key made of
// string parts and carries a float32 value; the *WithLabels variants
// additionally accept a slice of Label values.
type MetricSink interface {
	// SetGauge sets a gauge. A gauge should retain the last value it is
	// set to.
	SetGauge(key []string, val float32)
	// SetGaugeWithLabels is the labeled form of SetGauge.
	SetGaugeWithLabels(key []string, val float32, labels []Label)

	// EmitKey should emit a key/value pair for each call.
	EmitKey(key []string, val float32)

	// IncrCounter adds val to a counter. Counters should accumulate values.
	IncrCounter(key []string, val float32)
	// IncrCounterWithLabels is the labeled form of IncrCounter.
	IncrCounterWithLabels(key []string, val float32, labels []Label)

	// AddSample records a sample. Samples are for timing information,
	// where quantiles are used.
	AddSample(key []string, val float32)
	// AddSampleWithLabels is the labeled form of AddSample.
	AddSampleWithLabels(key []string, val float32, labels []Label)

	// Shutdown shuts down the metric sink, flushes metrics to storage, and
	// cleans up resources. It is called immediately prior to application
	// exit. Implementations must block until metrics are flushed to storage.
	Shutdown()
}
// BlackholeSink is a sink that blackholes messages: every one of its methods
// has an empty body, so anything passed to it is discarded. Its pointer type
// provides the full MetricSink method set.
type BlackholeSink struct{}

// SetGauge discards the gauge value.
func (*BlackholeSink) SetGauge(key []string, val float32) {}

// SetGaugeWithLabels discards the labeled gauge value.
func (*BlackholeSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {}

// EmitKey discards the key/value pair.
func (*BlackholeSink) EmitKey(key []string, val float32) {}

// IncrCounter discards the counter increment.
func (*BlackholeSink) IncrCounter(key []string, val float32) {}

// IncrCounterWithLabels discards the labeled counter increment.
func (*BlackholeSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {}

// AddSample discards the sample.
func (*BlackholeSink) AddSample(key []string, val float32) {}

// AddSampleWithLabels discards the labeled sample.
func (*BlackholeSink) AddSampleWithLabels(key []string, val float32, labels []Label) {}

// Shutdown does nothing; there is no state to flush or release.
func (*BlackholeSink) Shutdown() {}
// FanoutSink is a MetricSink that relays every call to each of the sinks it
// contains, visiting them in slice order. An empty or nil FanoutSink is a
// no-op.
type FanoutSink []MetricSink

// SetGauge sets an unlabeled gauge on every sink. Each sink receives the
// value through SetGaugeWithLabels with nil labels.
func (fs FanoutSink) SetGauge(key []string, val float32) {
	for i := range fs {
		fs[i].SetGaugeWithLabels(key, val, nil)
	}
}

// SetGaugeWithLabels sets a labeled gauge on every sink.
func (fs FanoutSink) SetGaugeWithLabels(key []string, val float32, labels []Label) {
	for i := range fs {
		fs[i].SetGaugeWithLabels(key, val, labels)
	}
}

// EmitKey emits the key/value pair on every sink.
func (fs FanoutSink) EmitKey(key []string, val float32) {
	for i := range fs {
		fs[i].EmitKey(key, val)
	}
}

// IncrCounter increments an unlabeled counter on every sink. Each sink
// receives the value through IncrCounterWithLabels with nil labels.
func (fs FanoutSink) IncrCounter(key []string, val float32) {
	for i := range fs {
		fs[i].IncrCounterWithLabels(key, val, nil)
	}
}

// IncrCounterWithLabels increments a labeled counter on every sink.
func (fs FanoutSink) IncrCounterWithLabels(key []string, val float32, labels []Label) {
	for i := range fs {
		fs[i].IncrCounterWithLabels(key, val, labels)
	}
}

// AddSample records an unlabeled sample on every sink. Each sink receives
// the value through AddSampleWithLabels with nil labels.
func (fs FanoutSink) AddSample(key []string, val float32) {
	for i := range fs {
		fs[i].AddSampleWithLabels(key, val, nil)
	}
}

// AddSampleWithLabels records a labeled sample on every sink.
func (fs FanoutSink) AddSampleWithLabels(key []string, val float32, labels []Label) {
	for i := range fs {
		fs[i].AddSampleWithLabels(key, val, labels)
	}
}

// Shutdown shuts down the sinks one after another, in slice order.
func (fs FanoutSink) Shutdown() {
	for i := range fs {
		fs[i].Shutdown()
	}
}
// sinkURLFactoryFunc is a generic function type matching the *SinkFromURL()
// constructor provided by each sink type: it takes a parsed URL and returns
// the configured MetricSink or an error.
type sinkURLFactoryFunc func(*url.URL) (MetricSink, error)
// sinkRegistry supports the generic NewMetricSinkFromURL function by mapping
// URL schemes to metric sink factory functions. A scheme without an entry
// here is reported by NewMetricSinkFromURL as an unrecognized sink name.
var sinkRegistry = map[string]sinkURLFactoryFunc{
	"statsd": NewStatsdSinkFromURL,
	"statsite": NewStatsiteSinkFromURL,
	"inmem": NewInmemSinkFromURL,
}
// NewMetricSinkFromURL allows a generic URL input to configure any of the
// supported sinks. The scheme of the URL identifies the type of the sink, and
// query parameters are used to set options.
//
// "statsd://" - Initializes a StatsdSink. The host and port are passed through
// as the "addr" of the sink.
//
// "statsite://" - Initializes a StatsiteSink. The host and port become the
// "addr" of the sink.
//
// "inmem://" - Initializes an InmemSink. The host and port are ignored. The
// "interval" and "duration" query parameters must be specified with valid
// durations, see NewInmemSink for details.
//
// An error is returned when urlStr cannot be parsed (the url.Parse error is
// returned unchanged), when the scheme has no factory in sinkRegistry, or
// when the selected factory itself fails.
func NewMetricSinkFromURL(urlStr string) (MetricSink, error) {
	parsed, parseErr := url.Parse(urlStr)
	if parseErr != nil {
		return nil, parseErr
	}

	// The scheme alone selects the factory; an unknown scheme yields the
	// map's zero value, a nil function.
	scheme := parsed.Scheme
	factory := sinkRegistry[scheme]
	if factory == nil {
		return nil, fmt.Errorf("cannot create metric sink, unrecognized sink name: %q", scheme)
	}

	// The factory receives the whole URL so it can read host and query options.
	return factory(parsed)
}