[feat] Periodically re-resolve UDP server address, with opt-out (#520)
* Add resolved udp connection type, continually resolve dns names in
background

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Be sure to set buffer bytes width on new connections.

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Lock when checking if resolved addr is new.

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Fixes from review comments. Don't return error if UDPConn fails on startup

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Fix failing test. Apparently the Linux kernel returns the sockopt val
doubled.

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Use atomic ops to manage bufferBytes instead of locking mutex

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Fix buffer bytes assert because sock opt value is not guaranteed to be exactly twice
the set val

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Remove intermediate init helpers, initialize close channel in struct
initialization

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Fixes based on comments, more tests for udp_client.go, and test for
write retry

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Run make fmt

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Remove unused struct field

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Fix lint error

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Add test for new conn established when host record changes

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Fix comment typo

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Add test for failed write retry

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Add test calling NewAgentClientUDP

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Remove sleep on last try evaluating connection condition

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Rename resolved udp conn to reconnecting udp conn, add opt-out option
for reconnecting client and option for reconnect interval

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Remove irrelevant comment, fix transport max packet size regression

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Add panic in case the test server listen or serve fails unexpectedly

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Add coverage for new env vars

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Run make fmt

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Add back constants, add helper for generating a mock udp addr, require
all assertions of udp write to succeed

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Remove local agent constants from utils

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>

* Move no error requirement into mock udp addr constructor

Signed-off-by: Trevor Foster <trevor.foster@hotmail.ca>
terev committed Jul 9, 2020
1 parent b8ed773 commit 704ce28
Showing 11 changed files with 1,023 additions and 54 deletions.
75 changes: 74 additions & 1 deletion Gopkg.lock


2 changes: 2 additions & 0 deletions README.md
@@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll
JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans: `true` or `false` (default `false`).
JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`).
JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`).
JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables the UDP connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`).
JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`).
JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/.
JAEGER_SAMPLER_PARAM | The sampler parameter (number).
JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler.
25 changes: 22 additions & 3 deletions config/config.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/utils"

"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/internal/baggage/remote"
@@ -124,6 +125,17 @@ type ReporterConfig struct {
// Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT
LocalAgentHostPort string `yaml:"localAgentHostPort"`

// DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves
// the agent's hostname and reconnects if there was a change. This option only
// applies if LocalAgentHostPort is specified.
// Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED
DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"`

// AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname
// in order to detect address changes. This option only applies if DisableAttemptReconnecting is false.
// Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL
AttemptReconnectInterval time.Duration

// CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL.
// Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT
CollectorEndpoint string `yaml:"collectorEndpoint"`
@@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter(
metrics *jaeger.Metrics,
logger jaeger.Logger,
) (jaeger.Reporter, error) {
sender, err := rc.newTransport()
sender, err := rc.newTransport(logger)
if err != nil {
return nil, err
}
@@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter(
return reporter, err
}

func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) {
switch {
case rc.CollectorEndpoint != "":
httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)}
@@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
}
return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil
default:
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{
AgentClientUDPParams: utils.AgentClientUDPParams{
HostPort: rc.LocalAgentHostPort,
Logger: logger,
DisableAttemptReconnecting: rc.DisableAttemptReconnecting,
AttemptReconnectInterval: rc.AttemptReconnectInterval,
},
})
}
}
57 changes: 38 additions & 19 deletions config/config_env.go
@@ -24,30 +24,31 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/uber/jaeger-client-go"
)

const (
// environment variable names
envServiceName = "JAEGER_SERVICE_NAME"
envDisabled = "JAEGER_DISABLED"
envRPCMetrics = "JAEGER_RPC_METRICS"
envTags = "JAEGER_TAGS"
envSamplerType = "JAEGER_SAMPLER_TYPE"
envSamplerParam = "JAEGER_SAMPLER_PARAM"
envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT"
envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS"
envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL"
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
envEndpoint = "JAEGER_ENDPOINT"
envUser = "JAEGER_USER"
envPassword = "JAEGER_PASSWORD"
envAgentHost = "JAEGER_AGENT_HOST"
envAgentPort = "JAEGER_AGENT_PORT"
envServiceName = "JAEGER_SERVICE_NAME"
envDisabled = "JAEGER_DISABLED"
envRPCMetrics = "JAEGER_RPC_METRICS"
envTags = "JAEGER_TAGS"
envSamplerType = "JAEGER_SAMPLER_TYPE"
envSamplerParam = "JAEGER_SAMPLER_PARAM"
envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT"
envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS"
envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL"
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED"
envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL"
envEndpoint = "JAEGER_ENDPOINT"
envUser = "JAEGER_USER"
envPassword = "JAEGER_PASSWORD"
envAgentHost = "JAEGER_AGENT_HOST"
envAgentPort = "JAEGER_AGENT_PORT"
)

// FromEnv uses environment variables to set the tracer's Configuration
@@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) {
if useEnv || rc.LocalAgentHostPort == "" {
rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port)
}

if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" {
if value, err := strconv.ParseBool(e); err == nil {
rc.DisableAttemptReconnecting = value
} else {
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e)
}
}

if !rc.DisableAttemptReconnecting {
if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" {
if value, err := time.ParseDuration(e); err == nil {
rc.AttemptReconnectInterval = value
} else {
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e)
}
}
}
}

return rc, nil
10 changes: 7 additions & 3 deletions config/config_test.go
@@ -202,6 +202,8 @@ func TestReporter(t *testing.T) {
setEnv(t, envAgentPort, "6832")
setEnv(t, envUser, "user")
setEnv(t, envPassword, "password")
setEnv(t, envReporterAttemptReconnectingDisabled, "false")
setEnv(t, envReporterAttemptReconnectInterval, "40s")

// Existing ReporterConfig data
rc := ReporterConfig{
@@ -225,6 +227,8 @@ func TestReporter(t *testing.T) {
assert.Equal(t, "nonlocalhost:6832", cfg.LocalAgentHostPort)
assert.Equal(t, "user01", cfg.User)
assert.Equal(t, "password01", cfg.Password)
assert.Equal(t, false, cfg.DisableAttemptReconnecting)
assert.Equal(t, time.Second*40, cfg.AttemptReconnectInterval)

// Prepare
setEnv(t, envEndpoint, "http://1.2.3.4:5678/api/traces")
@@ -561,15 +565,15 @@ func TestInvalidSamplerType(t *testing.T) {
func TestUDPTransportType(t *testing.T) {
rc := &ReporterConfig{LocalAgentHostPort: "localhost:1234"}
expect, _ := jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}

func TestHTTPTransportType(t *testing.T) {
rc := &ReporterConfig{CollectorEndpoint: "http://1.2.3.4:5678/api/traces"}
expect := transport.NewHTTPTransport(rc.CollectorEndpoint)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}
@@ -581,7 +585,7 @@ func TestHTTPTransportTypeWithAuth(t *testing.T) {
Password: "auth_pass",
}
expect := transport.NewHTTPTransport(rc.CollectorEndpoint)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}
5 changes: 4 additions & 1 deletion transport/http_test.go
@@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go/thrift"

"github.com/uber/jaeger-client-go"
@@ -167,7 +168,9 @@ func newHTTPServer(t *testing.T) *httpServer {
})

go func() {
http.ListenAndServe(":10001", nil)
if err := http.ListenAndServe(":10001", nil); err != nil && err != http.ErrServerClosed {
require.NoError(t, err)
}
}()

return server
41 changes: 32 additions & 9 deletions transport_udp.go
@@ -19,6 +19,7 @@ import (
"fmt"

"github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift"
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
@@ -57,35 +58,57 @@ type udpSender struct {
failedToEmitSpans int64
}

// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should
// be passed to NewUDPTransportWithParams.
type UDPTransportParams struct {
utils.AgentClientUDPParams
}

// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) {
if len(params.HostPort) == 0 {
params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
}
if maxPacketSize == 0 {
maxPacketSize = utils.UDPPacketMaxLength

if params.Logger == nil {
params.Logger = log.StdLogger
}

if params.MaxPacketSize == 0 {
params.MaxPacketSize = utils.UDPPacketMaxLength
}

protocolFactory := thrift.NewTCompactProtocolFactory()

// Each span is first written to thriftBuffer to determine its size in bytes.
thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)

client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams)
if err != nil {
return nil, err
}

return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
maxSpanBytes: params.MaxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol,
}, nil
}

// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return NewUDPTransportWithParams(UDPTransportParams{
AgentClientUDPParams: utils.AgentClientUDPParams{
HostPort: hostPort,
MaxPacketSize: maxPacketSize,
},
})
}

// SetReporterStats implements reporterstats.Receiver.
func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
s.reporterStats = rs
