Skip to content

Commit

Permalink
statsd: add support for Windows Pipes (#182)
Browse files Browse the repository at this point in the history
This change adds support for Windows Named Pipes. They may be used when
this option is [enabled in the Datadog
Agent](DataDog/datadog-agent#6830).

Co-authored-by: Lucas Pimentel-Ordyna <lucas.pimentel@datadoghq.com>
  • Loading branch information
gbbr and lucaspimentel committed Feb 10, 2021
1 parent c869ea2 commit 2c5dbc8
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 9 deletions.
9 changes: 9 additions & 0 deletions statsd/pipe.go
@@ -0,0 +1,9 @@
// +build !windows

package statsd

import "errors"

func newWindowsPipeWriter(pipepath string) (statsdWriter, error) {
return nil, errors.New("Windows Named Pipes are only supported on Windows")
}
22 changes: 22 additions & 0 deletions statsd/pipe_windows.go
@@ -0,0 +1,22 @@
// +build windows

package statsd

import (
"errors"
"net"
"time"

"github.com/Microsoft/go-winio"
)

type pipeWriter struct{ net.Conn }

func (pipeWriter) SetWriteTimeout(_ time.Duration) error {
return errors.New("SetWriteTimeout is not supported on Windows Named Pipes")
}

func newWindowsPipeWriter(pipepath string) (*pipeWriter, error) {
conn, err := winio.DialPipe(pipepath, nil)
return &pipeWriter{conn}, err
}
54 changes: 54 additions & 0 deletions statsd/pipe_windows_test.go
@@ -0,0 +1,54 @@
// +build windows

package statsd

import (
"io/ioutil"
"log"
"os"
"testing"

"github.com/Microsoft/go-winio"
)

func TestPipeWriter(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
log.Fatal(err)
}
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
log.Fatal(err)
}
out := make(chan string)
go func() {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
log.Fatal(err)
}
conn.Close()
out <- string(buf[:n])
}()

client, err := New(pipepath)
if err != nil {
log.Fatal(err)
}
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
log.Fatal(err)
}
got := <-out
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
}
30 changes: 21 additions & 9 deletions statsd/statsd.go
Expand Up @@ -55,6 +55,12 @@ traffic instead of UDP.
*/
const UnixAddressPrefix = "unix://"

/*
WindowsPipeAddressPrefix holds the prefix to use to enable Windows Named Pipes
traffic instead of UDP.
*/
const WindowsPipeAddressPrefix = `\\.\pipe\`

/*
ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
to a specific tag name.
Expand Down Expand Up @@ -94,8 +100,9 @@ const (
)

const (
WriterNameUDP string = "udp"
WriterNameUDS string = "uds"
WriterNameUDP string = "udp"
WriterNameUDS string = "uds"
WriterWindowsPipe string = "pipe"
)

type metric struct {
Expand Down Expand Up @@ -222,17 +229,21 @@ type ClientMetrics struct {
var _ ClientInterface = &Client{}

func resolveAddr(addr string) (statsdWriter, string, error) {
if !strings.HasPrefix(addr, UnixAddressPrefix) {
switch {
case strings.HasPrefix(addr, WindowsPipeAddressPrefix):
w, err := newWindowsPipeWriter(addr)
return w, WriterWindowsPipe, err
case strings.HasPrefix(addr, UnixAddressPrefix):
w, err := newUDSWriter(addr[len(UnixAddressPrefix):])
return w, WriterNameUDS, err
default:
w, err := newUDPWriter(addr)
return w, WriterNameUDP, err
}

w, err := newUDSWriter(addr[len(UnixAddressPrefix):])
return w, WriterNameUDS, err
}

// New returns a pointer to a new Client given an addr in the format "hostname:port" or
// "unix:///path/to/socket".
// New returns a pointer to a new Client given an addr in the format "hostname:port" for UDP,
// "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes.
func New(addr string, options ...Option) (*Client, error) {
o, err := resolveOptions(options)
if err != nil {
Expand Down Expand Up @@ -367,7 +378,8 @@ func NewBuffered(addr string, buflen int) (*Client, error) {
return New(addr, WithMaxMessagesPerPayload(buflen))
}

// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP.
// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP
// or Windows Pipes.
func (c *Client) SetWriteTimeout(d time.Duration) error {
if c == nil {
return ErrNoClient
Expand Down

0 comments on commit 2c5dbc8

Please sign in to comment.