Skip to content

Commit

Permalink
Merge pull request #296 from DataDog/corentin.chary/send-multiple-sam…
Browse files Browse the repository at this point in the history
…ples

Add an API to send multiple samples at once
  • Loading branch information
iksaif committed Dec 15, 2023
2 parents e255066 + f901e25 commit 112920c
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 5 deletions.
2 changes: 1 addition & 1 deletion statsd/buffered_metric_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (bc *bufferedMetricContexts) sample(name string, value float64, tags []stri
}

context, stringTags := getContextAndTags(name, tags)
var v *bufferedMetric = nil
var v *bufferedMetric

bc.mutex.RLock()
v, _ = bc.values[context]
Expand Down
48 changes: 48 additions & 0 deletions statsd/end_to_end_udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,40 @@ func getTestMap() map[string]testCase {
ts.assert(t, client, expectedMetrics)
},
},
"Basic Extended client side aggregation + Maximum number of Samples + ChannelMode": testCase{
[]Option{
WithExtendedClientSideAggregation(),
WithMaxSamplesPerContext(2),
WithChannelMode(),
WithoutTelemetry(),
},
func(t *testing.T, ts *testServer, client *Client) {
expectedMetrics := ts.sendExtendedBasicAggregationMetrics(client)
ts.assert(t, client, expectedMetrics)
},
},
}
}

type testCaseDirect struct {
opt []Option
testFunc func(*testing.T, *testServer, *ClientDirect)
}

func getTestMapDirect() map[string]testCaseDirect {
return map[string]testCaseDirect{
"Basic Extended client side aggregation + Maximum number of Samples + ChannelMode + pre-sampled distributions": testCaseDirect{
[]Option{
WithExtendedClientSideAggregation(),
WithMaxSamplesPerContext(2),
WithChannelMode(),
WithoutTelemetry(),
},
func(t *testing.T, ts *testServer, client *ClientDirect) {
expectedMetrics := ts.sendExtendedBasicAggregationMetricsWithPreAggregatedSamples(client)
ts.assert(t, client.Client, expectedMetrics)
},
},
}
}

Expand All @@ -356,3 +390,17 @@ func TestFullPipelineUDP(t *testing.T) {
})
}
}

func TestFullPipelineUDPDirectClient(t *testing.T) {
for testName, c := range getTestMapDirect() {
t.Run(testName, func(t *testing.T) {
ts, client := newClientDirectAndTestServer(t,
"udp",
"localhost:8765",
nil,
c.opt...,
)
c.testFunc(t, ts, client)
})
}
}
48 changes: 48 additions & 0 deletions statsd/mocks/statsd_direct.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions statsd/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,15 @@ func (n *NoOpClient) GetTelemetry() Telemetry {
// Verify that NoOpClient implements the ClientInterface.
// https://golang.org/doc/faq#guarantee_satisfies_interface
var _ ClientInterface = &NoOpClient{}

// NoOpClientDirect implements ClientDirectInterface and does nothing.
type NoOpClientDirect struct {
NoOpClient
}

// DistributionSamples does nothing and returns nil
func (n *NoOpClientDirect) DistributionSamples(name string, values []float64, tags []string, rate float64) error {
return nil
}

var _ ClientDirectInterface = &NoOpClientDirect{}
9 changes: 9 additions & 0 deletions statsd/noop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,12 @@ func TestNoOpClient(t *testing.T) {
a.Nil(c.Close())
a.Nil(c.Flush())
}

func TestNoopClientDirect(t *testing.T) {
a := assert.New(t)
c := NoOpClientDirect{}
tags := []string{"a:b"}

a.Nil(c.Gauge("asd", 123.4, tags, 56.0))
a.Nil(c.DistributionSamples("asd", []float64{1.234, 4.567}, tags, 56.0))
}
3 changes: 3 additions & 0 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,9 @@ type ClientInterface interface {
Histogram(name string, value float64, tags []string, rate float64) error

// Distribution tracks the statistical distribution of a set of values across your infrastructure.
//
// It is recommended to use `WithMaxBufferedMetricsPerContext` to avoid dropping metrics at high throughput, `rate` can
// also be used to limit the load. Both options can *not* be used together.
Distribution(name string, value float64, tags []string, rate float64) error

// Decr is just Count of -1
Expand Down
69 changes: 69 additions & 0 deletions statsd/statsd_direct.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package statsd

import (
"io"
"strings"
"sync/atomic"
)

type ClientDirectInterface interface {
DistributionSamples(name string, values []float64, tags []string, rate float64) error
}

// ClientDirect is an *experimental* statsd client that gives direct access to some dogstatsd features.
//
// It is not recommended to use this client in production. This client might allow you to take advantage of
// new features in the agent before they are released, but it might also break your application.
type ClientDirect struct {
*Client
}

// NewDirect returns a pointer to a new ClientDirect given an addr in the format "hostname:port" for UDP,
// "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes.
func NewDirect(addr string, options ...Option) (*ClientDirect, error) {
client, err := New(addr, options...)
if err != nil {
return nil, err
}
return &ClientDirect{
client,
}, nil
}

func NewDirectWithWriter(writer io.WriteCloser, options ...Option) (*ClientDirect, error) {
client, err := NewWithWriter(writer, options...)
if err != nil {
return nil, err
}
return &ClientDirect{
client,
}, nil
}

// DistributionSamples is similar to Distribution, but it lets the client deals with the sampling.
//
// The provided `rate` is the sampling rate applied by the client and will *not* be used to apply further
// sampling. This is recommended in high performance cases were the overhead of the statsd library might be
// significant and the sampling is already done by the client.
//
// `WithMaxBufferedMetricsPerContext` is ignored when using this method.
func (c *ClientDirect) DistributionSamples(name string, values []float64, tags []string, rate float64) error {
if c == nil {
return ErrNoClient
}
atomic.AddUint64(&c.telemetry.totalMetricsDistribution, uint64(len(values)))
return c.send(metric{
metricType: distributionAggregated,
name: name,
fvalues: values,
tags: tags,
stags: strings.Join(tags, tagSeparatorSymbol),
rate: rate,
globalTags: c.tags,
namespace: c.namespace,
})
}

// Validate that ClientDirect implements ClientDirectInterface and ClientInterface.
var _ ClientDirectInterface = (*ClientDirect)(nil)
var _ ClientInterface = (*ClientDirect)(nil)
39 changes: 35 additions & 4 deletions statsd/test_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ type testServer struct {
telemetryEnabled bool
}

func newClientAndTestServer(t *testing.T, proto string, addr string, tags []string, options ...Option) (*testServer, *Client) {

func newTestServer(t *testing.T, proto string, addr string, tags []string, options ...Option) *testServer {
opt, err := resolveOptions(options)
require.NoError(t, err)

Expand Down Expand Up @@ -108,12 +107,34 @@ func newClientAndTestServer(t *testing.T, proto string, addr string, tags []stri
require.FailNow(t, "unknown proto '%s'", proto)
}

client, err := New(addr, options...)
require.NoError(t, err)
return ts
}

func startTestServer(ts *testServer) {
ts.containerID = getContainerID()

go ts.start()
}

func newClientAndTestServer(t *testing.T, proto string, addr string, tags []string, options ...Option) (*testServer, *Client) {

ts := newTestServer(t, proto, addr, tags, options...)

client, err := New(addr, options...)
require.NoError(t, err)

startTestServer(ts)
return ts, client
}

func newClientDirectAndTestServer(t *testing.T, proto string, addr string, tags []string, options ...Option) (*testServer, *ClientDirect) {

ts := newTestServer(t, proto, addr, tags, options...)

client, err := NewDirect(addr, options...)
require.NoError(t, err)

startTestServer(ts)
return ts, client
}

Expand Down Expand Up @@ -613,6 +634,16 @@ func (ts *testServer) sendExtendedBasicAggregationMetrics(client *Client) []stri
}
}

func (ts *testServer) sendExtendedBasicAggregationMetricsWithPreAggregatedSamples(client *ClientDirect) []string {
expectedMetrics := ts.sendExtendedBasicAggregationMetrics(client.Client)

tags := []string{"custom:1", "custom:2"}
client.DistributionSamples("distro2", []float64{5, 6}, tags, 0.5)

finalTags := ts.getFinalTags(tags...)
return append(expectedMetrics, ts.namespace+"distro2:5:6|d|@0.5"+finalTags)
}

func patchContainerID(id string) { containerID = id }

func resetContainerID() {
Expand Down

0 comments on commit 112920c

Please sign in to comment.