Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar Flag: Random delay before upload blocks on sidecar #7239

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions cmd/thanos/config.go
Expand Up @@ -164,6 +164,7 @@ type shipperConfig struct {
allowOutOfOrderUpload bool
hashFunc string
metaFileName string
uploadJitter time.Duration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this if not needed

}

func (sc *shipperConfig) registerFlag(cmd extkingpin.FlagClause) *shipperConfig {
Expand Down
17 changes: 16 additions & 1 deletion cmd/thanos/sidecar.go
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -105,6 +106,18 @@ func registerSidecar(app *extkingpin.App) {
})
}

// DurationWithJitter returns random duration from "input - input*variancePerc" to "input + input*variancePerc" interval.
func DurationWithJitter(input time.Duration, variancePerc float64) time.Duration {
if input == 0 {
return 0
}

variance := int64(float64(input) * variancePerc)
jitter := rand.Int63n(variance*2) - variance

return input + time.Duration(jitter)
}

func runSidecar(
g *run.Group,
logger log.Logger,
Expand Down Expand Up @@ -362,7 +375,7 @@ func runSidecar(
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName)

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0.05 variance seems too small as it is only 30s * 0.05 = 1.5s?
I am thinking about 0.2 here maybe

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Resolved all !

if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
Expand Down Expand Up @@ -518,4 +531,6 @@ func (sc *sidecarConfig) registerFlag(cmd extkingpin.FlagClause) {
sc.storeRateLimits.RegisterFlags(cmd)
cmd.Flag("min-time", "Start of time range limit to serve. Thanos sidecar will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Default("0000-01-01T00:00:00Z").SetValue(&sc.limitMinTime)
conf := &sidecarConfig{}
cmd.Flag("upload-jitter", "Maximum random delay before uploading blocks.").Default("0s").DurationVar(&conf.shipper.uploadJitter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove this if it is not used anymore

}
1 change: 1 addition & 0 deletions docs/components/sidecar.md
Expand Up @@ -228,6 +228,7 @@ Flags:
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tsdb.path="./data" Data directory of TSDB.
--upload-jitter=0s Maximum random delay before uploading blocks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to fix docs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it looks like this flag is never consumed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also added nowhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yess will remove it

--version Show application version.
```
Expand Down
33 changes: 32 additions & 1 deletion pkg/runutil/runutil.go
Expand Up @@ -12,7 +12,10 @@
// err := runutil.Repeat(10*time.Second, stopc, func() error {
// // ...
// })
//
//err := runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error {
// Your code here
//})

// Retry starts executing closure function f until no error is returned from f:
//
// err := runutil.Retry(10*time.Second, stopc, func() error {
Expand Down Expand Up @@ -50,8 +53,10 @@
package runutil

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -99,6 +104,32 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error
}
}

// RepeatWithJitter executes f with a random jitter added to the interval between each execution.
// It continues until ctx is done or f returns an error.
// The jitter factor should be between 0 and 1, where 0 means no jitter and 1 means the interval can vary from 0 to 2 times the original interval.
func RepeatWithJitter(interval time.Duration, ctx context.Context, jitterFactor float64, f func() error) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ctx should be always first parameter. https://pkg.go.dev/context#pkg-overview

for {
select {
case <-ctx.Done():
return nil
default:
if err := f(); err != nil {
return err
}

jitter := time.Duration(float64(interval) * jitterFactor)

jitteredInterval := interval + time.Duration(rand.Float64()*float64(jitter))

select {
case <-ctx.Done():
return nil
case <-time.After(jitteredInterval):
}
}
}
}

// Retry executes f every interval seconds until timeout or no error is returned from f.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
return RetryWithLog(log.NewNopLogger(), interval, stopc, f)
Expand Down