Skip to content

Commit

Permalink
statsd: ensure Windows Pipe writer tries to reconnect when losing con…
Browse files Browse the repository at this point in the history
…nection (#185)

This changes ensures that the Windows Pipe statsdWriter attempts
reconnecting after losing a connection.
  • Loading branch information
gbbr committed Feb 11, 2021
1 parent 2c5dbc8 commit 79f7121
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 23 deletions.
72 changes: 67 additions & 5 deletions statsd/pipe_windows.go
Expand Up @@ -3,20 +3,82 @@
package statsd

import (
"errors"
"net"
"sync"
"time"

"github.com/Microsoft/go-winio"
)

type pipeWriter struct{ net.Conn }
const defaultPipeTimeout = 1 * time.Millisecond

func (pipeWriter) SetWriteTimeout(_ time.Duration) error {
return errors.New("SetWriteTimeout is not supported on Windows Named Pipes")
type pipeWriter struct {
mu sync.RWMutex
conn net.Conn
timeout time.Duration
pipepath string
}

func (p *pipeWriter) SetWriteTimeout(d time.Duration) error {
p.mu.Lock()
p.timeout = d
p.mu.Unlock()
return nil
}

func (p *pipeWriter) Write(data []byte) (n int, err error) {
conn, err := p.ensureConnection()
if err != nil {
return 0, err
}

p.mu.RLock()
conn.SetWriteDeadline(time.Now().Add(p.timeout))
p.mu.RUnlock()

n, err = conn.Write(data)
if err != nil {
if e, ok := err.(net.Error); !ok || !e.Temporary() {
// disconnected; retry again on next attempt
p.mu.Lock()
p.conn = nil
p.mu.Unlock()
}
}
return n, err
}

func (p *pipeWriter) ensureConnection() (net.Conn, error) {
p.mu.RLock()
conn := p.conn
p.mu.RUnlock()
if conn != nil {
return conn, nil
}

// looks like we might need to connect - try again with write locking.
p.mu.Lock()
defer p.mu.Unlock()
if p.conn != nil {
return p.conn, nil
}
newconn, err := winio.DialPipe(p.pipepath, nil)
if err != nil {
return nil, err
}
p.conn = newconn
return newconn, nil
}

func (p *pipeWriter) Close() error {
return p.conn.Close()
}

func newWindowsPipeWriter(pipepath string) (*pipeWriter, error) {
conn, err := winio.DialPipe(pipepath, nil)
return &pipeWriter{conn}, err
return &pipeWriter{
conn: conn,
timeout: defaultPipeTimeout,
pipepath: pipepath,
}, err
}
108 changes: 90 additions & 18 deletions statsd/pipe_windows_test.go
Expand Up @@ -4,17 +4,34 @@ package statsd

import (
"io/ioutil"
"log"
"net"
"os"
"testing"
"time"

"github.com/Microsoft/go-winio"
)

// acceptOne accepts one single connection from ln, reads 512 bytes from it
// and sends it to the out channel, afterwards closing the connection.
func acceptOne(t *testing.T, ln net.Listener, out chan string) {
conn, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
t.Fatal(err)
}
conn.Close()
out <- string(buf[:n])
}

func TestPipeWriter(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
Expand All @@ -23,32 +40,87 @@ func TestPipeWriter(t *testing.T) {
InputBufferSize: 1_000_000,
})
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
out := make(chan string)
go func() {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
log.Fatal(err)
}
conn.Close()
out <- string(buf[:n])
}()
go acceptOne(t, ln, out)

client, err := New(pipepath)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
log.Fatal(err)
t.Fatal(err)
}
got := <-out
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
}

func TestPipeWriterReconnect(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
t.Fatalf("Listen: %s", err)
}
out := make(chan string)
go acceptOne(t, ln, out)
client, err := New(pipepath)
if err != nil {
t.Fatalf("New: %s", err)
}

// first attempt works, then connection closes
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send gauge: %s", err)
}
timeout := time.After(1 * time.Second)
select {
case got := <-out:
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
case <-timeout:
t.Fatal("timeout1")
}

// second attempt fails by attempting the same connection
go acceptOne(t, ln, out)
if err := client.Gauge("metric", 2, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
timeout = time.After(100 * time.Millisecond)
select {
case <-out:
t.Fatal("Second attempt should have timed out")
case <-timeout:
// ok
}

// subsequent attempts succeed with new connection
for n := 0; n < 3; n++ {
if err := client.Gauge("metric", 3, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
timeout = time.After(500 * time.Millisecond)
select {
case got := <-out:
if exp := "metric:3|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
return
case <-timeout:
continue
}
}
t.Fatal("failed to reconnect")
}

0 comments on commit 79f7121

Please sign in to comment.