Implement retry policy for the OTLP/gRPC exporter #1832

Merged
merged 31 commits into from Apr 29, 2021
Commits
ab7e4a2
draft for otlp/gRPC retries
paivagustavo Apr 16, 2021
a200113
draft for otlp/gRPC retries
paivagustavo Apr 16, 2021
45ca424
add retry settings and more tests.
paivagustavo Apr 20, 2021
9b2bf26
add more tests and clean code
paivagustavo Apr 21, 2021
e9271de
docs
paivagustavo Apr 21, 2021
8fcdd40
update defaults and docs.
paivagustavo Apr 21, 2021
675b49c
run make
paivagustavo Apr 21, 2021
f4fb057
Merge remote-tracking branch 'upstream/main' into otlp_exporter_retry
paivagustavo Apr 21, 2021
3cafc42
changelog
paivagustavo Apr 21, 2021
9f3146c
fix tests
paivagustavo Apr 21, 2021
c249bc1
changelog
paivagustavo Apr 21, 2021
cf5cc83
update pr number and test message
paivagustavo Apr 21, 2021
a03f5d9
Merge remote-tracking branch 'upstream/main' into otlp_exporter_retry
paivagustavo Apr 21, 2021
a834f85
clean retry settings structure
paivagustavo Apr 21, 2021
94142e7
apply suggestions
paivagustavo Apr 22, 2021
a639feb
fix flaky tests, lint and suggestions
paivagustavo Apr 22, 2021
fa4bd33
fix flaky tests
paivagustavo Apr 22, 2021
e45e41d
add more tests
paivagustavo Apr 22, 2021
ec888e8
lint
paivagustavo Apr 22, 2021
b1538ec
make permission_denied and unauthenticated permanent and assert shutdown
paivagustavo Apr 23, 2021
13b2512
Merge branch 'main' of github.com:open-telemetry/opentelemetry-go int…
paivagustavo Apr 23, 2021
15ed819
Merge branch 'main' of github.com:open-telemetry/opentelemetry-go int…
paivagustavo Apr 23, 2021
685f2d9
update changelog
paivagustavo Apr 23, 2021
5332156
address suggestions
paivagustavo Apr 27, 2021
3ca2c14
fix error wrapping and remove `time.After` usage
paivagustavo Apr 27, 2021
8aa5d94
fix error wrapping and remove `time.After` usage
paivagustavo Apr 27, 2021
5b992b9
fix server throttling bug
paivagustavo Apr 27, 2021
44fa8ea
Merge branch 'main' into otlp_exporter_retry
paivagustavo Apr 29, 2021
959e784
Merge branch 'main' of github.com:open-telemetry/opentelemetry-go int…
paivagustavo Apr 29, 2021
945e596
md lint
paivagustavo Apr 29, 2021
117e25a
Merge branch 'main' into otlp_exporter_retry
MrAlias Apr 29, 2021
14 changes: 14 additions & 0 deletions CHANGELOG.md
Expand Up @@ -38,6 +38,20 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The Jaeger exporter now reports dropped attributes for a Span event in the exported log. (#1771)
- Adds `k8s.node.name` and `k8s.node.uid` attribute keys to the `semconv` package. (#1789)
- Adds `otlpgrpc.WithTimeout` option for configuring timeout to the otlp/gRPC exporter. (#1821)
- Adds `otlpgrpc.WithRetry` option for configuring the retry policy for transient errors on the otlp/gRPC exporter. (#1832)
- The following status codes are defined as transient errors:
| gRPC Status Code | Description |
| ---------------- | ----------- |
| 1 | Cancelled |
| 4 | Deadline Exceeded |
| 7 | Permission Denied |
| 8 | Resource Exhausted |
| 10 | Aborted |
| 11 | Out of Range |
| 14 | Unavailable |
| 15 | Data Loss |
| 16 | Unauthenticated |
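
As a rough illustration of how the table above could be applied, the sketch below keys the listed descriptions by numeric gRPC status code (using the standard gRPC numbering, in which Out of Range is code 11) and checks membership. The names `transientCodes` and `isTransient` are hypothetical and are not part of the exporter; a later commit in this PR ("make permission_denied and unauthenticated permanent") changes the set, so treat it as a snapshot of this revision.

```go
package main

import "fmt"

// transientCodes lists the status codes named in the changelog entry,
// keyed by numeric gRPC status code. Illustrative only; not exporter code.
var transientCodes = map[uint32]string{
	1:  "Cancelled",
	4:  "Deadline Exceeded",
	7:  "Permission Denied",
	8:  "Resource Exhausted",
	10: "Aborted",
	11: "Out of Range",
	14: "Unavailable",
	15: "Data Loss",
	16: "Unauthenticated",
}

// isTransient reports whether code appears in the transient set above.
func isTransient(code uint32) bool {
	_, ok := transientCodes[code]
	return ok
}

func main() {
	// 0 (OK) and 3 (Invalid Argument) are not in the table; 8 and 14 are.
	for _, code := range []uint32{0, 3, 8, 14} {
		fmt.Printf("code %d transient: %v\n", code, isTransient(code))
	}
}
```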


### Fixed

2 changes: 2 additions & 0 deletions example/otel-collector/go.sum
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
3 changes: 3 additions & 0 deletions example/prom-collector/go.sum
Expand Up @@ -29,7 +29,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
17 changes: 1 addition & 16 deletions exporters/otlp/README.md
Expand Up @@ -13,19 +13,4 @@ The exporter can be installed using standard `go` functionality.
$ go get -u go.opentelemetry.io/otel/exporters/otlp
```

A new exporter can be created using the `NewExporter` function.

## Retries

The exporter will not, by default, retry failed requests to the collector.
However, it is configured in a way that it can be easily enabled.

To enable retries, the `GRPC_GO_RETRY` environment variable needs to be set to `on`. For example,

```
GRPC_GO_RETRY=on go run .
```

The [default service config](https://github.com/grpc/proposal/blob/master/A6-client-retries.md) is defined to retry failed requests with exponential backoff (`0.3seconds * (2)^retry`) with [a max of `5` retries](https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response).

These retries are only attempted for responses that are [deemed "retry-able" by the collector](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy).
A new exporter can be created using the `NewExporter` function.
2 changes: 2 additions & 0 deletions exporters/otlp/go.mod
Expand Up @@ -8,6 +8,7 @@ replace (
)

require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/google/go-cmp v0.5.5
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.19.0
Expand All @@ -17,6 +18,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.19.0
go.opentelemetry.io/otel/trace v0.19.0
go.opentelemetry.io/proto/otlp v0.7.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.26.0
)
2 changes: 2 additions & 0 deletions exporters/otlp/go.sum
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
47 changes: 19 additions & 28 deletions exporters/otlp/internal/otlpconfig/options.go
Expand Up @@ -49,22 +49,7 @@ const (
"name":[
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
],
"retryPolicy":{
"MaxAttempts":5,
"InitialBackoff":"0.3s",
"MaxBackoff":"5s",
"BackoffMultiplier":2,
"RetryableStatusCodes":[
"CANCELLED",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"UNAVAILABLE",
"DATA_LOSS"
]
}
]
}]
}`
)
Expand All @@ -88,17 +73,16 @@ type (
Metrics SignalConfig
Traces SignalConfig

// General configurations
// HTTP configurations
Marshaler otlp.Marshaler
MaxAttempts int
Backoff time.Duration

// HTTP configuration
Marshaler otlp.Marshaler

// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
RetrySettings otlp.RetrySettings
}
)

Expand All @@ -118,6 +102,7 @@ func NewDefaultConfig() Config {
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
RetrySettings: otlp.DefaultRetrySettings(),
ServiceConfig: DefaultServiceConfig,
}

Expand Down Expand Up @@ -280,15 +265,9 @@ func WithMetricsURLPath(urlPath string) GenericOption {
})
}

func WithMaxAttempts(maxAttempts int) GenericOption {
func WithRetry(settings otlp.RetrySettings) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}

func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
cfg.RetrySettings = settings
})
}

Expand Down Expand Up @@ -374,3 +353,15 @@ func WithMetricsTimeout(duration time.Duration) GenericOption {
cfg.Metrics.Timeout = duration
})
}

func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}

func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
})
}
22 changes: 14 additions & 8 deletions exporters/otlp/otlpgrpc/driver.go
Expand Up @@ -145,10 +145,13 @@ func (md *metricsDriver) uploadMetrics(ctx context.Context, protoMetrics []*metr
if md.metricsClient == nil {
return errNoClient
}
_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: protoMetrics,
})
return err
req := func(ctx context.Context) error {
_, err := md.metricsClient.Export(ctx, &colmetricpb.ExportMetricsServiceRequest{
ResourceMetrics: protoMetrics,
})
return err
}
return doRequest(ctx, req, md.connection.cfg.RetrySettings, md.connection.stopCh)
}()
if err != nil {
md.connection.setStateDisconnected(err)
Expand Down Expand Up @@ -183,10 +186,13 @@ func (td *tracesDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.
if td.tracesClient == nil {
return errNoClient
}
_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})
return err
req := func(ctx context.Context) error {
_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{
ResourceSpans: protoSpans,
})
return err
}
return doRequest(ctx, req, td.connection.cfg.RetrySettings, td.connection.stopCh)
}()
if err != nil {
td.connection.setStateDisconnected(err)
40 changes: 31 additions & 9 deletions exporters/otlp/otlpgrpc/mock_collector_test.go
Expand Up @@ -34,11 +34,12 @@ import (
tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
)

func makeMockCollector(t *testing.T) *mockCollector {
func makeMockCollector(t *testing.T, mockConfig *mockConfig) *mockCollector {
return &mockCollector{
t: t,
traceSvc: &mockTraceService{
storage: otlptest.NewSpansStorage(),
errors: mockConfig.errors,
},
metricSvc: &mockMetricService{
storage: otlptest.NewMetricsStorage(),
Expand All @@ -49,10 +50,12 @@ func makeMockCollector(t *testing.T) *mockCollector {
type mockTraceService struct {
collectortracepb.UnimplementedTraceServiceServer

mu sync.RWMutex
storage otlptest.SpansStorage
headers metadata.MD
delay time.Duration
errors []error
requests int
mu sync.RWMutex
storage otlptest.SpansStorage
headers metadata.MD
delay time.Duration
}

func (mts *mockTraceService) getHeaders() metadata.MD {
Expand All @@ -77,9 +80,19 @@ func (mts *mockTraceService) Export(ctx context.Context, exp *collectortracepb.E
if mts.delay > 0 {
time.Sleep(mts.delay)
}
reply := &collectortracepb.ExportTraceServiceResponse{}

mts.mu.Lock()
defer mts.mu.Unlock()
defer func() {
mts.requests++
mts.mu.Unlock()
}()

reply := &collectortracepb.ExportTraceServiceResponse{}
if mts.requests < len(mts.errors) {
idx := mts.requests
return reply, mts.errors[idx]
}

mts.headers, _ = metadata.FromIncomingContext(ctx)
mts.storage.AddSpans(exp)
return reply, nil
Expand Down Expand Up @@ -122,6 +135,11 @@ type mockCollector struct {
stopOnce sync.Once
}

type mockConfig struct {
errors []error
endpoint string
}

var _ collectortracepb.TraceServiceServer = (*mockTraceService)(nil)
var _ collectormetricpb.MetricsServiceServer = (*mockMetricService)(nil)

Expand Down Expand Up @@ -192,13 +210,17 @@ func runMockCollector(t *testing.T) *mockCollector {
}

func runMockCollectorAtEndpoint(t *testing.T, endpoint string) *mockCollector {
ln, err := net.Listen("tcp", endpoint)
return runMockCollectorWithConfig(t, &mockConfig{endpoint: endpoint})
}

func runMockCollectorWithConfig(t *testing.T, mockConfig *mockConfig) *mockCollector {
ln, err := net.Listen("tcp", mockConfig.endpoint)
if err != nil {
t.Fatalf("Failed to get an endpoint: %v", err)
}

srv := grpc.NewServer()
mc := makeMockCollector(t)
mc := makeMockCollector(t, mockConfig)
collectortracepb.RegisterTraceServiceServer(srv, mc.traceSvc)
collectormetricpb.RegisterMetricsServiceServer(srv, mc.metricSvc)
mc.ln = newListener(ln)
9 changes: 9 additions & 0 deletions exporters/otlp/otlpgrpc/options.go
Expand Up @@ -200,3 +200,12 @@ func WithTracesTimeout(duration time.Duration) Option {
func WithMetricsTimeout(duration time.Duration) Option {
return otlpconfig.WithMetricsTimeout(duration)
}

// WithRetry configures the retry policy for transient errors that may occur when
// exporting metrics or traces. An exponential back-off algorithm is used to
// ensure endpoints are not overwhelmed with retries. If unset, the default
// policy waits 5 seconds before the first retry, increases the wait
// exponentially after each error, and gives up after a total of 1 minute.
func WithRetry(settings otlp.RetrySettings) Option {
return otlpconfig.WithRetry(settings)
}