forked from DataDog/datadog-go
/
worker.go
124 lines (110 loc) · 2.98 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package statsd
import (
"math/rand"
"sync"
"time"
)
// worker accumulates serialized metrics into a buffer borrowed from a pool and
// hands filled buffers to a sender. The embedded mutex serializes buffer writes
// and flushes.
type worker struct {
// pool is where fresh buffers are borrowed from (at construction and after
// every flush).
pool *bufferPool
// buffer is the buffer metrics are currently being written into.
buffer *statsdBuffer
// sender receives non-empty buffers when they are flushed.
sender *sender
// random is the worker-private random source used for sampling decisions.
random *rand.Rand
// Mutex is locked around metric writes and flushes; pause/unpause lock and
// unlock it directly.
sync.Mutex
// inputMetrics is the queue drained by pullMetric; it is created by
// startReceivingMetric and is nil until then.
inputMetrics chan metric
// stop tells the pullMetric goroutine to return.
stop chan struct{}
}
// newWorker builds a worker that writes into a buffer borrowed from pool and
// flushes filled buffers through sender. The metric queue is not created here;
// see startReceivingMetric.
func newWorker(pool *bufferPool, sender *sender) *worker {
	// A private random source is created per worker so that sampling does not
	// go through the lock protecting the package-global "math/rand" source,
	// which every top-level call such as rand.Float64() has to acquire.
	//
	// Seeding from the wall clock means workers created back to back may get
	// nearly identical seeds. That is acceptable: sampling only needs a
	// uniformly distributed stream of floats. This source must never be used
	// where cryptographic randomness is required.
	seed := time.Now().UnixNano()

	w := &worker{
		pool:   pool,
		sender: sender,
		random: rand.New(rand.NewSource(seed)),
		stop:   make(chan struct{}),
	}
	w.buffer = pool.borrowBuffer()
	return w
}
// startReceivingMetric creates the worker's metric queue with room for
// bufferSize pending metrics and launches the goroutine (pullMetric) that
// drains it.
func (w *worker) startReceivingMetric(bufferSize int) {
	queue := make(chan metric, bufferSize)
	w.inputMetrics = queue

	go w.pullMetric()
}
// stopReceivingMetric signals the pullMetric goroutine to exit. The stop
// channel is unbuffered, so this call blocks until the signal is received.
func (w *worker) stopReceivingMetric() {
	var signal struct{}
	w.stop <- signal
}
// pullMetric is the receive loop started by startReceivingMetric. It processes
// every metric arriving on the input queue and returns once a value is
// received on the stop channel.
func (w *worker) pullMetric() {
	for {
		select {
		case <-w.stop:
			return
		case m := <-w.inputMetrics:
			// The error from processMetric is intentionally dropped here:
			// this loop has no caller to report it to.
			_ = w.processMetric(m)
		}
	}
}
// processMetric applies sampling to m and, when the metric is kept, serializes
// it into the worker's current buffer.
//
// If the buffer reports errBufferFull, the buffer is flushed and the write is
// retried exactly once against the fresh buffer; whatever error that second
// attempt yields is returned. A metric dropped by sampling returns nil.
func (w *worker) processMetric(m metric) error {
	// The sampling decision is taken under the lock as well: w.random is a
	// rand.Rand built on rand.NewSource, which is not safe for concurrent use,
	// so callers running on different goroutines must not reach it unguarded.
	w.Lock()
	// Deferred so the mutex is released even if serialization panics (for
	// example on a nil event or service-check pointer).
	defer w.Unlock()

	if !w.shouldSample(m.rate) {
		return nil
	}

	err := w.writeMetricUnsafe(m)
	if err == errBufferFull {
		w.flushUnsafe()
		err = w.writeMetricUnsafe(m)
	}
	return err
}
// shouldSample reports whether a metric submitted with the given sample rate
// should be kept. Rates that are not below 1 always keep the metric; otherwise
// the metric is kept when a draw from the worker's random source, in [0, 1),
// does not exceed rate.
//
// The worker's random source is not safe for concurrent use, so concurrent
// callers must synchronize.
func (w *worker) shouldSample(rate float64) bool {
	if rate < 1 {
		return w.random.Float64() <= rate
	}
	return true
}
// writeMetricUnsafe serializes m into the worker's current buffer using the
// writer that matches m.metricType, and returns that writer's error. Metric
// types not listed below are ignored and yield nil.
//
// It does no locking itself; processMetric calls it with the worker's mutex
// held. For event and serviceCheck metrics the evalue / scvalue pointer is
// dereferenced, so it must be non-nil.
func (w *worker) writeMetricUnsafe(m metric) error {
	buf := w.buffer
	ns, globalTags, name, tags, rate := m.namespace, m.globalTags, m.name, m.tags, m.rate

	switch m.metricType {
	case gauge:
		return buf.writeGauge(ns, globalTags, name, m.fvalue, tags, rate)
	case count:
		return buf.writeCount(ns, globalTags, name, m.ivalue, tags, rate)
	case histogram:
		return buf.writeHistogram(ns, globalTags, name, m.fvalue, tags, rate)
	case distribution:
		return buf.writeDistribution(ns, globalTags, name, m.fvalue, tags, rate)
	case set:
		return buf.writeSet(ns, globalTags, name, m.svalue, tags, rate)
	case timing:
		return buf.writeTiming(ns, globalTags, name, m.fvalue, tags, rate)
	case event:
		return buf.writeEvent(*m.evalue, globalTags)
	case serviceCheck:
		return buf.writeServiceCheck(*m.scvalue, globalTags)
	}
	return nil
}
// flush takes the worker's lock and flushes the current buffer (see
// flushUnsafe). It must not be called while the caller already holds the lock,
// e.g. between pause and unpause.
func (w *worker) flush() {
	w.Lock()
	// Deferred so the mutex is released even if the flush path panics.
	defer w.Unlock()

	w.flushUnsafe()
}
// pause acquires the worker's mutex and returns with it still held, which
// blocks metric writes and flushes until unpause is called.
func (w *worker) pause() {
	w.Mutex.Lock()
}
// unpause releases the worker's mutex; it is the counterpart of pause.
func (w *worker) unpause() {
	w.Mutex.Unlock()
}
// flushUnsafe hands the current buffer to the sender and replaces it with a
// fresh one borrowed from the pool. An empty buffer is left in place and
// nothing is sent. The caller must hold the worker's lock.
// The flushed buffer is written to the network asynchronously.
func (w *worker) flushUnsafe() {
	if len(w.buffer.bytes()) == 0 {
		return
	}
	w.sender.send(w.buffer)
	w.buffer = w.pool.borrowBuffer()
}