From 79f7121297707957d9b53696be20f55954a2a878 Mon Sep 17 00:00:00 2001 From: Gabriel Aszalos Date: Thu, 11 Feb 2021 15:47:23 +0200 Subject: [PATCH] statsd: ensure Windows Pipe writer tries to reconnect when losing connection (#185) This changes ensures that the Windows Pipe statsdWriter attempts reconnecting after losing a connection. --- statsd/pipe_windows.go | 72 ++++++++++++++++++++++-- statsd/pipe_windows_test.go | 108 ++++++++++++++++++++++++++++++------ 2 files changed, 157 insertions(+), 23 deletions(-) diff --git a/statsd/pipe_windows.go b/statsd/pipe_windows.go index a450e537..c820318b 100644 --- a/statsd/pipe_windows.go +++ b/statsd/pipe_windows.go @@ -3,20 +3,82 @@ package statsd import ( - "errors" "net" + "sync" "time" "github.com/Microsoft/go-winio" ) -type pipeWriter struct{ net.Conn } +const defaultPipeTimeout = 1 * time.Millisecond -func (pipeWriter) SetWriteTimeout(_ time.Duration) error { - return errors.New("SetWriteTimeout is not supported on Windows Named Pipes") +type pipeWriter struct { + mu sync.RWMutex + conn net.Conn + timeout time.Duration + pipepath string +} + +func (p *pipeWriter) SetWriteTimeout(d time.Duration) error { + p.mu.Lock() + p.timeout = d + p.mu.Unlock() + return nil +} + +func (p *pipeWriter) Write(data []byte) (n int, err error) { + conn, err := p.ensureConnection() + if err != nil { + return 0, err + } + + p.mu.RLock() + conn.SetWriteDeadline(time.Now().Add(p.timeout)) + p.mu.RUnlock() + + n, err = conn.Write(data) + if err != nil { + if e, ok := err.(net.Error); !ok || !e.Temporary() { + // disconnected; retry again on next attempt + p.mu.Lock() + p.conn = nil + p.mu.Unlock() + } + } + return n, err +} + +func (p *pipeWriter) ensureConnection() (net.Conn, error) { + p.mu.RLock() + conn := p.conn + p.mu.RUnlock() + if conn != nil { + return conn, nil + } + + // looks like we might need to connect - try again with write locking. + p.mu.Lock() + defer p.mu.Unlock() + if p.conn != nil { + return p.conn, nil + } + newconn, err := winio.DialPipe(p.pipepath, nil) + if err != nil { + return nil, err + } + p.conn = newconn + return newconn, nil +} + +func (p *pipeWriter) Close() error { + return p.conn.Close() } func newWindowsPipeWriter(pipepath string) (*pipeWriter, error) { conn, err := winio.DialPipe(pipepath, nil) - return &pipeWriter{conn}, err + return &pipeWriter{ + conn: conn, + timeout: defaultPipeTimeout, + pipepath: pipepath, + }, err } diff --git a/statsd/pipe_windows_test.go b/statsd/pipe_windows_test.go index f30a542b..f32e2d01 100644 --- a/statsd/pipe_windows_test.go +++ b/statsd/pipe_windows_test.go @@ -4,17 +4,34 @@ package statsd import ( "io/ioutil" - "log" + "net" "os" "testing" + "time" "github.com/Microsoft/go-winio" ) +// acceptOne accepts one single connection from ln, reads 512 bytes from it +// and sends it to the out channel, afterwards closing the connection. +func acceptOne(t *testing.T, ln net.Listener, out chan string) { + conn, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + buf := make([]byte, 512) + n, err := conn.Read(buf) + if err != nil { + t.Fatal(err) + } + conn.Close() + out <- string(buf[:n]) +} + func TestPipeWriter(t *testing.T) { f, err := ioutil.TempFile("", "test-pipe-") if err != nil { - log.Fatal(err) + t.Fatal(err) } defer os.Remove(f.Name()) pipepath := WindowsPipeAddressPrefix + f.Name() @@ -23,32 +40,87 @@ func TestPipeWriter(t *testing.T) { InputBufferSize: 1_000_000, }) if err != nil { - log.Fatal(err) + t.Fatal(err) } out := make(chan string) - go func() { - conn, err := ln.Accept() - if err != nil { - log.Fatal(err) - } - buf := make([]byte, 512) - n, err := conn.Read(buf) - if err != nil { - log.Fatal(err) - } - conn.Close() - out <- string(buf[:n]) - }() + go acceptOne(t, ln, out) client, err := New(pipepath) if err != nil { - log.Fatal(err) + t.Fatal(err) } if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil { - log.Fatal(err) + t.Fatal(err) } got := <-out if exp := "metric:1|g|#key:val"; got != exp { t.Fatalf("Expected %q, got %q", exp, got) } } + +func TestPipeWriterReconnect(t *testing.T) { + f, err := ioutil.TempFile("", "test-pipe-") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + pipepath := WindowsPipeAddressPrefix + f.Name() + ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{ + SecurityDescriptor: "D:AI(A;;GA;;;WD)", + InputBufferSize: 1_000_000, + }) + if err != nil { + t.Fatalf("Listen: %s", err) + } + out := make(chan string) + go acceptOne(t, ln, out) + client, err := New(pipepath) + if err != nil { + t.Fatalf("New: %s", err) + } + + // first attempt works, then connection closes + if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil { + t.Fatalf("Failed to send gauge: %s", err) + } + timeout := time.After(1 * time.Second) + select { + case got := <-out: + if exp := "metric:1|g|#key:val"; got != exp { + t.Fatalf("Expected %q, got %q", exp, got) + } + case <-timeout: + t.Fatal("timeout1") + } + + // second attempt fails by attempting the same connection + go acceptOne(t, ln, out) + if err := client.Gauge("metric", 2, []string{"key:val"}, 1); err != nil { + t.Fatalf("Failed to send second gauge: %s", err) + } + timeout = time.After(100 * time.Millisecond) + select { + case <-out: + t.Fatal("Second attempt should have timed out") + case <-timeout: + // ok + } + + // subsequent attempts succeed with new connection + for n := 0; n < 3; n++ { + if err := client.Gauge("metric", 3, []string{"key:val"}, 1); err != nil { + t.Fatalf("Failed to send second gauge: %s", err) + } + timeout = time.After(500 * time.Millisecond) + select { + case got := <-out: + if exp := "metric:3|g|#key:val"; got != exp { + t.Fatalf("Expected %q, got %q", exp, got) + } + return + case <-timeout: + continue + } + } + t.Fatal("failed to reconnect") +}