Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar Flag: Random delay before upload blocks on sidecar #7239

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Expand Up @@ -362,7 +362,7 @@ func runSidecar(
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName)

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.RepeatWithJitter(ctx, 30*time.Second, 0.2, func() error {
if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
Expand Down
33 changes: 32 additions & 1 deletion pkg/runutil/runutil.go
Expand Up @@ -12,7 +12,10 @@
// err := runutil.Repeat(10*time.Second, stopc, func() error {
// // ...
// })
//
//err := runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error {
// Your code here
//})

// Retry starts executing closure function f until no error is returned from f:
//
// err := runutil.Retry(10*time.Second, stopc, func() error {
Expand Down Expand Up @@ -50,8 +53,10 @@
package runutil

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -99,6 +104,32 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error
}
}

// RepeatWithJitter executes f with a random jitter added to the interval between each execution.
// It continues until ctx is done or f returns an error.
// The jitter factor should be between 0 and 1, where 0 means no jitter and 1 means the interval can vary from 0 to 2 times the original interval.
func RepeatWithJitter(ctx context.Context, interval time.Duration, jitterFactor float64, f func() error) error {
for {
select {
case <-ctx.Done():
return nil
default:
if err := f(); err != nil {
return err
}

jitter := time.Duration(float64(interval) * jitterFactor)

jitteredInterval := interval + time.Duration(rand.Float64()*float64(jitter))

select {
case <-ctx.Done():
return nil
case <-time.After(jitteredInterval):
}
}
}
}

// Retry executes f every interval seconds until timeout or no error is returned from f.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
return RetryWithLog(log.NewNopLogger(), interval, stopc, f)
Expand Down