Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resolved udp connection type, continually resolve dns names in background #520

Merged
merged 25 commits into from Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
7f1085d
Add resolved udp connection type, continually resolve dns names in
terev Jun 24, 2020
3690bcb
Be sure to set buffer bytes width on new connections.
terev Jun 26, 2020
e7e7b32
Lock when checking if resolved addr is new.
terev Jun 26, 2020
7994cdf
Fixes from review comments. Dont return error if UDPConn fails on sta…
terev Jun 29, 2020
66b7ec4
Fix failing test. Apparently the linux kernel returns the sockopt val
terev Jun 29, 2020
fc0d3eb
Use atomic ops to manage bufferBytes instead of locking mutex
terev Jun 29, 2020
c47cc4b
Fix buffer bytes assert because sock opt value is not guaranteed to b…
terev Jun 29, 2020
aaf0c9b
Remove intermediate init helpers, initialize close chann in struct
terev Jun 30, 2020
0264dd8
Fixes based on comments, more tests for udp_client.go, and test for
terev Jul 3, 2020
8ea3b31
Run make fmt
terev Jul 3, 2020
d4f29aa
Remove unused struct field
terev Jul 3, 2020
fd62566
Fix lint error
terev Jul 3, 2020
d70dc93
Add test for new conn established when host record changes
terev Jul 3, 2020
4161d17
Fix comment typo
terev Jul 3, 2020
0d18d0d
Add test for failed write retry
terev Jul 3, 2020
044e26a
Add test calling NewAgentClientUDP
terev Jul 3, 2020
9c36339
Remove sleep on last try evaluating connection condition
terev Jul 3, 2020
70bff5e
Rename resolved udp conn to reconnecting udp conn, add opt-out option
terev Jul 7, 2020
3aedb36
Remove irrelevant comment, fix transport max packet size regression
terev Jul 7, 2020
f9de33b
Add panic in case the test server listen or srv fails unexpectedly
terev Jul 7, 2020
19fb557
Add coverage for new env vars
terev Jul 7, 2020
3003b3c
Run make fmt
terev Jul 7, 2020
3e254f0
Add back constants, add helper for generating a mock udp addr, require
terev Jul 8, 2020
2e4ea48
Remove local agent constants from utils
terev Jul 8, 2020
da03468
Move no error requirement into mock udp addr constructor
terev Jul 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
75 changes: 74 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions README.md
Expand Up @@ -61,6 +61,8 @@ JAEGER_PASSWORD | Password to send as part of "Basic" authentication to the coll
JAEGER_REPORTER_LOG_SPANS | Whether the reporter should also log the spans" `true` or `false` (default `false`).
JAEGER_REPORTER_MAX_QUEUE_SIZE | The reporter's maximum queue size (default `100`).
JAEGER_REPORTER_FLUSH_INTERVAL | The reporter's flush interval, with units, e.g. `500ms` or `2s` ([valid units][timeunits]; default `1s`).
JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED | When true, disables udp connection helper that periodically re-resolves the agent's hostname and reconnects if there was a change (default `false`).
JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL | Controls how often the agent client re-resolves the provided hostname in order to detect address changes ([valid units][timeunits]; default `30s`).
JAEGER_SAMPLER_TYPE | The sampler type: `remote`, `const`, `probabilistic`, `ratelimiting` (default `remote`). See also https://www.jaegertracing.io/docs/latest/sampling/.
JAEGER_SAMPLER_PARAM | The sampler parameter (number).
JAEGER_SAMPLER_MANAGER_HOST_PORT | (deprecated) The HTTP endpoint when using the `remote` sampler.
Expand Down
25 changes: 22 additions & 3 deletions config/config.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go/utils"

"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/internal/baggage/remote"
Expand Down Expand Up @@ -124,6 +125,17 @@ type ReporterConfig struct {
// Can be provided by FromEnv() via the environment variable named JAEGER_AGENT_HOST / JAEGER_AGENT_PORT
LocalAgentHostPort string `yaml:"localAgentHostPort"`

// DisableAttemptReconnecting when true, disables udp connection helper that periodically re-resolves
// the agent's hostname and reconnects if there was a change. This option only
// applies if LocalAgentHostPort is specified.
// Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED
DisableAttemptReconnecting bool `yaml:"disableAttemptReconnecting"`

// AttemptReconnectInterval controls how often the agent client re-resolves the provided hostname
// in order to detect address changes. This option only applies if DisableAttemptReconnecting is false.
// Can be provided by FromEnv() via the environment variable named JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL
AttemptReconnectInterval time.Duration

// CollectorEndpoint instructs reporter to send spans to jaeger-collector at this URL.
// Can be provided by FromEnv() via the environment variable named JAEGER_ENDPOINT
CollectorEndpoint string `yaml:"collectorEndpoint"`
Expand Down Expand Up @@ -384,7 +396,7 @@ func (rc *ReporterConfig) NewReporter(
metrics *jaeger.Metrics,
logger jaeger.Logger,
) (jaeger.Reporter, error) {
sender, err := rc.newTransport()
sender, err := rc.newTransport(logger)
if err != nil {
return nil, err
}
Expand All @@ -401,7 +413,7 @@ func (rc *ReporterConfig) NewReporter(
return reporter, err
}

func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
func (rc *ReporterConfig) newTransport(logger jaeger.Logger) (jaeger.Transport, error) {
switch {
case rc.CollectorEndpoint != "":
httpOptions := []transport.HTTPOption{transport.HTTPBatchSize(1), transport.HTTPHeaders(rc.HTTPHeaders)}
Expand All @@ -410,6 +422,13 @@ func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
}
return transport.NewHTTPTransport(rc.CollectorEndpoint, httpOptions...), nil
default:
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
return jaeger.NewUDPTransportWithParams(jaeger.UDPTransportParams{
AgentClientUDPParams: utils.AgentClientUDPParams{
HostPort: rc.LocalAgentHostPort,
Logger: logger,
DisableAttemptReconnecting: rc.DisableAttemptReconnecting,
AttemptReconnectInterval: rc.AttemptReconnectInterval,
},
})
}
}
57 changes: 38 additions & 19 deletions config/config_env.go
Expand Up @@ -24,30 +24,31 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"

"github.com/uber/jaeger-client-go"
)

const (
// environment variable names
envServiceName = "JAEGER_SERVICE_NAME"
envDisabled = "JAEGER_DISABLED"
envRPCMetrics = "JAEGER_RPC_METRICS"
envTags = "JAEGER_TAGS"
envSamplerType = "JAEGER_SAMPLER_TYPE"
envSamplerParam = "JAEGER_SAMPLER_PARAM"
envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT"
envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS"
envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL"
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
envEndpoint = "JAEGER_ENDPOINT"
envUser = "JAEGER_USER"
envPassword = "JAEGER_PASSWORD"
envAgentHost = "JAEGER_AGENT_HOST"
envAgentPort = "JAEGER_AGENT_PORT"
envServiceName = "JAEGER_SERVICE_NAME"
envDisabled = "JAEGER_DISABLED"
envRPCMetrics = "JAEGER_RPC_METRICS"
envTags = "JAEGER_TAGS"
envSamplerType = "JAEGER_SAMPLER_TYPE"
envSamplerParam = "JAEGER_SAMPLER_PARAM"
envSamplerManagerHostPort = "JAEGER_SAMPLER_MANAGER_HOST_PORT" // Deprecated by envSamplingEndpoint
envSamplingEndpoint = "JAEGER_SAMPLING_ENDPOINT"
envSamplerMaxOperations = "JAEGER_SAMPLER_MAX_OPERATIONS"
envSamplerRefreshInterval = "JAEGER_SAMPLER_REFRESH_INTERVAL"
envReporterMaxQueueSize = "JAEGER_REPORTER_MAX_QUEUE_SIZE"
envReporterFlushInterval = "JAEGER_REPORTER_FLUSH_INTERVAL"
envReporterLogSpans = "JAEGER_REPORTER_LOG_SPANS"
envReporterAttemptReconnectingDisabled = "JAEGER_REPORTER_ATTEMPT_RECONNECTING_DISABLED"
envReporterAttemptReconnectInterval = "JAEGER_REPORTER_ATTEMPT_RECONNECT_INTERVAL"
envEndpoint = "JAEGER_ENDPOINT"
envUser = "JAEGER_USER"
envPassword = "JAEGER_PASSWORD"
envAgentHost = "JAEGER_AGENT_HOST"
envAgentPort = "JAEGER_AGENT_PORT"
)

// FromEnv uses environment variables to set the tracer's Configuration
Expand Down Expand Up @@ -206,6 +207,24 @@ func (rc *ReporterConfig) reporterConfigFromEnv() (*ReporterConfig, error) {
if useEnv || rc.LocalAgentHostPort == "" {
rc.LocalAgentHostPort = fmt.Sprintf("%s:%d", host, port)
}

if e := os.Getenv(envReporterAttemptReconnectingDisabled); e != "" {
if value, err := strconv.ParseBool(e); err == nil {
rc.DisableAttemptReconnecting = value
} else {
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectingDisabled, e)
}
}

if !rc.DisableAttemptReconnecting {
if e := os.Getenv(envReporterAttemptReconnectInterval); e != "" {
if value, err := time.ParseDuration(e); err == nil {
rc.AttemptReconnectInterval = value
} else {
return nil, errors.Wrapf(err, "cannot parse env var %s=%s", envReporterAttemptReconnectInterval, e)
}
}
}
}

return rc, nil
Expand Down
10 changes: 7 additions & 3 deletions config/config_test.go
Expand Up @@ -202,6 +202,8 @@ func TestReporter(t *testing.T) {
setEnv(t, envAgentPort, "6832")
setEnv(t, envUser, "user")
setEnv(t, envPassword, "password")
setEnv(t, envReporterAttemptReconnectingDisabled, "false")
setEnv(t, envReporterAttemptReconnectInterval, "40s")

// Existing ReporterConfig data
rc := ReporterConfig{
Expand All @@ -225,6 +227,8 @@ func TestReporter(t *testing.T) {
assert.Equal(t, "nonlocalhost:6832", cfg.LocalAgentHostPort)
assert.Equal(t, "user01", cfg.User)
assert.Equal(t, "password01", cfg.Password)
assert.Equal(t, false, cfg.DisableAttemptReconnecting)
assert.Equal(t, time.Second*40, cfg.AttemptReconnectInterval)

// Prepare
setEnv(t, envEndpoint, "http://1.2.3.4:5678/api/traces")
Expand Down Expand Up @@ -561,15 +565,15 @@ func TestInvalidSamplerType(t *testing.T) {
func TestUDPTransportType(t *testing.T) {
rc := &ReporterConfig{LocalAgentHostPort: "localhost:1234"}
expect, _ := jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}

func TestHTTPTransportType(t *testing.T) {
rc := &ReporterConfig{CollectorEndpoint: "http://1.2.3.4:5678/api/traces"}
expect := transport.NewHTTPTransport(rc.CollectorEndpoint)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}
Expand All @@ -581,7 +585,7 @@ func TestHTTPTransportTypeWithAuth(t *testing.T) {
Password: "auth_pass",
}
expect := transport.NewHTTPTransport(rc.CollectorEndpoint)
sender, err := rc.newTransport()
sender, err := rc.newTransport(log.NullLogger)
require.NoError(t, err)
require.IsType(t, expect, sender)
}
Expand Down
5 changes: 4 additions & 1 deletion transport/http_test.go
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-client-go/thrift"

"github.com/uber/jaeger-client-go"
Expand Down Expand Up @@ -167,7 +168,9 @@ func newHTTPServer(t *testing.T) *httpServer {
})

go func() {
http.ListenAndServe(":10001", nil)
if err := http.ListenAndServe(":10001", nil); err != nil && err != http.ErrServerClosed {
require.NoError(t, err)
}
}()

return server
Expand Down
41 changes: 32 additions & 9 deletions transport_udp.go
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"

"github.com/uber/jaeger-client-go/internal/reporterstats"
"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift"
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
"github.com/uber/jaeger-client-go/utils"
Expand Down Expand Up @@ -57,35 +58,57 @@ type udpSender struct {
failedToEmitSpans int64
}

// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// UDPTransportParams allows specifying options for initializing a UDPTransport. An instance of this struct should
// be passed to NewUDPTransportWithParams.
type UDPTransportParams struct {
utils.AgentClientUDPParams
}

// NewUDPTransportWithParams creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
if len(hostPort) == 0 {
hostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
func NewUDPTransportWithParams(params UDPTransportParams) (Transport, error) {
if len(params.HostPort) == 0 {
params.HostPort = fmt.Sprintf("%s:%d", DefaultUDPSpanServerHost, DefaultUDPSpanServerPort)
}
if maxPacketSize == 0 {
maxPacketSize = utils.UDPPacketMaxLength

if params.Logger == nil {
params.Logger = log.StdLogger
}

if params.MaxPacketSize == 0 {
params.MaxPacketSize = utils.UDPPacketMaxLength
}
terev marked this conversation as resolved.
Show resolved Hide resolved

protocolFactory := thrift.NewTCompactProtocolFactory()

// Each span is first written to thriftBuffer to determine its size in bytes.
thriftBuffer := thrift.NewTMemoryBufferLen(maxPacketSize)
thriftBuffer := thrift.NewTMemoryBufferLen(params.MaxPacketSize)
thriftProtocol := protocolFactory.GetProtocol(thriftBuffer)

client, err := utils.NewAgentClientUDP(hostPort, maxPacketSize)
client, err := utils.NewAgentClientUDPWithParams(params.AgentClientUDPParams)
if err != nil {
return nil, err
}

return &udpSender{
client: client,
maxSpanBytes: maxPacketSize - emitBatchOverhead,
maxSpanBytes: params.MaxPacketSize - emitBatchOverhead,
thriftBuffer: thriftBuffer,
thriftProtocol: thriftProtocol,
}, nil
}

// NewUDPTransport creates a reporter that submits spans to jaeger-agent.
// TODO: (breaking change) move to transport/ package.
func NewUDPTransport(hostPort string, maxPacketSize int) (Transport, error) {
return NewUDPTransportWithParams(UDPTransportParams{
AgentClientUDPParams: utils.AgentClientUDPParams{
HostPort: hostPort,
MaxPacketSize: maxPacketSize,
},
})
}

// SetReporterStats implements reporterstats.Receiver.
func (s *udpSender) SetReporterStats(rs reporterstats.ReporterStats) {
s.reporterStats = rs
Expand Down