Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsd: add support for Windows Pipes #182

Merged
merged 5 commits into from Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions statsd/pipe.go
@@ -0,0 +1,9 @@
// +build !windows

package statsd

import "errors"

func newWindowsPipeWriter(pipepath string) (statsdWriter, error) {
return nil, errors.New("Windows Named Pipes are only supported on Windows")
}
22 changes: 22 additions & 0 deletions statsd/pipe_windows.go
@@ -0,0 +1,22 @@
// +build windows

package statsd

import (
"errors"
"net"
"time"

"github.com/Microsoft/go-winio"
)

type pipeWriter struct{ net.Conn }

func (pipeWriter) SetWriteTimeout(_ time.Duration) error {
return errors.New("SetWriteTimeout is not supported on Windows Named Pipes")
}

func newWindowsPipeWriter(pipepath string) (*pipeWriter, error) {
conn, err := winio.DialPipe(pipepath, nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the connexion is temporary lost between the client and the server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved in #185

return &pipeWriter{conn}, err
}
54 changes: 54 additions & 0 deletions statsd/pipe_windows_test.go
@@ -0,0 +1,54 @@
// +build windows

package statsd

import (
"io/ioutil"
"log"
"os"
"testing"

"github.com/Microsoft/go-winio"
)

func TestPipeWriter(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
log.Fatal(err)
}
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
log.Fatal(err)
}
out := make(chan string)
go func() {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
log.Fatal(err)
}
conn.Close()
out <- string(buf[:n])
}()

client, err := New(pipepath)
if err != nil {
log.Fatal(err)
}
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
log.Fatal(err)
}
got := <-out
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
}
30 changes: 21 additions & 9 deletions statsd/statsd.go
Expand Up @@ -55,6 +55,12 @@ traffic instead of UDP.
*/
const UnixAddressPrefix = "unix://"

/*
WindowsPipeAddressPrefix holds the prefix to use to enable Windows Named Pipes
traffic instead of UDP.
*/
const WindowsPipeAddressPrefix = `\\.\pipe\`
gbbr marked this conversation as resolved.
Show resolved Hide resolved

/*
ddEnvTagsMapping is a mapping of each "DD_" prefixed environment variable
to a specific tag name.
Expand Down Expand Up @@ -94,8 +100,9 @@ const (
)

const (
WriterNameUDP string = "udp"
WriterNameUDS string = "uds"
WriterNameUDP string = "udp"
WriterNameUDS string = "uds"
WriterWindowsPipe string = "pipe"
gbbr marked this conversation as resolved.
Show resolved Hide resolved
)

type metric struct {
Expand Down Expand Up @@ -222,17 +229,21 @@ type ClientMetrics struct {
var _ ClientInterface = &Client{}

func resolveAddr(addr string) (statsdWriter, string, error) {
if !strings.HasPrefix(addr, UnixAddressPrefix) {
switch {
case strings.HasPrefix(addr, WindowsPipeAddressPrefix):
w, err := newWindowsPipeWriter(addr)
return w, WriterWindowsPipe, err
case strings.HasPrefix(addr, UnixAddressPrefix):
w, err := newUDSWriter(addr[len(UnixAddressPrefix):])
return w, WriterNameUDS, err
default:
w, err := newUDPWriter(addr)
return w, WriterNameUDP, err
}

w, err := newUDSWriter(addr[len(UnixAddressPrefix):])
return w, WriterNameUDS, err
}

// New returns a pointer to a new Client given an addr in the format "hostname:port" or
// "unix:///path/to/socket".
// New returns a pointer to a new Client given an addr in the format "hostname:port" for UDP,
// "unix:///path/to/socket" for UDS or "\\.\pipe\path\to\pipe" for Windows Named Pipes.
func New(addr string, options ...Option) (*Client, error) {
o, err := resolveOptions(options)
if err != nil {
Expand Down Expand Up @@ -367,7 +378,8 @@ func NewBuffered(addr string, buflen int) (*Client, error) {
return New(addr, WithMaxMessagesPerPayload(buflen))
}

// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP.
// SetWriteTimeout allows the user to set a custom UDS write timeout. Not supported for UDP
// or Windows Pipes.
func (c *Client) SetWriteTimeout(d time.Duration) error {
if c == nil {
return ErrNoClient
Expand Down