Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement retry policy for the OTLP/gRPC exporter #1832

Merged
merged 31 commits into from Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
ab7e4a2
draft for otlp/gRPC retries
paivagustavo Apr 16, 2021
a200113
draft for otlp/gRPC retries
paivagustavo Apr 16, 2021
45ca424
add retry settings and more tests.
paivagustavo Apr 20, 2021
9b2bf26
add more tests and clean code
paivagustavo Apr 21, 2021
e9271de
docs
paivagustavo Apr 21, 2021
8fcdd40
update defaults and docs.
paivagustavo Apr 21, 2021
675b49c
run make
paivagustavo Apr 21, 2021
f4fb057
Merge remote-tracking branch 'upstream/main' into otlp_exporter_retry
paivagustavo Apr 21, 2021
3cafc42
changelog
paivagustavo Apr 21, 2021
9f3146c
fix tests
paivagustavo Apr 21, 2021
c249bc1
changelog
paivagustavo Apr 21, 2021
cf5cc83
update pr number and test message
paivagustavo Apr 21, 2021
a03f5d9
Merge remote-tracking branch 'upstream/main' into otlp_exporter_retry
paivagustavo Apr 21, 2021
a834f85
clean retry settings structure
paivagustavo Apr 21, 2021
94142e7
apply suggestions
paivagustavo Apr 22, 2021
a639feb
fix flaky tests, lint and suggestions
paivagustavo Apr 22, 2021
fa4bd33
fix flaky tests
paivagustavo Apr 22, 2021
e45e41d
add more tests
paivagustavo Apr 22, 2021
ec888e8
lint
paivagustavo Apr 22, 2021
b1538ec
make permission_denied and unauthenticated permanent and assert shutdown
paivagustavo Apr 23, 2021
13b2512
Merge branch 'main' of github.com:open-telemetry/opentelemetry-go int…
paivagustavo Apr 23, 2021
15ed819
Merge branch 'main' of github.com:open-telemetry/opentelemetry-go int…
paivagustavo Apr 23, 2021
685f2d9
update changelog
paivagustavo Apr 23, 2021
5332156
address suggestions
paivagustavo Apr 27, 2021
3ca2c14
fix error wrapping and remove `time.After` usage
paivagustavo Apr 27, 2021
8aa5d94
fix error wrapping and remove `time.After` usage
paivagustavo Apr 27, 2021
5b992b9
fix server throttling bug
paivagustavo Apr 27, 2021
44fa8ea
Merge branch 'main' into otlp_exporter_retry
paivagustavo Apr 29, 2021
959e784
Merge branch 'main' of github.com:open-telemetry/opentelemetry-go int…
paivagustavo Apr 29, 2021
945e596
md lint
paivagustavo Apr 29, 2021
117e25a
Merge branch 'main' into otlp_exporter_retry
MrAlias Apr 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 12 additions & 1 deletion CHANGELOG.md
Expand Up @@ -9,7 +9,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased]

### Added

- Adds `otlpgrpc.WithRetry`option for configuring the retry policy for transient errors on the otlp/gRPC exporter. (#1832)
- The following status codes are defined as transient errors:
| gRPC Status Code | Description |
| ---------------- | ----------- |
| 1 | Cancelled |
| 4 | Deadline Exceeded |
| 8 | Resource Exhausted |
| 10 | Aborted |
| 10 | Out of Range |
| 14 | Unavailable |
| 15 | Data Loss |

### Changed

- Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
Expand Down
2 changes: 2 additions & 0 deletions example/otel-collector/go.sum
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
MrAlias marked this conversation as resolved.
Show resolved Hide resolved
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down
3 changes: 3 additions & 0 deletions example/prom-collector/go.sum
Expand Up @@ -29,7 +29,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
17 changes: 1 addition & 16 deletions exporters/otlp/README.md
Expand Up @@ -13,19 +13,4 @@ The exporter can be installed using standard `go` functionality.
$ go get -u go.opentelemetry.io/otel/exporters/otlp
```

A new exporter can be created using the `NewExporter` function.

## Retries

The exporter will not, by default, retry failed requests to the collector.
However, it is configured in a way that it can be easily enabled.

To enable retries, the `GRPC_GO_RETRY` environment variable needs to be set to `on`. For example,

```
GRPC_GO_RETRY=on go run .
```

The [default service config](https://github.com/grpc/proposal/blob/master/A6-client-retries.md) used by default is defined to retry failed requests with exponential backoff (`0.3seconds * (2)^retry`) with [a max of `5` retries](https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response)).

These retries are only attempted for reponses that are [deemed "retry-able" by the collector](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy).
A new exporter can be created using the `NewExporter` function.
2 changes: 2 additions & 0 deletions exporters/otlp/go.mod
Expand Up @@ -8,6 +8,7 @@ replace (
)

require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/google/go-cmp v0.5.5
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.20.0
Expand All @@ -17,6 +18,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.opentelemetry.io/proto/otlp v0.7.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.26.0
)
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/go.sum
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down
66 changes: 28 additions & 38 deletions exporters/otlp/internal/otlpconfig/options.go
Expand Up @@ -42,31 +42,16 @@ const (
// DefaultTimeout is a default max waiting time for the backend to process
// each span or metrics batch.
DefaultTimeout time.Duration = 10 * time.Second
// DefaultServiceConfig is the gRPC service config used if none is
// provided by the user.
DefaultServiceConfig = `{
"methodConfig":[{
"name":[
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
],
"retryPolicy":{
"MaxAttempts":5,
"InitialBackoff":"0.3s",
"MaxBackoff":"5s",
"BackoffMultiplier":2,
"RetryableStatusCodes":[
"CANCELLED",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"UNAVAILABLE",
"DATA_LOSS"
]
}
}]
}`
)

var (
// defaultRetrySettings is a default settings for the retry policy.
defaultRetrySettings = otlp.RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}
)

type (
Expand All @@ -88,17 +73,16 @@ type (
Metrics SignalConfig
Traces SignalConfig

// General configurations
// HTTP configurations
Marshaler otlp.Marshaler
MaxAttempts int
Backoff time.Duration

// HTTP configuration
Marshaler otlp.Marshaler

// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
RetrySettings otlp.RetrySettings
}
)

Expand All @@ -118,7 +102,7 @@ func NewDefaultConfig() Config {
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
ServiceConfig: DefaultServiceConfig,
RetrySettings: defaultRetrySettings,
}

return c
Expand Down Expand Up @@ -280,15 +264,9 @@ func WithMetricsURLPath(urlPath string) GenericOption {
})
}

func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}

func WithBackoff(duration time.Duration) GenericOption {
func WithRetry(settings otlp.RetrySettings) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
cfg.RetrySettings = settings
})
}

Expand Down Expand Up @@ -374,3 +352,15 @@ func WithMetricsTimeout(duration time.Duration) GenericOption {
cfg.Metrics.Timeout = duration
})
}

func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}

func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
})
}
130 changes: 130 additions & 0 deletions exporters/otlp/otlpgrpc/connection.go
Expand Up @@ -16,12 +16,18 @@ package otlpgrpc

import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/cenkalti/backoff/v4"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc/encoding/gzip"

"go.opentelemetry.io/otel/exporters/otlp"
Expand Down Expand Up @@ -276,3 +282,127 @@ func (c *connection) contextWithStop(ctx context.Context) (context.Context, cont
}(ctx, cancel)
return ctx, cancel
}

func (c *connection) doRequest(ctx context.Context, fn func(context.Context) error) error {
expBackoff := newExponentialBackoff(c.cfg.RetrySettings)

for {
err := fn(ctx)
if err == nil {
// request succeeded.
return nil
}

if !c.cfg.RetrySettings.Enabled {
return err
}

// We have an error, check gRPC status code.
st := status.Convert(err)
if st.Code() == codes.OK {
// Not really an error, still success.
return nil
}

// Now, this is this a real error.

if !shouldRetry(st.Code()) {
// It is not a retryable error, we should not retry.
return err
}

// Need to retry.
var delay time.Duration

// Respect server throttling.
throttle := getThrottleDuration(st)

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired %w", err)
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
return err
}

if backoffDelay > throttle {
delay = backoffDelay
} else {
delay = throttle
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
}

// back-off, but get interrupted when shutting down or request is cancelled or timed out.
select {
case <-ctx.Done():
return ctx.Err()
case <-c.stopCh:
return fmt.Errorf("interrupted due to shutdown %w", err)
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
case <-time.After(delay):
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func shouldRetry(code codes.Code) bool {
switch code {
case codes.OK:
// Success. This function should not be called for this code, the best we
// can do is tell the caller not to retry.
return false

case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// These are retryable errors.
return true
paivagustavo marked this conversation as resolved.
Show resolved Hide resolved

case codes.Unknown,
codes.InvalidArgument,
codes.Unauthenticated,
codes.PermissionDenied,
codes.NotFound,
codes.AlreadyExists,
codes.FailedPrecondition,
codes.Unimplemented,
codes.Internal:
// These are fatal errors, don't retry.
return false

default:
// Don't retry on unknown codes.
return false
}
}

func getThrottleDuration(status *status.Status) time.Duration {
// See if throttling information is available.
for _, detail := range status.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
// We are throttled. Wait before retrying as requested by the server.
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
}
return 0
}
}
return 0
}

func newExponentialBackoff(rs otlp.RetrySettings) *backoff.ExponentialBackOff {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := &backoff.ExponentialBackOff{
InitialInterval: rs.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: rs.MaxInterval,
MaxElapsedTime: rs.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()

return expBackoff
}