Skip to content

Commit

Permalink
Fix telemetry with extended aggregation and channelMode (#194)
Browse files Browse the repository at this point in the history
When extended aggregation is enabled with channelMode we don't need to
run the workers in channelMode since the user never use them directly
(the aggregator sit in between). The telemetry on the other side was
using the workers in channelMode despite them not being reading on the
end of the channel. This caused the telemetry to never been sent.
  • Loading branch information
hush-hush committed Apr 29, 2021
1 parent 5de76a8 commit fcf7900
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 26 deletions.
54 changes: 30 additions & 24 deletions statsd/statsd.go
Expand Up @@ -202,19 +202,20 @@ type Client struct {
// Tags are global tags to be added to every statsd call
Tags []string
// skipErrors turns off error passing and allows UDS to emulate UDP behaviour
SkipErrors bool
flushTime time.Duration
metrics *ClientMetrics
telemetry *telemetryClient
stop chan struct{}
wg sync.WaitGroup
workers []*worker
closerLock sync.Mutex
receiveMode ReceivingMode
agg *aggregator
aggExtended *aggregator
options []Option
addrOption string
SkipErrors bool
flushTime time.Duration
metrics *ClientMetrics
telemetry *telemetryClient
stop chan struct{}
wg sync.WaitGroup
workers []*worker
closerLock sync.Mutex
workersMode ReceivingMode
aggregatorMode ReceivingMode
agg *aggregator
aggExtended *aggregator
options []Option
addrOption string
}

// ClientMetrics contains metrics about the client
Expand Down Expand Up @@ -361,7 +362,16 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro

bufferPool := newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
c.sender = newSender(w, o.SenderQueueSize, bufferPool)
c.receiveMode = o.ReceiveMode
c.aggregatorMode = o.ReceiveMode

c.workersMode = o.ReceiveMode
// ChannelMode mode at the worker level is not enabled when
// ExtendedAggregation is since the user app will not directly
// use the worker (the aggregator sit between the app and the
// workers).
if o.ExtendedAggregation {
c.workersMode = MutexMode
}

if o.Aggregation || o.ExtendedAggregation {
c.agg = newAggregator(&c)
Expand All @@ -370,7 +380,7 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
if o.ExtendedAggregation {
c.aggExtended = c.agg

if c.receiveMode == ChannelMode {
if c.aggregatorMode == ChannelMode {
c.agg.startReceivingMetric(o.ChannelModeBufferSize)
}
}
Expand All @@ -380,11 +390,7 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
w := newWorker(bufferPool, c.sender)
c.workers = append(c.workers, w)

// ChannelMode mode at the worker level is not enabled when
// ExtendedAggregation is since the user app will not directly
// use the worker (the aggregator sit between the app and the
// workers).
if c.receiveMode == ChannelMode && !o.ExtendedAggregation {
if c.workersMode == ChannelMode {
w.startReceivingMetric(o.ChannelModeBufferSize)
}
}
Expand Down Expand Up @@ -497,7 +503,7 @@ func (c *Client) send(m metric) error {
h := hashString32(m.name)
worker := c.workers[h%uint32(len(c.workers))]

if c.receiveMode == ChannelMode {
if c.workersMode == ChannelMode {
select {
case worker.inputMetrics <- m:
default:
Expand All @@ -519,7 +525,7 @@ func (c *Client) sendBlocking(m metric) error {
}

func (c *Client) sendToAggregator(mType metricType, name string, value float64, tags []string, rate float64, f bufferedMetricSampleFunc) error {
if c.receiveMode == ChannelMode {
if c.aggregatorMode == ChannelMode {
select {
case c.aggExtended.inputMetrics <- metric{metricType: mType, name: name, fvalue: value, tags: tags, rate: rate}:
default:
Expand Down Expand Up @@ -666,15 +672,15 @@ func (c *Client) Close() error {
}
close(c.stop)

if c.receiveMode == ChannelMode && c.aggExtended == nil {
if c.workersMode == ChannelMode {
for _, w := range c.workers {
w.stopReceivingMetric()
}
}

// flush the aggregator first
if c.agg != nil {
if c.aggExtended != nil && c.receiveMode == ChannelMode {
if c.aggExtended != nil && c.aggregatorMode == ChannelMode {
c.agg.stopReceivingMetric()
}
c.agg.stop()
Expand Down
4 changes: 2 additions & 2 deletions statsd/statsd_test.go
Expand Up @@ -112,7 +112,7 @@ func TestCloneWithExtraOptions(t *testing.T) {

assert.Equal(t, client.Tags, []string{"tag1", "tag2"})
assert.Equal(t, client.Namespace, "")
assert.Equal(t, client.receiveMode, MutexMode)
assert.Equal(t, client.workersMode, MutexMode)
assert.Equal(t, client.addrOption, defaultAddr)
assert.Len(t, client.options, 1)

Expand All @@ -121,7 +121,7 @@ func TestCloneWithExtraOptions(t *testing.T) {

assert.Equal(t, cloneClient.Tags, []string{"tag1", "tag2"})
assert.Equal(t, cloneClient.Namespace, "test.")
assert.Equal(t, cloneClient.receiveMode, ChannelMode)
assert.Equal(t, cloneClient.workersMode, ChannelMode)
assert.Equal(t, cloneClient.addrOption, defaultAddr)
assert.Len(t, cloneClient.options, 3)
}
Expand Down

0 comments on commit fcf7900

Please sign in to comment.