/
options.go
295 lines (269 loc) · 9.88 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
package statsd
import (
"math"
"strings"
"time"
)
// Default values used by resolveOptions to seed an Options before any
// caller-supplied Option is applied.
var (
// DefaultNamespace is the default value for the Namespace option.
// The empty string means no namespace is prepended.
DefaultNamespace = ""
// DefaultTags is the default value for the Tags option (no global tags).
DefaultTags = []string{}
// DefaultMaxBytesPerPayload is the default value for the MaxBytesPerPayload option.
// 0 is the "magic value" described on Options.MaxBytesPerPayload.
DefaultMaxBytesPerPayload = 0
// DefaultMaxMessagesPerPayload is the default value for the MaxMessagesPerPayload option.
DefaultMaxMessagesPerPayload = math.MaxInt32
// DefaultBufferPoolSize is the default value for the BufferPoolSize option.
// 0 is the "magic value" described on Options.BufferPoolSize.
DefaultBufferPoolSize = 0
// DefaultBufferFlushInterval is the default value for the BufferFlushInterval option.
DefaultBufferFlushInterval = 100 * time.Millisecond
// DefaultBufferShardCount is the default value for the BufferShardCount option.
DefaultBufferShardCount = 32
// DefaultSenderQueueSize is the default value for the SenderQueueSize option.
// 0 is the "magic value" described on Options.SenderQueueSize.
DefaultSenderQueueSize = 0
// DefaultWriteTimeoutUDS is the default value for the WriteTimeoutUDS option.
DefaultWriteTimeoutUDS = 1 * time.Millisecond
// DefaultTelemetry is the default value for the Telemetry option (enabled).
DefaultTelemetry = true
// DefaultReceivingMode is the default value for the ReceiveMode option.
DefaultReceivingMode = MutexMode
// DefaultChannelModeBufferSize is the default value for the
// ChannelModeBufferSize option: the size of the channel holding incoming metrics.
DefaultChannelModeBufferSize = 4096
// DefaultAggregationFlushInterval is the default value for the
// AggregationFlushInterval option: the interval at which the aggregator flushes metrics.
DefaultAggregationFlushInterval = 3 * time.Second
// DefaultAggregation is the default value for the Aggregation option
// (client side aggregation disabled).
DefaultAggregation = false
// DefaultDevMode is the default value for the DevMode option (disabled).
DefaultDevMode = false
)
// Options contains the configuration options for a client.
//
// An Options value is produced by resolveOptions, which starts from the
// package-level Default* values and then applies the Option functions
// supplied by the caller.
type Options struct {
// Namespace is prepended to the name of every metric, event and service check.
Namespace string
// Tags are global tags applied to every metric, event and service check.
Tags []string
// MaxBytesPerPayload is the maximum number of bytes a single payload will contain.
// The magic value 0 will set the option to the optimal size for the transport
// protocol used when creating the client: 1432 for UDP and 8192 for UDS.
MaxBytesPerPayload int
// MaxMessagesPerPayload is the maximum number of metrics, events and/or
// service checks a single payload will contain.
// This option can be set to `1` to create an unbuffered client.
MaxMessagesPerPayload int
// BufferPoolSize is the size of the pool of buffers, in number of buffers.
// The magic value 0 will set the option to the optimal size for the transport
// protocol used when creating the client: 2048 for UDP and 512 for UDS.
BufferPoolSize int
// BufferFlushInterval is the interval after which the current buffer gets flushed.
BufferFlushInterval time.Duration
// BufferShardCount is the number of buffer "shards" that will be used.
// Those shards allow the use of multiple buffers at the same time to reduce
// lock contention.
BufferShardCount int
// SenderQueueSize is the size of the sender queue, in number of buffers.
// The magic value 0 will set the option to the optimal size for the transport
// protocol used when creating the client: 2048 for UDP and 512 for UDS.
SenderQueueSize int
// WriteTimeoutUDS is the timeout after which a UDS packet is dropped.
WriteTimeoutUDS time.Duration
// Telemetry is a set of metrics automatically injected by the client in the
// dogstatsd stream to be able to monitor the client itself.
Telemetry bool
// ReceiveMode determines the behavior of the client when receiving too many
// metrics. The client will either drop the metrics if its buffers are
// full (ChannelMode) or block the caller until the metric can be
// handled (MutexMode). By default the client uses MutexMode. This
// option should be set to ChannelMode only when used under very high
// load.
//
// MutexMode uses a mutex internally, which is much faster than a
// channel but causes some lock contention when used with a high number
// of threads. Mutexes are sharded based on the metric name, which
// limits mutex contention when goroutines send different metrics.
//
// ChannelMode uses a channel (of ChannelModeBufferSize size) to send
// metrics and drops metrics if the channel is full. Sending metrics in
// this mode is slower than in MutexMode (because of the channel), but
// will not block the application. This mode is made for applications
// using many goroutines, sending the same metrics at a very high
// volume. The goal is to not slow down the application, at the cost of
// dropping metrics and having a lower max throughput.
ReceiveMode ReceivingMode
// ChannelModeBufferSize is the size of the channel holding incoming metrics.
ChannelModeBufferSize int
// AggregationFlushInterval is the interval at which the aggregator flushes metrics.
AggregationFlushInterval time.Duration
// [beta] Aggregation enables/disables client side aggregation.
Aggregation bool
// TelemetryAddr specifies a different endpoint for telemetry metrics.
TelemetryAddr string
// DevMode enables the "dev" mode, where the client sends many more
// telemetry metrics to help troubleshoot the client behavior.
DevMode bool
}
// resolveOptions computes the effective configuration for a client.
//
// It seeds a fresh Options with the package-level Default* values and then
// runs every entry of options against it, in slice order, so an option that
// appears later wins over an earlier one writing the same field.
// TelemetryAddr has no Default* counterpart and starts as the empty string.
//
// The first option that reports an error stops the process: that error is
// returned unchanged, together with a nil *Options.
func resolveOptions(options []Option) (*Options, error) {
	resolved := new(Options)

	// Payload and buffering defaults.
	resolved.Namespace = DefaultNamespace
	resolved.Tags = DefaultTags
	resolved.MaxBytesPerPayload = DefaultMaxBytesPerPayload
	resolved.MaxMessagesPerPayload = DefaultMaxMessagesPerPayload
	resolved.BufferPoolSize = DefaultBufferPoolSize
	resolved.BufferFlushInterval = DefaultBufferFlushInterval
	resolved.BufferShardCount = DefaultBufferShardCount
	resolved.SenderQueueSize = DefaultSenderQueueSize
	resolved.WriteTimeoutUDS = DefaultWriteTimeoutUDS

	// Behavioral defaults.
	resolved.Telemetry = DefaultTelemetry
	resolved.ReceiveMode = DefaultReceivingMode
	resolved.ChannelModeBufferSize = DefaultChannelModeBufferSize
	resolved.AggregationFlushInterval = DefaultAggregationFlushInterval
	resolved.Aggregation = DefaultAggregation
	resolved.DevMode = DefaultDevMode

	for i := range options {
		if err := options[i](resolved); err != nil {
			return nil, err
		}
	}
	return resolved, nil
}
// Option is a client option: a function that updates the Options being
// built. It can return an error if validation fails, in which case
// resolveOptions stops and returns that error.
type Option func(*Options) error
// WithNamespace returns an Option that sets the Namespace option.
//
// Namespace is prepended to every metric, event and service check name, so a
// trailing "." separator is appended when namespace does not already end with
// one ("app" and "app." both yield "app.").
//
// An empty namespace is stored as-is and means "no namespace", matching
// DefaultNamespace. Appending the separator to it would yield ".", which
// would put a stray leading dot in front of every name.
//
// The returned Option never fails.
func WithNamespace(namespace string) Option {
	// Normalize once here rather than on every application of the option.
	if namespace != "" && !strings.HasSuffix(namespace, ".") {
		namespace += "."
	}
	return func(o *Options) error {
		o.Namespace = namespace
		return nil
	}
}
// WithTags returns an Option that replaces the Tags option with tags.
// The slice is stored as given, without being copied. The returned Option
// never fails.
func WithTags(tags []string) Option {
	setTags := func(opts *Options) error {
		opts.Tags = tags
		return nil
	}
	return setTags
}
// WithMaxMessagesPerPayload returns an Option that sets the
// MaxMessagesPerPayload option. The value is stored without validation and
// the returned Option never fails.
func WithMaxMessagesPerPayload(maxMessagesPerPayload int) Option {
	setLimit := func(opts *Options) error {
		opts.MaxMessagesPerPayload = maxMessagesPerPayload
		return nil
	}
	return setLimit
}
// WithMaxBytesPerPayload returns an Option that sets the MaxBytesPerPayload
// option. The value is stored without validation and the returned Option
// never fails.
func WithMaxBytesPerPayload(MaxBytesPerPayload int) Option {
	setLimit := func(opts *Options) error {
		opts.MaxBytesPerPayload = MaxBytesPerPayload
		return nil
	}
	return setLimit
}
// WithBufferPoolSize returns an Option that sets the BufferPoolSize option.
// The value is stored without validation and the returned Option never fails.
func WithBufferPoolSize(bufferPoolSize int) Option {
	setSize := func(opts *Options) error {
		opts.BufferPoolSize = bufferPoolSize
		return nil
	}
	return setSize
}
// WithBufferFlushInterval returns an Option that sets the
// BufferFlushInterval option. The duration is stored without validation and
// the returned Option never fails.
func WithBufferFlushInterval(bufferFlushInterval time.Duration) Option {
	setInterval := func(opts *Options) error {
		opts.BufferFlushInterval = bufferFlushInterval
		return nil
	}
	return setInterval
}
// WithBufferShardCount returns an Option that sets the BufferShardCount
// option. The value is stored without validation and the returned Option
// never fails.
func WithBufferShardCount(bufferShardCount int) Option {
	setCount := func(opts *Options) error {
		opts.BufferShardCount = bufferShardCount
		return nil
	}
	return setCount
}
// WithSenderQueueSize returns an Option that sets the SenderQueueSize
// option. The value is stored without validation and the returned Option
// never fails.
func WithSenderQueueSize(senderQueueSize int) Option {
	setSize := func(opts *Options) error {
		opts.SenderQueueSize = senderQueueSize
		return nil
	}
	return setSize
}
// WithWriteTimeoutUDS returns an Option that sets the WriteTimeoutUDS
// option. The duration is stored without validation and the returned Option
// never fails.
func WithWriteTimeoutUDS(writeTimeoutUDS time.Duration) Option {
	setTimeout := func(opts *Options) error {
		opts.WriteTimeoutUDS = writeTimeoutUDS
		return nil
	}
	return setTimeout
}
// WithoutTelemetry returns an Option that disables the telemetry by setting
// the Telemetry option to false. The returned Option never fails.
func WithoutTelemetry() Option {
	disable := func(opts *Options) error {
		opts.Telemetry = false
		return nil
	}
	return disable
}
// WithChannelMode returns an Option that makes the client use a channel to
// receive metrics, by setting the ReceiveMode option to ChannelMode. The
// returned Option never fails.
func WithChannelMode() Option {
	useChannel := func(opts *Options) error {
		opts.ReceiveMode = ChannelMode
		return nil
	}
	return useChannel
}
// WithMutexMode returns an Option that makes the client use a mutex to
// receive metrics, by setting the ReceiveMode option to MutexMode. The
// returned Option never fails.
func WithMutexMode() Option {
	useMutex := func(opts *Options) error {
		opts.ReceiveMode = MutexMode
		return nil
	}
	return useMutex
}
// WithChannelModeBufferSize returns an Option that sets the
// ChannelModeBufferSize option, i.e. the size of the channel holding incoming
// metrics. The value is stored without validation and the returned Option
// never fails.
func WithChannelModeBufferSize(bufferSize int) Option {
	setSize := func(opts *Options) error {
		opts.ChannelModeBufferSize = bufferSize
		return nil
	}
	return setSize
}
// WithAggregationInterval returns an Option that sets the
// AggregationFlushInterval option, the interval at which the aggregator
// flushes metrics. The duration is stored without validation and the returned
// Option never fails.
func WithAggregationInterval(interval time.Duration) Option {
	setInterval := func(opts *Options) error {
		opts.AggregationFlushInterval = interval
		return nil
	}
	return setInterval
}
// WithClientSideAggregation returns an Option that enables client side
// aggregation by setting the Aggregation option to true. Client side
// aggregation is a beta feature. The returned Option never fails.
func WithClientSideAggregation() Option {
	enable := func(opts *Options) error {
		opts.Aggregation = true
		return nil
	}
	return enable
}
// WithoutClientSideAggregation returns an Option that disables client side
// aggregation by setting the Aggregation option to false. The returned Option
// never fails.
func WithoutClientSideAggregation() Option {
	disable := func(opts *Options) error {
		opts.Aggregation = false
		return nil
	}
	return disable
}
// WithTelemetryAddr returns an Option that sets the TelemetryAddr option,
// which specifies a different address for telemetry metrics. The address is
// stored without validation and the returned Option never fails.
func WithTelemetryAddr(addr string) Option {
	setAddr := func(opts *Options) error {
		opts.TelemetryAddr = addr
		return nil
	}
	return setAddr
}
// WithDevMode returns an Option that enables the client "dev" mode by setting
// the DevMode option to true. In "dev" mode the client sends more telemetry
// metrics to help troubleshoot its behavior. The returned Option never fails.
func WithDevMode() Option {
	enable := func(opts *Options) error {
		opts.DevMode = true
		return nil
	}
	return enable
}
// WithoutDevMode returns an Option that disables the client "dev" mode by
// setting the DevMode option to false, so the additional troubleshooting
// telemetry enabled by DevMode is turned off. The returned Option never fails.
func WithoutDevMode() Option {
	disable := func(opts *Options) error {
		opts.DevMode = false
		return nil
	}
	return disable
}