Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

statsd: ensure Windows Pipe writer tries to reconnect when losing connection #185

Merged
merged 4 commits into from Feb 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 67 additions & 5 deletions statsd/pipe_windows.go
Expand Up @@ -3,20 +3,82 @@
package statsd

import (
"errors"
"net"
"sync"
"time"

"github.com/Microsoft/go-winio"
)

type pipeWriter struct{ net.Conn }
const defaultPipeTimeout = 1 * time.Millisecond

func (pipeWriter) SetWriteTimeout(_ time.Duration) error {
return errors.New("SetWriteTimeout is not supported on Windows Named Pipes")
type pipeWriter struct {
mu sync.RWMutex
conn net.Conn
timeout time.Duration
pipepath string
}

func (p *pipeWriter) SetWriteTimeout(d time.Duration) error {
p.mu.Lock()
p.timeout = d
p.mu.Unlock()
return nil
}

func (p *pipeWriter) Write(data []byte) (n int, err error) {
conn, err := p.ensureConnection()
if err != nil {
return 0, err
}

p.mu.RLock()
conn.SetWriteDeadline(time.Now().Add(p.timeout))
p.mu.RUnlock()

n, err = conn.Write(data)
gbbr marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if e, ok := err.(net.Error); !ok || !e.Temporary() {
// disconnected; retry again on next attempt
gbbr marked this conversation as resolved.
Show resolved Hide resolved
p.mu.Lock()
p.conn = nil
p.mu.Unlock()
}
}
return n, err
}

func (p *pipeWriter) ensureConnection() (net.Conn, error) {
p.mu.RLock()
conn := p.conn
p.mu.RUnlock()
if conn != nil {
return conn, nil
}

// looks like we might need to connect - try again with write locking.
p.mu.Lock()
defer p.mu.Unlock()
if p.conn != nil {
return p.conn, nil
}
newconn, err := winio.DialPipe(p.pipepath, nil)
if err != nil {
return nil, err
}
p.conn = newconn
return newconn, nil
}

func (p *pipeWriter) Close() error {
return p.conn.Close()
}

func newWindowsPipeWriter(pipepath string) (*pipeWriter, error) {
conn, err := winio.DialPipe(pipepath, nil)
return &pipeWriter{conn}, err
return &pipeWriter{
conn: conn,
timeout: defaultPipeTimeout,
pipepath: pipepath,
}, err
}
108 changes: 90 additions & 18 deletions statsd/pipe_windows_test.go
Expand Up @@ -4,17 +4,34 @@ package statsd

import (
"io/ioutil"
"log"
"net"
"os"
"testing"
"time"

"github.com/Microsoft/go-winio"
)

// acceptOne accepts one single connection from ln, reads 512 bytes from it
// and sends it to the out channel, afterwards closing the connection.
func acceptOne(t *testing.T, ln net.Listener, out chan string) {
gbbr marked this conversation as resolved.
Show resolved Hide resolved
conn, err := ln.Accept()
if err != nil {
t.Fatal(err)
}
buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
t.Fatal(err)
}
conn.Close()
out <- string(buf[:n])
}

func TestPipeWriter(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
defer os.Remove(f.Name())
pipepath := WindowsPipeAddressPrefix + f.Name()
Expand All @@ -23,32 +40,87 @@ func TestPipeWriter(t *testing.T) {
InputBufferSize: 1_000_000,
})
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
out := make(chan string)
go func() {
conn, err := ln.Accept()
if err != nil {
log.Fatal(err)
}
buf := make([]byte, 512)
n, err := conn.Read(buf)
if err != nil {
log.Fatal(err)
}
conn.Close()
out <- string(buf[:n])
}()
go acceptOne(t, ln, out)

client, err := New(pipepath)
if err != nil {
log.Fatal(err)
t.Fatal(err)
}
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
log.Fatal(err)
t.Fatal(err)
}
got := <-out
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
}

func TestPipeWriterReconnect(t *testing.T) {
f, err := ioutil.TempFile("", "test-pipe-")
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
gbbr marked this conversation as resolved.
Show resolved Hide resolved
pipepath := WindowsPipeAddressPrefix + f.Name()
ln, err := winio.ListenPipe(pipepath, &winio.PipeConfig{
SecurityDescriptor: "D:AI(A;;GA;;;WD)",
InputBufferSize: 1_000_000,
})
if err != nil {
t.Fatalf("Listen: %s", err)
}
out := make(chan string)
go acceptOne(t, ln, out)
client, err := New(pipepath)
if err != nil {
t.Fatalf("New: %s", err)
}

// first attempt works, then connection closes
if err := client.Gauge("metric", 1, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send gauge: %s", err)
}
timeout := time.After(1 * time.Second)
select {
case got := <-out:
if exp := "metric:1|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
case <-timeout:
t.Fatal("timeout1")
}

// second attempt fails by attempting the same connection
go acceptOne(t, ln, out)
if err := client.Gauge("metric", 2, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
timeout = time.After(100 * time.Millisecond)
select {
case <-out:
t.Fatal("Second attempt should have timed out")
case <-timeout:
// ok
}

// subsequent attempts succeed with new connection
for n := 0; n < 3; n++ {
if err := client.Gauge("metric", 3, []string{"key:val"}, 1); err != nil {
t.Fatalf("Failed to send second gauge: %s", err)
}
timeout = time.After(500 * time.Millisecond)
select {
case got := <-out:
if exp := "metric:3|g|#key:val"; got != exp {
t.Fatalf("Expected %q, got %q", exp, got)
}
return
case <-timeout:
continue
}
}
t.Fatal("failed to reconnect")
}