Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix DD_AGENT_HOST variable support #192

Merged
merged 1 commit into from Mar 31, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Expand Up @@ -43,7 +43,8 @@ Find a list of all the available options for your DogStatsD Client in the [Datad

### Supported environment variables

* If the `addr` parameter is empty, the client uses the `DD_AGENT_HOST` and (optionally) the `DD_DOGSTATSD_PORT` environment variables to build a target address.
* If the `addr` parameter is empty, the client uses the `DD_AGENT_HOST` environment variables to build a target address.
Example: `DD_AGENT_HOST=127.0.0.1:8125` for UDP, `DD_AGENT_HOST=unix:///path/to/socket` for UDS and `DD_AGENT_HOST=\\.\pipe\my_windows_pipe` for Windows
* If the `DD_ENTITY_ID` environment variable is found, its value is injected as a global `dd.internal.entity_id` tag. The Datadog Agent uses this tag to insert container tags into the metrics. To avoid overwriting this global tag, only `append` to the `c.Tags` slice.

To enable origin detection and set the `DD_ENTITY_ID` environment variable, add the following lines to your application manifest:
Expand Down
123 changes: 65 additions & 58 deletions statsd/pipe_windows_test.go
Expand Up @@ -10,94 +10,103 @@ import (
"time"

"github.com/Microsoft/go-winio"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func createNamedPipe(t *testing.T) (string, *os.File, net.Listener) {
f, err := ioutil.TempFile("", "test-pipe-")
require.Nil(t, err)

pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
os.Remove(f.Name())
t.Fatal(err)
}
return pipepath, f, ln
}

// acceptOne accepts one single connection from ln, reads 512 bytes from it
// and sends it to the out channel, afterwards closing the connection.
func acceptOne(t *testing.T, ln net.Listener, out chan string) {
conn, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
require.Nil(t, err)

buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
t.Fatal(err)
}
require.Nil(t, err)

conn.Close()
out <- string(buf[:n])
}

func TestPipeWriter(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
t.Fatal(err)
}
pipepath, f, ln := createNamedPipe(t)
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
t.Fatal(err)
}

out := make(chan string)
go acceptOne(t, ln, out)

client, err := New(pipepath)
if err != nil {
t.Fatal(err)
}
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
t.Fatal(err)
}
require.Nil(t, err)

err = client.Gauge("metric", 1, []string{"key:val"}, 1)
require.Nil(t, err)

got := <-out
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
assert.Equal(t, got, "metric:1|g|#key:val")
}

func TestPipeWriterEnv(t *testing.T) {
pipepath, f, ln := createNamedPipe(t)
defer os.Remove(f.Name())

out := make(chan string)
go acceptOne(t, ln, out)

os.Setenv(agentHostEnvVarName, pipepath)
defer os.Unsetenv(agentHostEnvVarName)

client, err := New("")
require.Nil(t, err)

err = client.Gauge("metric", 1, []string{"key:val"}, 1)
require.Nil(t, err)

got := <-out
assert.Equal(t, got, "metric:1|g|#key:val")
}

func TestPipeWriterReconnect(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
t.Fatal(err)
}
pipepath, f, ln := createNamedPipe(t)
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
t.Fatalf("Listen: %s", err)
}

out := make(chan string)
go acceptOne(t, ln, out)
client, err := New(pipepath)
if err != nil {
t.Fatalf("New: %s", err)
}
require.Nil(t, err)

// first attempt works, then connection closes
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send gauge: %s", err)
}
err = client.Gauge("metric", 1, []string{"key:val"}, 1)
require.Nil(t, err, "Failed to send gauge: %s", err)

timeout := time.After(1 * time.Second)
select {
case got := <-out:
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
assert.Equal(t, got, "metric:1|g|#key:val")
case <-timeout:
t.Fatal("timeout1")
t.Fatal("timeout receiving the first metric")
}

// second attempt fails by attempting the same connection
go acceptOne(t, ln, out)
if err := client.Gauge("metric", 2, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
err = client.Gauge("metric", 2, []string{"key:val"}, 1)
require.Nil(t, err, "Failed to send second gauge: %s", err)

timeout = time.After(100 * time.Millisecond)
select {
case <-out:
Expand All @@ -108,15 +117,13 @@ func TestPipeWriterReconnect(t *testing.T) {

// subsequent attempts succeed with new connection
for n := 0; n < 3; n++ {
if err := client.Gauge("metric", 3, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
err = client.Gauge("metric", 3, []string{"key:val"}, 1)
require.Nil(t, err, "Failed to send second gauge: %s", err)

timeout = time.After(500 * time.Millisecond)
select {
case got := <-out:
if exp := "metric:3|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
assert.Equal(t, got, "metric:3|g|#key:val")
return
case <-timeout:
continue
Expand Down
39 changes: 37 additions & 2 deletions statsd/statsd.go
Expand Up @@ -11,6 +11,7 @@ statsd is based on go-statsd-client.
package statsd

import (
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -61,6 +62,12 @@ traffic instead of UDP.
*/
const WindowsPipeAddressPrefix = `\\.\pipe\`

const (
agentHostEnvVarName = "DD_AGENT_HOST"
agentPortEnvVarName = "DD_DOGSTATSD_PORT"
defaultUDPPort = "8125"
)

/*
ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
to a specific tag name.
Expand Down Expand Up @@ -228,7 +235,35 @@ type ClientMetrics struct {
// https://golang.org/doc/faq#guarantee_satisfies_interface
var _ ClientInterface = &Client{}

func resolveAddr(addr string) (statsdWriter, string, error) {
func resolveAddr(addr string) string {
envPort := ""
if addr == "" {
addr = os.Getenv(agentHostEnvVarName)
envPort = os.Getenv(agentPortEnvVarName)
}

if addr == "" {
return ""
}

if !strings.HasPrefix(addr, WindowsPipeAddressPrefix) && !strings.HasPrefix(addr, UnixAddressPrefix) {
if !strings.Contains(addr, ":") {
if envPort != "" {
addr = fmt.Sprintf("%s:%s", addr, envPort)
} else {
addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort)
}
}
}
return addr
}

func createWriter(addr string) (statsdWriter, string, error) {
addr = resolveAddr(addr)
if addr == "" {
return nil, "", errors.New("No address passed and autodetection from environment failed")
}

switch {
case strings.HasPrefix(addr, WindowsPipeAddressPrefix):
w, err := newWindowsPipeWriter(addr)
Expand All @@ -250,7 +285,7 @@ func New(addr string, options ...Option) (*Client, error) {
return nil, err
}

w, writerType, err := resolveAddr(addr)
w, writerType, err := createWriter(addr)
if err != nil {
return nil, err
}
Expand Down
48 changes: 48 additions & 0 deletions statsd/statsd_test.go
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"io"
"net"
"os"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -179,3 +180,50 @@ func TestCloneWithExtraOptions(t *testing.T) {
assert.Equal(t, cloneClient.addrOption, addr)
assert.Len(t, cloneClient.options, 3)
}

func TestResolveAddressFromEnvironment(t *testing.T) {
hostInitialValue, hostInitiallySet := os.LookupEnv(agentHostEnvVarName)
if hostInitiallySet {
defer os.Setenv(agentHostEnvVarName, hostInitialValue)
} else {
defer os.Unsetenv(agentHostEnvVarName)
}
portInitialValue, portInitiallySet := os.LookupEnv(agentPortEnvVarName)
if portInitiallySet {
defer os.Setenv(agentPortEnvVarName, portInitialValue)
} else {
defer os.Unsetenv(agentPortEnvVarName)
}

for _, tc := range []struct {
name string
addrParam string
hostEnv string
portEnv string
expectedAddr string
}{
{"UPD Nominal case", "127.0.0.1:1234", "", "", "127.0.0.1:1234"},
{"UPD Parameter overrides environment", "127.0.0.1:8125", "10.12.16.9", "1234", "127.0.0.1:8125"},
{"UPD Host and port passed as env", "", "10.12.16.9", "1234", "10.12.16.9:1234"},
{"UPD Host env, default port", "", "10.12.16.9", "", "10.12.16.9:8125"},
{"UPD Host passed, ignore env port", "10.12.16.9", "", "1234", "10.12.16.9:8125"},

{"UDS socket passed", "unix://test/path.socket", "", "", "unix://test/path.socket"},
{"UDS socket env", "", "unix://test/path.socket", "", "unix://test/path.socket"},
{"UDS socket env with port", "", "unix://test/path.socket", "8125", "unix://test/path.socket"},

{"Pipe passed", "\\\\.\\pipe\\my_pipe", "", "", "\\\\.\\pipe\\my_pipe"},
{"Pipe env", "", "\\\\.\\pipe\\my_pipe", "", "\\\\.\\pipe\\my_pipe"},
{"Pipe env with port", "", "\\\\.\\pipe\\my_pipe", "8125", "\\\\.\\pipe\\my_pipe"},

{"No autodetection failed", "", "", "", ""},
} {
t.Run(tc.name, func(t *testing.T) {
os.Setenv(agentHostEnvVarName, tc.hostEnv)
os.Setenv(agentPortEnvVarName, tc.portEnv)

addr := resolveAddr(tc.addrParam)
assert.Equal(t, tc.expectedAddr, addr)
})
}
}
2 changes: 1 addition & 1 deletion statsd/telemetry.go
Expand Up @@ -51,7 +51,7 @@ func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryCli
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) {
telemetryWriter, _, err := resolveAddr(telemetryAddr)
telemetryWriter, _, err := createWriter(telemetryAddr)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}
Expand Down
33 changes: 0 additions & 33 deletions statsd/udp.go
Expand Up @@ -2,32 +2,17 @@ package statsd

import (
"errors"
"fmt"
"net"
"os"
"time"
)

const (
autoHostEnvName = "DD_AGENT_HOST"
autoPortEnvName = "DD_DOGSTATSD_PORT"
defaultUDPPort = "8125"
)

// udpWriter is an internal class wrapping around management of UDP connection
type udpWriter struct {
conn net.Conn
}

// New returns a pointer to a new udpWriter given an addr in the format "hostname:port".
func newUDPWriter(addr string) (*udpWriter, error) {
if addr == "" {
addr = addressFromEnvironment()
}
if addr == "" {
return nil, errors.New("No address passed and autodetection from environment failed")
}

udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
Expand All @@ -53,21 +38,3 @@ func (w *udpWriter) Write(data []byte) (int, error) {
func (w *udpWriter) Close() error {
return w.conn.Close()
}

func (w *udpWriter) remoteAddr() net.Addr {
return w.conn.RemoteAddr()
}

func addressFromEnvironment() string {
autoHost := os.Getenv(autoHostEnvName)
if autoHost == "" {
return ""
}

autoPort := os.Getenv(autoPortEnvName)
if autoPort == "" {
autoPort = defaultUDPPort
}

return fmt.Sprintf("%s:%s", autoHost, autoPort)
}