Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refresh udp ip at some cadence #280

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [FEATURE] Add `WithMaxSamplesPerContext()` option to limit the number of samples per context. See [#292][].
- [BUGFIX] Fix the `rate` of distributions and histograms when using client side aggregation. See [#283][].
- [FEATURE] Add `WithUDPAddrRefreshRate(rate time.Duration)` option to refresh the UDP address at a given interval. See [#280][].

# 5.3.0 / 2023-03-06

Expand Down
13 changes: 13 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var (
defaultOriginDetection = true
defaultChannelModeErrorsWhenFull = false
defaultErrorHandler = func(error) {}
defaultUDPAddrRefreshRate = 0 * time.Second
)

// Options contains the configuration options for a client.
Expand All @@ -52,6 +53,7 @@ type Options struct {
containerID string
channelModeErrorsWhenFull bool
errorHandler ErrorHandler
udpAddrRefreshRate time.Duration
}

func resolveOptions(options []Option) (*Options, error) {
Expand All @@ -75,6 +77,7 @@ func resolveOptions(options []Option) (*Options, error) {
originDetection: defaultOriginDetection,
channelModeErrorsWhenFull: defaultChannelModeErrorsWhenFull,
errorHandler: defaultErrorHandler,
udpAddrRefreshRate: defaultUDPAddrRefreshRate,
}

for _, option := range options {
Expand Down Expand Up @@ -396,3 +399,13 @@ func WithContainerID(id string) Option {
return nil
}
}

// WithUDPAddrRefreshRate sets the interval at which the client refreshes the UDP address.
// This is useful when using the Agent's address may change during deployments without a fixed IP.
// A value of 0 disables the refresh.
func WithUDPAddrRefreshRate(rate time.Duration) Option {
return func(o *Options) error {
o.udpAddrRefreshRate = rate
return nil
}
}
8 changes: 4 additions & 4 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ func parseAgentURL(agentURL string) string {
return ""
}

func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) {
func createWriter(addr string, writeTimeout time.Duration, udpAddrRefreshRate time.Duration) (io.WriteCloser, string, error) {
addr = resolveAddr(addr)
if addr == "" {
return nil, "", errors.New("No address passed and autodetection from environment failed")
Expand All @@ -383,7 +383,7 @@ func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, stri
w, err := newUDSWriter(addr[len(UnixAddressStreamPrefix):], writeTimeout, "unix")
return w, writerNameUDS, err
default:
w, err := newUDPWriter(addr, writeTimeout)
w, err := newUDPWriter(addr, writeTimeout, udpAddrRefreshRate)
return w, writerNameUDP, err
}
}
Expand All @@ -396,7 +396,7 @@ func New(addr string, options ...Option) (*Client, error) {
return nil, err
}

w, writerType, err := createWriter(addr, o.writeTimeout)
w, writerType, err := createWriter(addr, o.writeTimeout, o.udpAddrRefreshRate)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -527,7 +527,7 @@ func newWithWriter(w io.WriteCloser, o *Options, writerName string) (*Client, er
c.telemetryClient = newTelemetryClient(&c, writerName, c.agg != nil)
} else {
var err error
c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout)
c.telemetryClient, err = newTelemetryClientWithCustomAddr(&c, writerName, o.telemetryAddr, c.agg != nil, bufferPool, o.writeTimeout, o.udpAddrRefreshRate)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions statsd/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,8 @@ func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *t
return t
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration, udpAddrRefreshRate time.Duration) (*telemetryClient, error) {
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout, udpAddrRefreshRate)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}
Expand Down
59 changes: 53 additions & 6 deletions statsd/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,80 @@ package statsd

import (
"net"
"sync"
"time"
)

// udpWriter is an internal class wrapping around management of UDP connection
type udpWriter struct {
conn net.Conn
conn net.PacketConn
addr string
dst *dstValue
closed chan struct{}
}

type dstValue struct {
mutex sync.RWMutex
dst *net.UDPAddr
}

func (d *dstValue) set(dst *net.UDPAddr) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.dst = dst
}

func (d *dstValue) get() *net.UDPAddr {
d.mutex.RLock()
defer d.mutex.RUnlock()
return d.dst
}

// New returns a pointer to a new udpWriter given an addr in the format "hostname:port".
func newUDPWriter(addr string, _ time.Duration) (*udpWriter, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
func newUDPWriter(addr string, _ time.Duration, refreshRate time.Duration) (*udpWriter, error) {
conn, err := net.ListenPacket("udp", ":0")
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
currentDst, err := getCurrentDst(addr)
if err != nil {
return nil, err
}
writer := &udpWriter{conn: conn}
dst := &dstValue{dst: currentDst}
writer := &udpWriter{conn: conn, addr: addr, dst: dst, closed: make(chan struct{})}
if refreshRate > 0 {
go writer.refreshDstLoop(refreshRate)
}
return writer, nil
}

func (w *udpWriter) refreshDstLoop(refreshRate time.Duration) {
ticker := time.NewTicker(refreshRate)
defer ticker.Stop()
for {
select {
case <-w.closed:
return
case <-ticker.C:
dst, err := getCurrentDst(w.addr)
if err != nil {
continue
}
w.dst.set(dst)
}
}
}

// Write data to the UDP connection with no error handling
func (w *udpWriter) Write(data []byte) (int, error) {
return w.conn.Write(data)
return w.conn.WriteTo(data, w.dst.get())
}

func (w *udpWriter) Close() error {
close(w.closed)
return w.conn.Close()
}

func getCurrentDst(addr string) (*net.UDPAddr, error) {
return net.ResolveUDPAddr("udp", addr)
}