Skip to content

Commit

Permalink
Implement retry policy for the OTLP/gRPC exporter (#1832)
Browse files Browse the repository at this point in the history
This was heavily inspired by the retry policy from the https://github.com/open-telemetry/opentelemetry-collector code.

Co-authored-by: Tyler Yahn <MrAlias@users.noreply.github.com>
  • Loading branch information
paivagustavo and MrAlias committed Apr 29, 2021
1 parent ec75390 commit f92a6d8
Show file tree
Hide file tree
Showing 14 changed files with 646 additions and 71 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Expand Up @@ -10,6 +10,18 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Added

- Adds `otlpgrpc.WithRetry`option for configuring the retry policy for transient errors on the otlp/gRPC exporter. (#1832)
- The following status codes are defined as transient errors:
| gRPC Status Code | Description |
| ---------------- | ----------- |
| 1 | Cancelled |
| 4 | Deadline Exceeded |
| 8 | Resource Exhausted |
| 10 | Aborted |
| 10 | Out of Range |
| 14 | Unavailable |
| 15 | Data Loss |

### Changed

- Make `NewSplitDriver` from `go.opentelemetry.io/otel/exporters/otlp` take variadic arguments instead of a `SplitConfig` item.
Expand Down
2 changes: 2 additions & 0 deletions example/otel-collector/go.sum
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down
3 changes: 3 additions & 0 deletions example/prom-collector/go.sum
Expand Up @@ -29,7 +29,10 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
18 changes: 1 addition & 17 deletions exporters/otlp/README.md
Expand Up @@ -4,28 +4,12 @@

This exporter exports OpenTelemetry spans and metrics to the OpenTelemetry Collector.


## Installation and Setup

The exporter can be installed using standard `go` functionality.

```bash
$ go get -u go.opentelemetry.io/otel/exporters/otlp
go get -u go.opentelemetry.io/otel/exporters/otlp
```

A new exporter can be created using the `NewExporter` function.

## Retries

The exporter will not, by default, retry failed requests to the collector.
However, it is configured in a way that it can be easily enabled.

To enable retries, the `GRPC_GO_RETRY` environment variable needs to be set to `on`. For example,

```
GRPC_GO_RETRY=on go run .
```

The [default service config](https://github.com/grpc/proposal/blob/master/A6-client-retries.md) used by default is defined to retry failed requests with exponential backoff (`0.3seconds * (2)^retry`) with [a max of `5` retries](https://github.com/open-telemetry/oteps/blob/be2a3fcbaa417ebbf5845cd485d34fdf0ab4a2a4/text/0035-opentelemetry-protocol.md#export-response)).

These retries are only attempted for reponses that are [deemed "retry-able" by the collector](https://github.com/grpc/proposal/blob/master/A6-client-retries.md#validation-of-retrypolicy).
2 changes: 2 additions & 0 deletions exporters/otlp/go.mod
Expand Up @@ -8,6 +8,7 @@ replace (
)

require (
github.com/cenkalti/backoff/v4 v4.1.0
github.com/google/go-cmp v0.5.5
github.com/stretchr/testify v1.7.0
go.opentelemetry.io/otel v0.20.0
Expand All @@ -17,6 +18,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.opentelemetry.io/proto/otlp v0.7.0
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.26.0
)
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/go.sum
Expand Up @@ -4,6 +4,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down
66 changes: 28 additions & 38 deletions exporters/otlp/internal/otlpconfig/options.go
Expand Up @@ -42,31 +42,16 @@ const (
// DefaultTimeout is a default max waiting time for the backend to process
// each span or metrics batch.
DefaultTimeout time.Duration = 10 * time.Second
// DefaultServiceConfig is the gRPC service config used if none is
// provided by the user.
DefaultServiceConfig = `{
"methodConfig":[{
"name":[
{ "service":"opentelemetry.proto.collector.metrics.v1.MetricsService" },
{ "service":"opentelemetry.proto.collector.trace.v1.TraceService" }
],
"retryPolicy":{
"MaxAttempts":5,
"InitialBackoff":"0.3s",
"MaxBackoff":"5s",
"BackoffMultiplier":2,
"RetryableStatusCodes":[
"CANCELLED",
"DEADLINE_EXCEEDED",
"RESOURCE_EXHAUSTED",
"ABORTED",
"OUT_OF_RANGE",
"UNAVAILABLE",
"DATA_LOSS"
]
}
}]
}`
)

var (
// defaultRetrySettings is a default settings for the retry policy.
defaultRetrySettings = otlp.RetrySettings{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}
)

type (
Expand All @@ -88,17 +73,16 @@ type (
Metrics SignalConfig
Traces SignalConfig

// General configurations
// HTTP configurations
Marshaler otlp.Marshaler
MaxAttempts int
Backoff time.Duration

// HTTP configuration
Marshaler otlp.Marshaler

// gRPC configurations
ReconnectionPeriod time.Duration
ServiceConfig string
DialOptions []grpc.DialOption
RetrySettings otlp.RetrySettings
}
)

Expand All @@ -118,7 +102,7 @@ func NewDefaultConfig() Config {
},
MaxAttempts: DefaultMaxAttempts,
Backoff: DefaultBackoff,
ServiceConfig: DefaultServiceConfig,
RetrySettings: defaultRetrySettings,
}

return c
Expand Down Expand Up @@ -280,15 +264,9 @@ func WithMetricsURLPath(urlPath string) GenericOption {
})
}

func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}

func WithBackoff(duration time.Duration) GenericOption {
func WithRetry(settings otlp.RetrySettings) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
cfg.RetrySettings = settings
})
}

Expand Down Expand Up @@ -374,3 +352,15 @@ func WithMetricsTimeout(duration time.Duration) GenericOption {
cfg.Metrics.Timeout = duration
})
}

func WithMaxAttempts(maxAttempts int) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.MaxAttempts = maxAttempts
})
}

func WithBackoff(duration time.Duration) GenericOption {
return newGenericOption(func(cfg *Config) {
cfg.Backoff = duration
})
}
148 changes: 148 additions & 0 deletions exporters/otlp/otlpgrpc/connection.go
Expand Up @@ -16,12 +16,18 @@ package otlpgrpc

import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/cenkalti/backoff/v4"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"google.golang.org/grpc/encoding/gzip"

"go.opentelemetry.io/otel/exporters/otlp"
Expand Down Expand Up @@ -276,3 +282,145 @@ func (c *connection) contextWithStop(ctx context.Context) (context.Context, cont
}(ctx, cancel)
return ctx, cancel
}

func (c *connection) doRequest(ctx context.Context, fn func(context.Context) error) error {
expBackoff := newExponentialBackoff(c.cfg.RetrySettings)

for {
err := fn(ctx)
if err == nil {
// request succeeded.
return nil
}

if !c.cfg.RetrySettings.Enabled {
return err
}

// We have an error, check gRPC status code.
st := status.Convert(err)
if st.Code() == codes.OK {
// Not really an error, still success.
return nil
}

// Now, this is this a real error.

if !shouldRetry(st.Code()) {
// It is not a retryable error, we should not retry.
return err
}

// Need to retry.

throttle := getThrottleDuration(st)

backoffDelay := expBackoff.NextBackOff()
if backoffDelay == backoff.Stop {
// throw away the batch
err = fmt.Errorf("max elapsed time expired: %w", err)
return err
}

var delay time.Duration

if backoffDelay > throttle {
delay = backoffDelay
} else {
if expBackoff.GetElapsedTime()+throttle > expBackoff.MaxElapsedTime {
err = fmt.Errorf("max elapsed time expired when respecting server throttle: %w", err)
return err
}

// Respect server throttling.
delay = throttle
}

// back-off, but get interrupted when shutting down or request is cancelled or timed out.
err = func() error {
dt := time.NewTimer(delay)
defer dt.Stop()

select {
case <-ctx.Done():
return ctx.Err()
case <-c.stopCh:
return fmt.Errorf("interrupted due to shutdown: %w", err)
case <-dt.C:
}

return nil
}()

if err != nil {
return err
}

}
}

func shouldRetry(code codes.Code) bool {
switch code {
case codes.OK:
// Success. This function should not be called for this code, the best we
// can do is tell the caller not to retry.
return false

case codes.Canceled,
codes.DeadlineExceeded,
codes.ResourceExhausted,
codes.Aborted,
codes.OutOfRange,
codes.Unavailable,
codes.DataLoss:
// These are retryable errors.
return true

case codes.Unknown,
codes.InvalidArgument,
codes.Unauthenticated,
codes.PermissionDenied,
codes.NotFound,
codes.AlreadyExists,
codes.FailedPrecondition,
codes.Unimplemented,
codes.Internal:
// These are fatal errors, don't retry.
return false

default:
// Don't retry on unknown codes.
return false
}
}

func getThrottleDuration(status *status.Status) time.Duration {
// See if throttling information is available.
for _, detail := range status.Details() {
if t, ok := detail.(*errdetails.RetryInfo); ok {
if t.RetryDelay.Seconds > 0 || t.RetryDelay.Nanos > 0 {
// We are throttled. Wait before retrying as requested by the server.
return time.Duration(t.RetryDelay.Seconds)*time.Second + time.Duration(t.RetryDelay.Nanos)*time.Nanosecond
}
return 0
}
}
return 0
}

func newExponentialBackoff(rs otlp.RetrySettings) *backoff.ExponentialBackOff {
// Do not use NewExponentialBackOff since it calls Reset and the code here must
// call Reset after changing the InitialInterval (this saves an unnecessary call to Now).
expBackoff := &backoff.ExponentialBackOff{
InitialInterval: rs.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: rs.MaxInterval,
MaxElapsedTime: rs.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
expBackoff.Reset()

return expBackoff
}

0 comments on commit f92a6d8

Please sign in to comment.