Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip #285

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft

wip #285

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion example/simple_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ package main

import (
"log"
"os"

"github.com/DataDog/datadog-go/v5/statsd"
)

func main() {
client, err := statsd.New("127.0.0.1:8125",
addr := os.Getenv("DOGSTATSD_ADDR")
if addr == "" {
addr = "127.0.0.1:8125"
}
client, err := statsd.New(addr,
statsd.WithTags([]string{"env:prod", "service:myservice"}),
statsd.WithErrorHandler(statsd.LoggingErrorHandler),
)
if err != nil {
log.Fatal(err)
Expand Down
44 changes: 44 additions & 0 deletions flooder/cmd/flood/flood.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package flood

import (
"github.com/DataDog/datadog-go/v5/flooder/pkg/flood"
"os"
"time"

"github.com/spf13/cobra"
)

// floodCmd represents the base command when called without any subcommands
var floodCmd = &cobra.Command{
Use: "v5",
Short: "Sends a lot of statsd points.",
Run: func(cmd *cobra.Command, args []string) {
flood.Flood(cmd, args)
},
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
err := floodCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func init() {
floodCmd.Flags().StringP("address", "", "127.0.0.1:8125", "Address of the statsd server")
floodCmd.Flags().StringP("telemetry-address", "", "", "Address of the telemetry server")
floodCmd.Flags().BoolP("client-side-aggregation", "", false, "Enable client-side aggregation")
floodCmd.Flags().BoolP("extended-client-side-aggregation", "", false, "Enable extended client-side aggregation")
floodCmd.Flags().BoolP("channel-mode", "", false, "Enable channel mode")
floodCmd.Flags().IntP("channel-mode-buffer-size", "", 4096, "Set channel mode buffer size")
floodCmd.Flags().IntP("sender-queue-size", "", 512, "Set sender queue size")
floodCmd.Flags().DurationP("buffer-flush-interval", "", time.Duration(4)*time.Second, "Set buffer flush interval")
floodCmd.Flags().DurationP("write-timeout", "", time.Duration(100)*time.Millisecond, "Set write timeout")
floodCmd.Flags().StringSliceP("tags", "", []string{}, "Set tags")
floodCmd.Flags().IntP("points-per-10seconds", "", 100000, "Set points per 10 seconds")
floodCmd.Flags().BoolP("send-at-start-of-bucket", "", false, "Send all the points at the start of the 10 sec time bucket.")
floodCmd.Flags().BoolP("verbose", "", false, "Enable verbose mode")
floodCmd.Flags().BoolP("send-with-timestamp", "", false, "Send the timestamp of the time the point was sent")
}
7 changes: 7 additions & 0 deletions flooder/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "github.com/DataDog/datadog-go/v5/flooder/cmd/flood"

func main() {
flood.Execute()
}
237 changes: 237 additions & 0 deletions flooder/pkg/flood/flood.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
package flood

import (
"hash/fnv"
"log"
"strconv"
"strings"
"time"

"github.com/spf13/cobra"

"github.com/DataDog/datadog-go/v5/statsd"
)

type client struct {
client *statsd.Client
pointsPer10Seconds int
sendAtStartOfBucket bool
sendWithTimestamp bool
errors int
}

func initClient(command *cobra.Command) (*client, error) {
var options []statsd.Option

tags := []string{}

b, err := command.Flags().GetBool("client-side-aggregation")
if err != nil {
return nil, err
}
if b {
options = append(options, statsd.WithClientSideAggregation())
} else {
options = append(options, statsd.WithoutClientSideAggregation())
}
tags = append(tags, "client-side-aggregation:"+strconv.FormatBool(b))

b, err = command.Flags().GetBool("extended-client-side-aggregation")
if err != nil {
return nil, err
}
if b {
options = append(options, statsd.WithExtendedClientSideAggregation())
}
tags = append(tags, "extended-client-side-aggregation"+strconv.FormatBool(b))

b, err = command.Flags().GetBool("channel-mode")
if err != nil {
return nil, err
}
if b {
options = append(options, statsd.WithChannelMode())
}
tags = append(tags, "channel-mode:"+strconv.FormatBool(b))

i, err := command.Flags().GetInt("channel-mode-buffer-size")
if err != nil {
return nil, err
}
options = append(options, statsd.WithChannelModeBufferSize(i))
tags = append(tags, "channel-mode-buffer-size:"+strconv.Itoa(i))

i, err = command.Flags().GetInt("sender-queue-size")
if err != nil {
return nil, err
}
options = append(options, statsd.WithSenderQueueSize(i))
tags = append(tags, "sender-queue-size:"+strconv.Itoa(i))

d, err := command.Flags().GetDuration("buffer-flush-interval")
if err != nil {
return nil, err
}
options = append(options, statsd.WithBufferFlushInterval(d))
tags = append(tags, "buffer-flush-interval:"+d.String())

d, err = command.Flags().GetDuration("write-timeout")
if err != nil {
return nil, err
}
options = append(options, statsd.WithWriteTimeout(d))
tags = append(tags, "write-timeout:"+d.String())

pointsPer10Seconds, err := command.Flags().GetInt("points-per-10seconds")
if err != nil {
return nil, err
}
tags = append(tags, "points-per-10seconds:"+strconv.Itoa(pointsPer10Seconds))

sendAtStart, err := command.Flags().GetBool("send-at-start-of-bucket")
if err != nil {
return nil, err
}
tags = append(tags, "send-at-start-of-bucket:"+strconv.FormatBool(sendAtStart))

sendWithTimestamp, err := command.Flags().GetBool("send-with-timestamp")
if err != nil {
return nil, err
}
tags = append(tags, "send-with-timestamp:"+strconv.FormatBool(sendWithTimestamp))

address, err := command.Flags().GetString("address")
if err != nil {
return nil, err
}
tags = append(tags, "address:"+address)

transport := "udp"
if strings.HasPrefix(address, statsd.UnixAddressPrefix) {
transport = "unix"
}
tags = append(tags, "transport:"+transport)

telemetryAddress, err := command.Flags().GetString("telemetry-address")
if err != nil {
return nil, err
}
if telemetryAddress == "" {
telemetryAddress = address
}
tags = append(tags, "telemetry-address:"+telemetryAddress)
options = append(options, statsd.WithTelemetryAddr(telemetryAddress))

telemetryTransport := "udp"
if strings.HasPrefix(telemetryAddress, statsd.UnixAddressPrefix) {
telemetryTransport = "unix"
}
tags = append(tags, "telemetry-transport:"+telemetryTransport)

t, err := command.Flags().GetStringSlice("tags")
tags = append(tags, t...)
h := hash(tags)
tags = append(tags, "client-hash:"+strconv.Itoa(int(h)))

options = append(options, statsd.WithTags(tags))

log.Printf("Tags: %v - Hash: %x", tags, h)

verbose, err := command.Flags().GetBool("verbose")
if err != nil {
return nil, err
}
options = append(options, statsd.WithOriginDetection())

client := &client{
pointsPer10Seconds: pointsPer10Seconds,
sendAtStartOfBucket: sendAtStart,
sendWithTimestamp: sendWithTimestamp,
}
var errorHandler statsd.ErrorHandler
if verbose {
errorHandler = func(err error) { verboseErrorHandler(err, client) }
} else {
errorHandler = func(err error) { simpleErrorHandler(err, client) }
}
options = append(options, statsd.WithErrorHandler(errorHandler))

client.client, err = statsd.New(address, options...)
if err != nil {
return nil, err
}

return client, nil
}

func hash(s []string) uint32 {
h := fnv.New32a()
for _, e := range s {
h.Write([]byte(e))
}
return h.Sum32()
}

func simpleErrorHandler(err error, c *client) {
c.errors++
}

func verboseErrorHandler(err error, c *client) {
simpleErrorHandler(err, c)
statsd.LoggingErrorHandler(err)
}

func Flood(command *cobra.Command, args []string) {
c, err := initClient(command)
if err != nil {
log.Fatal(err)
}
log.Printf("Sending %d points per 10 seconds", c.pointsPer10Seconds)

for {
t1 := time.Now()

if c.sendWithTimestamp {
err = c.client.CountWithTimestamp("flood.dogstatsd.count", int64(c.pointsPer10Seconds), []string{}, 1, t1)
if err != nil {
log.Printf("Error: %v", err)
}
err := c.client.CountWithTimestamp("flood.dogstatsd.expected", int64(c.pointsPer10Seconds), []string{}, 1, t1)
if err != nil {
log.Printf("Error: %v", err)
}
} else {
for sent := 0; sent < c.pointsPer10Seconds; sent++ {
err = c.client.Incr("flood.dogstatsd.count", []string{"sent:" + strconv.Itoa(sent)}, 1)
if err != nil {
log.Printf("Error: %v", err)
}
if !c.sendAtStartOfBucket {
time.Sleep(time.Duration(8) * time.Second / time.Duration(c.pointsPer10Seconds))
}
}
err := c.client.Count("flood.dogstatsd.expected", int64(c.pointsPer10Seconds), []string{}, 1)
if err != nil {
log.Printf("Error: %v", err)
}
}

t2 := time.Now()

duration := t2.Sub(t1)
s := time.Duration(10)*time.Second - duration

if c.errors > 0 {
log.Printf("Errors: %d", c.errors)
c.errors = 0
}
if s > 0 {
// Sleep until the next bucket
log.Printf("Sleeping for %f seconds", s.Seconds())
time.Sleep(s)
} else {
log.Printf("We're %f seconds behind", s.Seconds()*-1)
}
}
c.client.Close()
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ go 1.13
require (
github.com/Microsoft/go-winio v0.5.0
github.com/golang/mock v1.6.0
github.com/spf13/cobra v1.7.0 // indirect
github.com/stretchr/testify v1.8.1
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
Expand All @@ -24,6 +32,7 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down