From 56af8d97a38003c0e805dac971a0f14b637f4a4c Mon Sep 17 00:00:00 2001 From: Maxime mouial Date: Wed, 31 Mar 2021 12:48:17 +0200 Subject: [PATCH] Fix DD_AGENT_HOST variable support DD_AGENT_HOST did not support UDS or NamedPipe --- README.md | 3 +- statsd/pipe_windows_test.go | 123 +++++++++++++++++++----------------- statsd/statsd.go | 39 +++++++++++- statsd/statsd_test.go | 48 ++++++++++++++ statsd/telemetry.go | 2 +- statsd/udp.go | 33 ---------- statsd/udp_test.go | 70 -------------------- 7 files changed, 153 insertions(+), 165 deletions(-) delete mode 100644 statsd/udp_test.go diff --git a/README.md b/README.md index 4fe017fe..b740d285 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,8 @@ Find a list of all the available options for your DogStatsD Client in the [Datad ### Supported environment variables -* If the `addr` parameter is empty, the client uses the `DD_AGENT_HOST` and (optionally) the `DD_DOGSTATSD_PORT` environment variables to build a target address. +* If the `addr` parameter is empty, the client uses the `DD_AGENT_HOST` environment variables to build a target address. + Example: `DD_AGENT_HOST=127.0.0.1:8125` for UDP, `DD_AGENT_HOST=unix:///path/to/socket` for UDS and `DD_AGENT_HOST=\\.\pipe\my_windows_pipe` for Windows * If the `DD_ENTITY_ID` environment variable is found, its value is injected as a global `dd.internal.entity_id` tag. The Datadog Agent uses this tag to insert container tags into the metrics. To avoid overwriting this global tag, only `append` to the `c.Tags` slice. To enable origin detection and set the `DD_ENTITY_ID` environment variable, add the following lines to your application manifest: diff --git a/statsd/pipe_windows_test.go b/statsd/pipe_windows_test.go index f32e2d01..364f69f2 100644 --- a/statsd/pipe_windows_test.go +++ b/statsd/pipe_windows_test.go @@ -10,94 +10,103 @@ import ( "time" "github.com/Microsoft/go-winio" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +func createNamedPipe(t *testing.T) (*os.File, net.Listener) { + f, err := ioutil.TempFile("", "test-pipe-") + require.Nil(t, err) + + pipepath := WindowsPipeAddressPrefix + f.Name() + ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{ + SecurityDescriptor: "D:AI(A;;GA;;;WD)", + InputBufferSize: 1_000_000, + }) + if err != nil { + os.Remove(f.Name()) + t.Fatal(err) + } + return f, ln +} + // acceptOne accepts one single connection from ln, reads 512 bytes from it // and sends it to the out channel, afterwards closing the connection. func acceptOne(t *testing.T, ln net.Listener, out chan string) { conn, err := ln.Accept() - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) + buf := make([]byte, 512) n, err := conn.Read(buf) - if err != nil { - t.Fatal(err) - } + require.Nil(t, err) + conn.Close() out <- string(buf[:n]) } func TestPipeWriter(t *testing.T) { - f, err := ioutil.TempFile("", "test-pipe-") - if err != nil { - t.Fatal(err) - } + f, ln := createNamedPipe(t) defer os.Remove(f.Name()) - pipepath := WindowsPipeAddressPrefix + f.Name() - ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{ - SecurityDescriptor: "D:AI(A;;GA;;;WD)", - InputBufferSize: 1_000_000, - }) - if err != nil { - t.Fatal(err) - } + out := make(chan string) go acceptOne(t, ln, out) client, err := New(pipepath) - if err != nil { - t.Fatal(err) - } - if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil { - t.Fatal(err) - } + require.Nil(t, err) + + err = client.Gauge("metric", 1, []string{"key:val"}, 1) + require.Nil(t, err) + got := <-out - if exp := "metric:1|g|#key:val"; got != exp { - t.Fatalf("Expected %q, got %q", exp, got) - } + assert.Equal(t, got, exp) +} + +func TestPipeWriterEnv(t *testing.T) { + f, ln := createNamedPipe(t) + defer os.Remove(f.Name()) + + out := make(chan string) + go acceptOne(t, ln, out) + + os.Setenv(agentHostEnvVarName, pipepath) + defer os.Unsetenv(agentHostEnvVarName) + + client, err := New("") + require.Nil(t, err) + + err = client.Gauge("metric", 1, []string{"key:val"}, 1) + require.Nil(t, err) + + got := <-out + assert.Equal(t, got, exp) } func TestPipeWriterReconnect(t *testing.T) { - f, err := ioutil.TempFile("", "test-pipe-") - if err != nil { - t.Fatal(err) - } + f, ln := createNamedPipe(t) defer os.Remove(f.Name()) - pipepath := WindowsPipeAddressPrefix + f.Name() - ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{ - SecurityDescriptor: "D:AI(A;;GA;;;WD)", - InputBufferSize: 1_000_000, - }) - if err != nil { - t.Fatalf("Listen: %s", err) - } + out := make(chan string) go acceptOne(t, ln, out) client, err := New(pipepath) - if err != nil { - t.Fatalf("New: %s", err) - } + require.Nil(t, err) // first attempt works, then connection closes - if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil { - t.Fatalf("Failed to send gauge: %s", err) - } + err = client.Gauge("metric", 1, []string{"key:val"}, 1) + require.Nil(t, err, "Failed to send gauge: %s", err) + timeout := time.After(1 * time.Second) select { case got := <-out: - if exp := "metric:1|g|#key:val"; got != exp { - t.Fatalf("Expected %q, got %q", exp, got) - } + assert.Equal(t, got, exp) case <-timeout: - t.Fatal("timeout1") + t.Fatal("timeout receiving the first metric") } // second attempt fails by attempting the same connection go acceptOne(t, ln, out) - if err := client.Gauge("metric", 2, []string{"key:val"}, 1); err != nil { - t.Fatalf("Failed to send second gauge: %s", err) - } + err = client.Gauge("metric", 2, []string{"key:val"}, 1) + require.Nil(t, err, "Failed to send second gauge: %s", err) + timeout = time.After(100 * time.Millisecond) select { case <-out: @@ -108,15 +117,13 @@ func TestPipeWriterReconnect(t *testing.T) { // subsequent attempts succeed with new connection for n := 0; n < 3; n++ { - if err := client.Gauge("metric", 3, []string{"key:val"}, 1); err != nil { - t.Fatalf("Failed to send second gauge: %s", err) - } + err = client.Gauge("metric", 3, []string{"key:val"}, 1) + require.Nil(t, err, "Failed to send second gauge: %s", err) + timeout = time.After(500 * time.Millisecond) select { case got := <-out: - if exp := "metric:3|g|#key:val"; got != exp { - t.Fatalf("Expected %q, got %q", exp, got) - } + assert.Equal(t, got, exp) return case <-timeout: continue diff --git a/statsd/statsd.go b/statsd/statsd.go index c743a1e9..37dd31e3 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -11,6 +11,7 @@ statsd is based on go-statsd-client. package statsd import ( + "errors" "fmt" "os" "strings" @@ -61,6 +62,12 @@ traffic instead of UDP. */ const WindowsPipeAddressPrefix = `\\.\pipe\` +const ( + agentHostEnvVarName = "DD_AGENT_HOST" + agentPortEnvVarName = "DD_DOGSTATSD_PORT" + defaultUDPPort = "8125" +) + /* ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable to a specific tag name. @@ -228,7 +235,35 @@ type ClientMetrics struct { // https://golang.org/doc/faq#guarantee_satisfies_interface var _ ClientInterface = &Client{} -func resolveAddr(addr string) (statsdWriter, string, error) { +func resolveAddr(addr string) string { + envPort := "" + if addr == "" { + addr = os.Getenv(agentHostEnvVarName) + envPort = os.Getenv(agentPortEnvVarName) + } + + if addr == "" { + return "" + } + + if !strings.HasPrefix(addr, WindowsPipeAddressPrefix) && !strings.HasPrefix(addr, UnixAddressPrefix) { + if !strings.Contains(addr, ":") { + if envPort != "" { + addr = fmt.Sprintf("%s:%s", addr, envPort) + } else { + addr = fmt.Sprintf("%s:%s", addr, defaultUDPPort) + } + } + } + return addr +} + +func createWriter(addr string) (statsdWriter, string, error) { + addr = resolveAddr(addr) + if addr == "" { + return nil, "", errors.New("No address passed and autodetection from environment failed") + } + switch { case strings.HasPrefix(addr, WindowsPipeAddressPrefix): w, err := newWindowsPipeWriter(addr) @@ -250,7 +285,7 @@ func New(addr string, options ...Option) (*Client, error) { return nil, err } - w, writerType, err := resolveAddr(addr) + w, writerType, err := createWriter(addr) if err != nil { return nil, err } diff --git a/statsd/statsd_test.go b/statsd/statsd_test.go index 978001b1..ada16472 100644 --- a/statsd/statsd_test.go +++ b/statsd/statsd_test.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "net" + "os" "sync" "testing" "time" @@ -179,3 +180,50 @@ func TestCloneWithExtraOptions(t *testing.T) { assert.Equal(t, cloneClient.addrOption, addr) assert.Len(t, cloneClient.options, 3) } + +func TestResolveAddressFromEnvironment(t *testing.T) { + hostInitialValue, hostInitiallySet := os.LookupEnv(agentHostEnvVarName) + if hostInitiallySet { + defer os.Setenv(agentHostEnvVarName, hostInitialValue) + } else { + defer os.Unsetenv(agentHostEnvVarName) + } + portInitialValue, portInitiallySet := os.LookupEnv(agentPortEnvVarName) + if portInitiallySet { + defer os.Setenv(agentPortEnvVarName, portInitialValue) + } else { + defer os.Unsetenv(agentPortEnvVarName) + } + + for _, tc := range []struct { + name string + addrParam string + hostEnv string + portEnv string + expectedAddr string + }{ + {"UPD Nominal case", "127.0.0.1:1234", "", "", "127.0.0.1:1234"}, + {"UPD Parameter overrides environment", "127.0.0.1:8125", "10.12.16.9", "1234", "127.0.0.1:8125"}, + {"UPD Host and port passed as env", "", "10.12.16.9", "1234", "10.12.16.9:1234"}, + {"UPD Host env, default port", "", "10.12.16.9", "", "10.12.16.9:8125"}, + {"UPD Host passed, ignore env port", "10.12.16.9", "", "1234", "10.12.16.9:8125"}, + + {"UDS socket passed", "unix://test/path.socket", "", "", "unix://test/path.socket"}, + {"UDS socket env", "", "unix://test/path.socket", "", "unix://test/path.socket"}, + {"UDS socket env with port", "", "unix://test/path.socket", "8125", "unix://test/path.socket"}, + + {"Pipe passed", "\\\\.\\pipe\\my_pipe", "", "", "\\\\.\\pipe\\my_pipe"}, + {"Pipe env", "", "\\\\.\\pipe\\my_pipe", "", "\\\\.\\pipe\\my_pipe"}, + {"Pipe env with port", "", "\\\\.\\pipe\\my_pipe", "8125", "\\\\.\\pipe\\my_pipe"}, + + {"No autodetection failed", "", "", "", ""}, + } { + t.Run(tc.name, func(t *testing.T) { + os.Setenv(agentHostEnvVarName, tc.hostEnv) + os.Setenv(agentPortEnvVarName, tc.portEnv) + + addr := resolveAddr(tc.addrParam) + assert.Equal(t, tc.expectedAddr, addr) + }) + } +} diff --git a/statsd/telemetry.go b/statsd/telemetry.go index dca8240d..726f1f6d 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -51,7 +51,7 @@ func newTelemetryClient(c *Client, transport string, devMode bool) *telemetryCli } func newTelemetryClientWithCustomAddr(c *Client, transport string, devMode bool, telemetryAddr string, pool *bufferPool) (*telemetryClient, error) { - telemetryWriter, _, err := resolveAddr(telemetryAddr) + telemetryWriter, _, err := createWriter(telemetryAddr) if err != nil { return nil, fmt.Errorf("Could not resolve telemetry address: %v", err) } diff --git a/statsd/udp.go b/statsd/udp.go index 9ddff421..8af522c5 100644 --- a/statsd/udp.go +++ b/statsd/udp.go @@ -2,18 +2,10 @@ package statsd import ( "errors" - "fmt" "net" - "os" "time" ) -const ( - autoHostEnvName = "DD_AGENT_HOST" - autoPortEnvName = "DD_DOGSTATSD_PORT" - defaultUDPPort = "8125" -) - // udpWriter is an internal class wrapping around management of UDP connection type udpWriter struct { conn net.Conn @@ -21,13 +13,6 @@ type udpWriter struct { // New returns a pointer to a new udpWriter given an addr in the format "hostname:port". func newUDPWriter(addr string) (*udpWriter, error) { - if addr == "" { - addr = addressFromEnvironment() - } - if addr == "" { - return nil, errors.New("No address passed and autodetection from environment failed") - } - udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err @@ -53,21 +38,3 @@ func (w *udpWriter) Write(data []byte) (int, error) { func (w *udpWriter) Close() error { return w.conn.Close() } - -func (w *udpWriter) remoteAddr() net.Addr { - return w.conn.RemoteAddr() -} - -func addressFromEnvironment() string { - autoHost := os.Getenv(autoHostEnvName) - if autoHost == "" { - return "" - } - - autoPort := os.Getenv(autoPortEnvName) - if autoPort == "" { - autoPort = defaultUDPPort - } - - return fmt.Sprintf("%s:%s", autoHost, autoPort) -} diff --git a/statsd/udp_test.go b/statsd/udp_test.go deleted file mode 100644 index e99da272..00000000 --- a/statsd/udp_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package statsd - -import ( - "errors" - "os" - "testing" -) - -func TestAddressFromEnvironment(t *testing.T) { - hostInitialValue, hostInitiallySet := os.LookupEnv(autoHostEnvName) - if hostInitiallySet { - defer os.Setenv(autoHostEnvName, hostInitialValue) - } else { - defer os.Unsetenv(autoHostEnvName) - } - portInitialValue, portInitiallySet := os.LookupEnv(autoPortEnvName) - if portInitiallySet { - defer os.Setenv(autoPortEnvName, portInitialValue) - } else { - defer os.Unsetenv(autoPortEnvName) - } - - for _, tc := range []struct { - addrParam string - hostEnv string - portEnv string - expectedAddr string - expectedErr error - }{ - // Nominal case - {"127.0.0.1:8125", "", "", "127.0.0.1:8125", nil}, - // Parameter overrides environment - {"127.0.0.1:8125", "10.12.16.9", "1234", "127.0.0.1:8125", nil}, - // Host and port passed as env - {"", "10.12.16.9", "1234", "10.12.16.9:1234", nil}, - // Host passed, default port - {"", "10.12.16.9", "", "10.12.16.9:8125", nil}, - // No autodetection failed - {"", "", "", "", errors.New("No address passed and autodetection from environment failed")}, - } { - os.Setenv(autoHostEnvName, tc.hostEnv) - os.Setenv(autoPortEnvName, tc.portEnv) - - // Test the error - writer, err := newUDPWriter(tc.addrParam) - if tc.expectedErr == nil { - if err != nil { - t.Errorf("Unexepected error while getting writer: %s", err) - } - } else { - if err == nil || tc.expectedErr.Error() != err.Error() { - t.Errorf("Unexepected error %q, got %q", tc.expectedErr, err) - } - } - - if writer == nil { - if tc.expectedAddr != "" { - t.Error("Nil writer while we were expecting a valid one") - } - - // Do not test for the addr if writer is nil - continue - } - - if writer.remoteAddr().String() != tc.expectedAddr { - t.Errorf("Expected %q, got %q", tc.expectedAddr, writer.remoteAddr().String()) - } - writer.Close() - } -}