/
telemetry.go
147 lines (127 loc) · 5.7 KB
/
telemetry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package statsd
import (
"fmt"
"sync"
"time"
)
// TelemetryInterval is the interval at which telemetry will be sent by the client.
const TelemetryInterval = 10 * time.Second

var (
	// clientTelemetryTag is a tag identifying this specific client.
	clientTelemetryTag = "client:go"

	// clientVersionTelemetryTag is a tag identifying this specific client version.
	clientVersionTelemetryTag = "client_version:4.2.0"
)
// telemetryClient gathers the internal counters of a Client and periodically
// reports them as count metrics (see run, sendTelemetry and flush).
type telemetryClient struct {
	// c is the client whose telemetry is reported. flush reads its counters,
	// and sendTelemetry sends through c.send when no dedicated worker is set.
	c *Client
	// tags is the base tag set attached to telemetry metrics: the client's
	// tags plus the client, client version and transport tags.
	tags []string
	// tagsByType holds, per metric type, the base tags plus a
	// "metrics_type:<type>" tag. It is only populated when devMode is true.
	tagsByType map[metricType][]string
	// sender and worker are only set by newTelemetryClientWithCustomAddr.
	// When worker is nil, telemetry is sent through c instead.
	sender *sender
	worker *worker
	// devMode enables the additional per-metric-type counters in flush.
	devMode bool
}
// newTelemetryClient builds the telemetry client attached to c.
//
// The base tag set is a copy of c.Tags followed by the client tag, the client
// version tag and "client_transport:<transport>". When devMode is true, one
// additional tag set per metric type is precomputed (base tags plus
// "metrics_type:<type>"); flush attaches those to the per-type counters.
//
// No sender or worker is configured here, so telemetry produced by the
// returned client is sent through c itself.
func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryClient {
	// Copy c.Tags into a fresh slice before appending. Appending directly to
	// c.Tags would write into its backing array whenever it has spare
	// capacity, aliasing (and potentially being overwritten through) the
	// client's own slice.
	tags := make([]string, 0, len(c.Tags)+3)
	tags = append(tags, c.Tags...)
	tags = append(tags, clientTelemetryTag, clientVersionTelemetryTag, "client_transport:"+transport)

	t := &telemetryClient{
		c:          c,
		tags:       tags,
		tagsByType: map[metricType][]string{},
		devMode:    devMode,
	}

	if devMode {
		typeTags := []struct {
			kind metricType
			tag  string
		}{
			{gauge, "metrics_type:gauge"},
			{count, "metrics_type:count"},
			{set, "metrics_type:set"},
			{timing, "metrics_type:timing"},
			{histogram, "metrics_type:histogram"},
			{distribution, "metrics_type:distribution"},
		}
		for _, tt := range typeTags {
			// Each type gets its own backing array so the sets never share storage.
			withType := make([]string, 0, len(tags)+1)
			withType = append(withType, tags...)
			t.tagsByType[tt.kind] = append(withType, tt.tag)
		}
	}

	return t
}
// newTelemetryClientWithCustomAddr builds a telemetry client that ships its
// metrics to telemetryAddr through its own sender and worker instead of going
// through c. It returns an error when telemetryAddr cannot be resolved.
func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
	writer, _, resolveErr := resolveAddr(telemetryAddr)
	if resolveErr != nil {
		return nil, fmt.Errorf("Could not resolve telemetry address: %v", resolveErr)
	}

	client := newTelemetryClient(c, transport, devMode)

	// Dedicated sender/worker pair (1 worker in mutex mode) for the telemetry,
	// sharing the same bufferPool.
	// FIXME due to performance pitfall, we're always using UDP defaults
	// even for UDS.
	dedicatedSender := newSender(writer, DefaultUDPBufferPoolSize, pool)
	client.sender = dedicatedSender
	client.worker = newWorker(pool, dedicatedSender)

	return client, nil
}
// run starts a background goroutine that calls sendTelemetry every
// TelemetryInterval until stop is signalled. The goroutine is registered on wg
// before it starts and marks itself done when it exits. On stop, the ticker is
// stopped and the dedicated sender, if any, is closed.
func (t *telemetryClient) run(wg *sync.WaitGroup, stop chan struct{}) {
	wg.Add(1)
	go func() {
		defer wg.Done()

		tick := time.NewTicker(TelemetryInterval)
		for {
			select {
			case <-stop:
				tick.Stop()
				if s := t.sender; s != nil {
					s.close()
				}
				return
			case <-tick.C:
				t.sendTelemetry()
			}
		}
	}()
}
// sendTelemetry flushes the current telemetry metrics and sends them. Without
// a dedicated worker the metrics go through the client's own send path;
// otherwise they are handed to the worker, which is then flushed.
func (t *telemetryClient) sendTelemetry() {
	metrics := t.flush()

	if t.worker == nil {
		for _, tm := range metrics {
			t.c.send(tm)
		}
		return
	}

	for _, tm := range metrics {
		t.worker.processMetric(tm)
	}
	t.worker.flush()
}
// flush returns the telemetry metrics to be sent, built from the counters
// reported by the client, by the client's sender and by t.c.agg, in that order.
// All metrics are counts with a rate of 1. It's its own function to ease testing.
func (t *telemetryClient) flush() []metric {
	out := []metric{}

	// add is the same as Count but without global namespace.
	add := func(name string, value int64, tags []string) {
		out = append(out, metric{metricType: count, name: name, ivalue: value, tags: tags, rate: 1})
	}
	// base records a counter carrying the base telemetry tags.
	base := func(name string, value int64) {
		add(name, value, t.tags)
	}
	// typed records a counter carrying the tag set of the given metric type.
	typed := func(name string, kind metricType, value int64) {
		add(name, value, t.tagsByType[kind])
	}

	// Client-level counters.
	clientStats := t.c.FlushTelemetryMetrics()
	base("datadog.dogstatsd.client.metrics", int64(clientStats.TotalMetrics))
	if t.devMode {
		typed("datadog.dogstatsd.client.metrics_by_type", gauge, int64(clientStats.TotalMetricsGauge))
		typed("datadog.dogstatsd.client.metrics_by_type", count, int64(clientStats.TotalMetricsCount))
		typed("datadog.dogstatsd.client.metrics_by_type", histogram, int64(clientStats.TotalMetricsHistogram))
		typed("datadog.dogstatsd.client.metrics_by_type", distribution, int64(clientStats.TotalMetricsDistribution))
		typed("datadog.dogstatsd.client.metrics_by_type", set, int64(clientStats.TotalMetricsSet))
		typed("datadog.dogstatsd.client.metrics_by_type", timing, int64(clientStats.TotalMetricsTiming))
	}
	base("datadog.dogstatsd.client.events", int64(clientStats.TotalEvents))
	base("datadog.dogstatsd.client.service_checks", int64(clientStats.TotalServiceChecks))
	base("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientStats.TotalDroppedOnReceive))

	// Sender-level counters.
	senderStats := t.c.sender.flushTelemetryMetrics()
	base("datadog.dogstatsd.client.packets_sent", int64(senderStats.TotalSentPayloads))
	base("datadog.dogstatsd.client.bytes_sent", int64(senderStats.TotalSentBytes))
	base("datadog.dogstatsd.client.packets_dropped", int64(senderStats.TotalDroppedPayloads))
	base("datadog.dogstatsd.client.bytes_dropped", int64(senderStats.TotalDroppedBytes))
	base("datadog.dogstatsd.client.packets_dropped_queue", int64(senderStats.TotalDroppedPayloadsQueueFull))
	base("datadog.dogstatsd.client.bytes_dropped_queue", int64(senderStats.TotalDroppedBytesQueueFull))
	base("datadog.dogstatsd.client.packets_dropped_writer", int64(senderStats.TotalDroppedPayloadsWriter))
	base("datadog.dogstatsd.client.bytes_dropped_writer", int64(senderStats.TotalDroppedBytesWriter))

	// Aggregation counters are only reported when t.c.agg returns some.
	aggStats := t.c.agg.flushTelemetryMetrics()
	if aggStats == nil {
		return out
	}
	base("datadog.dogstatsd.client.aggregated_context", int64(aggStats.nbContext))
	if t.devMode {
		typed("datadog.dogstatsd.client.aggregated_context_by_type", gauge, int64(aggStats.nbContextGauge))
		typed("datadog.dogstatsd.client.aggregated_context_by_type", set, int64(aggStats.nbContextSet))
		typed("datadog.dogstatsd.client.aggregated_context_by_type", count, int64(aggStats.nbContextCount))
	}

	return out
}