Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix telemetry with extended aggregation and channelMode #194

Merged
merged 1 commit into from Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
54 changes: 30 additions & 24 deletions statsd/statsd.go
Expand Up @@ -202,19 +202,20 @@ type Client struct {
// Tags are global tags to be added to every statsd call
Tags []string
// skipErrors turns off error passing and allows UDS to emulate UDP behaviour
SkipErrors bool
flushTime time.Duration
metrics *ClientMetrics
telemetry *telemetryClient
stop chan struct{}
wg sync.WaitGroup
workers []*worker
closerLock sync.Mutex
receiveMode ReceivingMode
agg *aggregator
aggExtended *aggregator
options []Option
addrOption string
SkipErrors bool
flushTime time.Duration
metrics *ClientMetrics
telemetry *telemetryClient
stop chan struct{}
wg sync.WaitGroup
workers []*worker
closerLock sync.Mutex
workersMode ReceivingMode
aggregatorMode ReceivingMode
agg *aggregator
aggExtended *aggregator
options []Option
addrOption string
}

// ClientMetrics contains metrics about the client
Expand Down Expand Up @@ -361,7 +362,16 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro

bufferPool := newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
c.sender = newSender(w, o.SenderQueueSize, bufferPool)
c.receiveMode = o.ReceiveMode
c.aggregatorMode = o.ReceiveMode

c.workersMode = o.ReceiveMode
// ChannelMode mode at the worker level is not enabled when
// ExtendedAggregation is since the user app will not directly
// use the worker (the aggregator sit between the app and the
// workers).
if o.ExtendedAggregation {
c.workersMode = MutexMode
}

if o.Aggregation || o.ExtendedAggregation {
c.agg = newAggregator(&c)
Expand All @@ -370,7 +380,7 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
if o.ExtendedAggregation {
c.aggExtended = c.agg

if c.receiveMode == ChannelMode {
if c.aggregatorMode == ChannelMode {
c.agg.startReceivingMetric(o.ChannelModeBufferSize)
}
}
Expand All @@ -380,11 +390,7 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
w := newWorker(bufferPool, c.sender)
c.workers = append(c.workers, w)

// ChannelMode mode at the worker level is not enabled when
// ExtendedAggregation is since the user app will not directly
// use the worker (the aggregator sit between the app and the
// workers).
if c.receiveMode == ChannelMode && !o.ExtendedAggregation {
if c.workersMode == ChannelMode {
w.startReceivingMetric(o.ChannelModeBufferSize)
}
}
Expand Down Expand Up @@ -497,7 +503,7 @@ func (c *Client) send(m metric) error {
h := hashString32(m.name)
worker := c.workers[h%uint32(len(c.workers))]

if c.receiveMode == ChannelMode {
if c.workersMode == ChannelMode {
select {
case worker.inputMetrics <- m:
default:
Expand All @@ -519,7 +525,7 @@ func (c *Client) sendBlocking(m metric) error {
}

func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error {
if c.receiveMode == ChannelMode {
if c.aggregatorMode == ChannelMode {
select {
case c.aggExtended.inputMetrics <- metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate}:
default:
Expand Down Expand Up @@ -666,15 +672,15 @@ func (c *Client) Close() error {
}
close(c.stop)

if c.receiveMode == ChannelMode && c.aggExtended == nil {
if c.workersMode == ChannelMode {
for _, w := range c.workers {
w.stopReceivingMetric()
}
}

// flush the aggregator first
if c.agg != nil {
if c.aggExtended != nil && c.receiveMode == ChannelMode {
if c.aggExtended != nil && c.aggregatorMode == ChannelMode {
c.agg.stopReceivingMetric()
}
c.agg.stop()
Expand Down
4 changes: 2 additions & 2 deletions statsd/statsd_test.go
Expand Up @@ -112,7 +112,7 @@ func TestCloneWithExtraOptions(t *testing.T) {

assert.Equal(t, client.Tags, []string{"tag1", "tag2"})
assert.Equal(t, client.Namespace, "")
assert.Equal(t, client.receiveMode, MutexMode)
assert.Equal(t, client.workersMode, MutexMode)
assert.Equal(t, client.addrOption, defaultAddr)
assert.Len(t, client.options, 1)

Expand All @@ -121,7 +121,7 @@ func TestCloneWithExtraOptions(t *testing.T) {

assert.Equal(t, cloneClient.Tags, []string{"tag1", "tag2"})
assert.Equal(t, cloneClient.Namespace, "test.")
assert.Equal(t, cloneClient.receiveMode, ChannelMode)
assert.Equal(t, cloneClient.workersMode, ChannelMode)
assert.Equal(t, cloneClient.addrOption, defaultAddr)
assert.Len(t, cloneClient.options, 3)
}
Expand Down