Skip to content

Commit

Permalink
otlploghttp: Generate internal/retry (#5165)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Apr 8, 2024
1 parent d66a661 commit 014c6fc
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 3 deletions.
6 changes: 3 additions & 3 deletions exporters/otlp/otlplog/otlploghttp/config.go
Expand Up @@ -8,6 +8,8 @@ import (
"net/http"
"net/url"
"time"

"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry"
)

// Option applies an option to the Exporter.
Expand Down Expand Up @@ -165,9 +167,7 @@ func WithTimeout(duration time.Duration) Option {

// RetryConfig defines configuration for retrying the export of log data that
// failed.
type RetryConfig struct {
// TODO: implement.
}
type RetryConfig retry.Config

// WithRetry sets the retry policy for transient retryable errors that are
// returned by the target endpoint.
Expand Down
1 change: 1 addition & 0 deletions exporters/otlp/otlplog/otlploghttp/go.mod
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp
go 1.21

require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/otel/sdk/log v0.0.0-20240403115316-6c6e1e7416e9
)
Expand Down
2 changes: 2 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/go.sum
@@ -1,3 +1,5 @@
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
Expand Down
7 changes: 7 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/internal/gen.go
@@ -0,0 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal"

//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry.go.tmpl "--data={}" --out=retry/retry.go
//go:generate gotmpl --body=../../../../../internal/shared/otlp/retry/retry_test.go.tmpl "--data={}" --out=retry/retry_test.go
145 changes: 145 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/internal/retry/retry.go
@@ -0,0 +1,145 @@
// Code created by gotmpl. DO NOT MODIFY.
// source: internal/shared/otlp/retry/retry.go.tmpl

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package retry provides request retry functionality that can perform
// configurable exponential backoff for transient errors and honor any
// explicit throttle responses received.
package retry // import "go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp/internal/retry"

import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff/v4"
)

// DefaultConfig are the recommended defaults to use.
var DefaultConfig = Config{
Enabled: true,
InitialInterval: 5 * time.Second,
MaxInterval: 30 * time.Second,
MaxElapsedTime: time.Minute,
}

// Config defines configuration for retrying batches in case of export failure
// using an exponential backoff.
type Config struct {
// Enabled indicates whether to not retry sending batches in case of
// export failure.
Enabled bool
// InitialInterval the time to wait after the first failure before
// retrying.
InitialInterval time.Duration
// MaxInterval is the upper bound on backoff interval. Once this value is
// reached the delay between consecutive retries will always be
// `MaxInterval`.
MaxInterval time.Duration
// MaxElapsedTime is the maximum amount of time (including retries) spent
// trying to send a request/batch. Once this value is reached, the data
// is discarded.
MaxElapsedTime time.Duration
}

// RequestFunc wraps a request with retry logic.
type RequestFunc func(context.Context, func(context.Context) error) error

// EvaluateFunc returns if an error is retry-able and if an explicit throttle
// duration should be honored that was included in the error.
//
// The function must return true if the error argument is retry-able,
// otherwise it must return false for the first return parameter.
//
// The function must return a non-zero time.Duration if the error contains
// explicit throttle duration that should be honored, otherwise it must return
// a zero valued time.Duration.
type EvaluateFunc func(error) (bool, time.Duration)

// RequestFunc returns a RequestFunc using the evaluate function to determine
// if requests can be retried and based on the exponential backoff
// configuration of c.
func (c Config) RequestFunc(evaluate EvaluateFunc) RequestFunc {
if !c.Enabled {
return func(ctx context.Context, fn func(context.Context) error) error {
return fn(ctx)
}
}

return func(ctx context.Context, fn func(context.Context) error) error {
// Do not use NewExponentialBackOff since it calls Reset and the code here
// must call Reset after changing the InitialInterval (this saves an
// unnecessary call to Now).
b := &backoff.ExponentialBackOff{
InitialInterval: c.InitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.MaxInterval,
MaxElapsedTime: c.MaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
b.Reset()

for {
err := fn(ctx)
if err == nil {
return nil
}

retryable, throttle := evaluate(err)
if !retryable {
return err
}

bOff := b.NextBackOff()
if bOff == backoff.Stop {
return fmt.Errorf("max retry time elapsed: %w", err)
}

// Wait for the greater of the backoff or throttle delay.
var delay time.Duration
if bOff > throttle {
delay = bOff
} else {
elapsed := b.GetElapsedTime()
if b.MaxElapsedTime != 0 && elapsed+throttle > b.MaxElapsedTime {
return fmt.Errorf("max retry time would elapse: %w", err)
}
delay = throttle
}

if ctxErr := waitFunc(ctx, delay); ctxErr != nil {
return fmt.Errorf("%w: %s", ctxErr, err)
}
}
}
}

// Allow override for testing.
var waitFunc = wait

// wait takes the caller's context, and the amount of time to wait. It will
// return nil if the timer fires before or at the same time as the context's
// deadline. This indicates that the call can be retried.
func wait(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
defer timer.Stop()

select {
case <-ctx.Done():
// Handle the case where the timer and context deadline end
// simultaneously by prioritizing the timer expiration nil value
// response.
select {
case <-timer.C:
default:
return ctx.Err()
}
case <-timer.C:
}

return nil
}

0 comments on commit 014c6fc

Please sign in to comment.