From 014c6fc3323554065532b2b260945a9d0cd62b74 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 8 Apr 2024 01:19:44 -0700 Subject: [PATCH] otlploghttp: Generate internal/retry (#5165) --- exporters/otlp/otlplog/otlploghttp/config.go | 6 +- exporters/otlp/otlplog/otlploghttp/go.mod | 1 + exporters/otlp/otlplog/otlploghttp/go.sum | 2 + .../otlp/otlplog/otlploghttp/internal/gen.go | 7 + .../otlploghttp/internal/retry/retry.go | 145 ++++++++++ .../otlploghttp/internal/retry/retry_test.go | 250 ++++++++++++++++++ 6 files changed, 408 insertions(+), 3 deletions(-) create mode 100644 exporters/otlp/otlplog/otlploghttp/internal/gen.go create mode 100644 exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go create mode 100644 exporters/otlp/otlplog/otlploghttp/internal/retry/retry_test.go diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index b15a92122af..c94b9cb61e1 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -8,6 +8,8 @@ import ( "net/http" "net/url" "time" + + "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" ) // Option applies an option to the Exporter. @@ -165,9 +167,7 @@ func WithTimeout(duration time.Duration) Option { // RetryConfig defines configuration for retrying the export of log data that // failed. -type RetryConfig struct { - // TODO: implement. -} +type RetryConfig retry.Config // WithRetry sets the retry policy for transient retryable errors that are // returned by the target endpoint. diff --git a/exporters/otlp/otlplog/otlploghttp/go.mod b/exporters/otlp/otlplog/otlploghttp/go.mod index ab51702ddae..b4c2ed1e4d1 100644 --- a/exporters/otlp/otlplog/otlploghttp/go.mod +++ b/exporters/otlp/otlplog/otlploghttp/go.mod @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp go 1.21 require ( + github.com/cenkalti/backoff/v4 v4.3.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/otel/sdk/log v0.0.0-20240403115316-6c6e1e7416e9 ) diff --git a/exporters/otlp/otlplog/otlploghttp/go.sum b/exporters/otlp/otlplog/otlploghttp/go.sum index e973beecd7a..feb40a55ca0 100644 --- a/exporters/otlp/otlplog/otlploghttp/go.sum +++ b/exporters/otlp/otlplog/otlploghttp/go.sum @@ -1,3 +1,5 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= diff --git a/exporters/otlp/otlplog/otlploghttp/internal/gen.go b/exporters/otlp/otlplog/otlploghttp/internal/gen.go new file mode 100644 index 00000000000..3906a5bd632 --- /dev/null +++ b/exporters/otlp/otlplog/otlploghttp/internal/gen.go @@ -0,0 +1,7 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal" + +//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go +//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go diff --git a/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go b/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go new file mode 100644 index 00000000000..dcd31893c65 --- /dev/null +++ b/exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go @@ -0,0 +1,145 @@ +// Code created by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/retry/retry.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package retry provides request retry functionality that can perform +// configurable exponential backoff for transient errors and honor any +// explicit throttle responses received. +package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry" + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/backoff/v4" +) + +// DefaultConfig are the recommended defaults to use. +var DefaultConfig = Config{ + Enabled: true, + InitialInterval: 5 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: time.Minute, +} + +// Config defines configuration for retrying batches in case of export failure +// using an exponential backoff. +type Config struct { + // Enabled indicates whether to not retry sending batches in case of + // export failure. + Enabled bool + // InitialInterval the time to wait after the first failure before + // retrying. + InitialInterval time.Duration + // MaxInterval is the upper bound on backoff interval. Once this value is + // reached the delay between consecutive retries will always be + // `MaxInterval`. + MaxInterval time.Duration + // MaxElapsedTime is the maximum amount of time (including retries) spent + // trying to send a request/batch. Once this value is reached, the data + // is discarded. + MaxElapsedTime time.Duration +} + +// RequestFunc wraps a request with retry logic. +type RequestFunc func(context.Context, func(context.Context) error) error + +// EvaluateFunc returns if an error is retry-able and if an explicit throttle +// duration should be honored that was included in the error. +// +// The function must return true if the error argument is retry-able, +// otherwise it must return false for the first return parameter. +// +// The function must return a non-zero time.Duration if the error contains +// explicit throttle duration that should be honored, otherwise it must return +// a zero valued time.Duration. +type EvaluateFunc func(error) (bool, time.Duration) + +// RequestFunc returns a RequestFunc using the evaluate function to determine +// if requests can be retried and based on the exponential backoff +// configuration of c. +func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc { + if !c.Enabled { + return func(ctx context.Context, fn func(context.Context) error) error { + return fn(ctx) + } + } + + return func(ctx context.Context, fn func(context.Context) error) error { + // Do not use NewExponentialBackOff since it calls Reset and the code here + // must call Reset after changing the InitialInterval (this saves an + // unnecessary call to Now). + b := &backoff.ExponentialBackOff{ + InitialInterval: c.InitialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: c.MaxInterval, + MaxElapsedTime: c.MaxElapsedTime, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + for { + err := fn(ctx) + if err == nil { + return nil + } + + retryable, throttle := evaluate(err) + if !retryable { + return err + } + + bOff := b.NextBackOff() + if bOff == backoff.Stop { + return fmt.Errorf("max retry time elapsed: %w", err) + } + + // Wait for the greater of the backoff or throttle delay. + var delay time.Duration + if bOff > throttle { + delay = bOff + } else { + elapsed := b.GetElapsedTime() + if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime { + return fmt.Errorf("max retry time would elapse: %w", err) + } + delay = throttle + } + + if ctxErr := waitFunc(ctx, delay); ctxErr != nil { + return fmt.Errorf("%w: %s", ctxErr, err) + } + } + } +} + +// Allow override for testing. +var waitFunc = wait + +// wait takes the caller's context, and the amount of time to wait. It will +// return nil if the timer fires before or at the same time as the context's +// deadline. This indicates that the call can be retried. +func wait(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-ctx.Done(): + // Handle the case where the timer and context deadline end + // simultaneously by prioritizing the timer expiration nil value + // response. + select { + case <-timer.C: + default: + return ctx.Err() + } + case <-timer.C: + } + + return nil +} diff --git a/exporters/otlp/otlplog/otlploghttp/internal/retry/retry_test.go b/exporters/otlp/otlplog/otlploghttp/internal/retry/retry_test.go new file mode 100644 index 00000000000..b48dde62359 --- /dev/null +++ b/exporters/otlp/otlplog/otlploghttp/internal/retry/retry_test.go @@ -0,0 +1,250 @@ +// Code created by gotmpl. DO NOT MODIFY. +// source: internal/shared/otlp/retry/retry_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package retry + +import ( + "context" + "errors" + "math" + "sync" + "testing" + "time" + + "github.com/cenkalti/backoff/v4" + "github.com/stretchr/testify/assert" +) + +func TestWait(t *testing.T) { + tests := []struct { + ctx context.Context + delay time.Duration + expected error + }{ + { + ctx: context.Background(), + delay: time.Duration(0), + }, + { + ctx: context.Background(), + delay: time.Duration(1), + }, + { + ctx: context.Background(), + delay: time.Duration(-1), + }, + { + ctx: func() context.Context { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + }(), + // Ensure the timer and context do not end simultaneously. + delay: 1 * time.Hour, + expected: context.Canceled, + }, + } + + for _, test := range tests { + err := wait(test.ctx, test.delay) + if test.expected == nil { + assert.NoError(t, err) + } else { + assert.ErrorIs(t, err, test.expected) + } + } +} + +func TestNonRetryableError(t *testing.T) { + ev := func(error) (bool, time.Duration) { return false, 0 } + + reqFunc := Config{ + Enabled: true, + InitialInterval: 1 * time.Nanosecond, + MaxInterval: 1 * time.Nanosecond, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + ctx := context.Background() + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + return nil + })) + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }), assert.AnError) +} + +func TestThrottledRetry(t *testing.T) { + // Ensure the throttle delay is used by making longer than backoff delay. + throttleDelay, backoffDelay := time.Second, time.Nanosecond + + ev := func(error) (bool, time.Duration) { + // Retry everything with a throttle delay. + return true, throttleDelay + } + + reqFunc := Config{ + Enabled: true, + InitialInterval: backoffDelay, + MaxInterval: backoffDelay, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + + origWait := waitFunc + var done bool + waitFunc = func(_ context.Context, delay time.Duration) error { + assert.Equal(t, throttleDelay, delay, "retry not throttled") + // Try twice to ensure call is attempted again after delay. + if done { + return assert.AnError + } + done = true + return nil + } + defer func() { waitFunc = origWait }() + + ctx := context.Background() + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return errors.New("not this error") + }), assert.AnError) +} + +func TestBackoffRetry(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + + delay := time.Nanosecond + reqFunc := Config{ + Enabled: true, + InitialInterval: delay, + MaxInterval: delay, + // Never stop retrying. + MaxElapsedTime: 0, + }.RequestFunc(ev) + + origWait := waitFunc + var done bool + waitFunc = func(_ context.Context, d time.Duration) error { + delta := math.Ceil(float64(delay) * backoff.DefaultRandomizationFactor) + assert.InDelta(t, delay, d, delta, "retry not backoffed") + // Try twice to ensure call is attempted again after delay. + if done { + return assert.AnError + } + done = true + return nil + } + t.Cleanup(func() { waitFunc = origWait }) + + ctx := context.Background() + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return errors.New("not this error") + }), assert.AnError) +} + +func TestBackoffRetryCanceledContext(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + + delay := time.Millisecond + reqFunc := Config{ + Enabled: true, + InitialInterval: delay, + MaxInterval: delay, + // Never stop retrying. + MaxElapsedTime: 10 * time.Millisecond, + }.RequestFunc(ev) + + ctx, cancel := context.WithCancel(context.Background()) + count := 0 + cancel() + err := reqFunc(ctx, func(context.Context) error { + count++ + return assert.AnError + }) + + assert.ErrorIs(t, err, context.Canceled) + assert.Contains(t, err.Error(), assert.AnError.Error()) + assert.Equal(t, 1, count) +} + +func TestThrottledRetryGreaterThanMaxElapsedTime(t *testing.T) { + // Ensure the throttle delay is used by making longer than backoff delay. + tDelay, bDelay := time.Hour, time.Nanosecond + ev := func(error) (bool, time.Duration) { return true, tDelay } + reqFunc := Config{ + Enabled: true, + InitialInterval: bDelay, + MaxInterval: bDelay, + MaxElapsedTime: tDelay - (time.Nanosecond), + }.RequestFunc(ev) + + ctx := context.Background() + assert.Contains(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }).Error(), "max retry time would elapse: ") +} + +func TestMaxElapsedTime(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + delay := time.Nanosecond + reqFunc := Config{ + Enabled: true, + // InitialInterval > MaxElapsedTime means immediate return. + InitialInterval: 2 * delay, + MaxElapsedTime: delay, + }.RequestFunc(ev) + + ctx := context.Background() + assert.Contains(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }).Error(), "max retry time elapsed: ") +} + +func TestRetryNotEnabled(t *testing.T) { + ev := func(error) (bool, time.Duration) { + t.Error("evaluated retry when not enabled") + return false, 0 + } + + reqFunc := Config{}.RequestFunc(ev) + ctx := context.Background() + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + return nil + })) + assert.ErrorIs(t, reqFunc(ctx, func(context.Context) error { + return assert.AnError + }), assert.AnError) +} + +func TestRetryConcurrentSafe(t *testing.T) { + ev := func(error) (bool, time.Duration) { return true, 0 } + reqFunc := Config{ + Enabled: true, + }.RequestFunc(ev) + + var wg sync.WaitGroup + ctx := context.Background() + + for i := 1; i < 5; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + var done bool + assert.NoError(t, reqFunc(ctx, func(context.Context) error { + if !done { + done = true + return assert.AnError + } + + return nil + })) + }() + } + + wg.Wait() +}