Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sidecar Flag: Random delay before upload blocks on sidecar #7239

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion cmd/thanos/sidecar.go
Expand Up @@ -362,7 +362,7 @@ func runSidecar(
s := shipper.New(logger, reg, conf.tsdb.path, bkt, m.Labels, metadata.SidecarSource,
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName)

return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
return runutil.RepeatWithJitter(30*time.Second, ctx, 0.2, func() error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this really help though if we are jittering only 6s; at least for us block uploads can take far longer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can increase the jitter. What value do you think that makes sense?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know, the whole method feels kinda weird. It was about in house blob storage and blocks being uploaded at same time right?

if uploaded, err := s.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err, "uploaded", uploaded)
}
Expand Down
1 change: 1 addition & 0 deletions docs/components/sidecar.md
Expand Up @@ -228,6 +228,7 @@ Flags:
configuration. See format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--tsdb.path="./data" Data directory of TSDB.
--upload-jitter=0s Maximum random delay before uploading blocks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to fix docs

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also it looks like this flag is never consumed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also added nowhere

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yess will remove it

--version Show application version.

```
Expand Down
33 changes: 32 additions & 1 deletion pkg/runutil/runutil.go
Expand Up @@ -12,7 +12,10 @@
// err := runutil.Repeat(10*time.Second, stopc, func() error {
// // ...
// })
//
//err := runutil.RepeatWithJitter(30*time.Second, ctx, 0.05, func() error {
// Your code here
//})

// Retry starts executing closure function f until no error is returned from f:
//
// err := runutil.Retry(10*time.Second, stopc, func() error {
Expand Down Expand Up @@ -50,8 +53,10 @@
package runutil

import (
"context"
"fmt"
"io"
"math/rand"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -99,6 +104,32 @@ func Repeat(interval time.Duration, stopc <-chan struct{}, f func() error) error
}
}

// RepeatWithJitter executes f with a random jitter added to the interval between each execution.
// It continues until ctx is done or f returns an error.
// The jitter factor should be between 0 and 1, where 0 means no jitter and 1 means the interval can vary from 0 to 2 times the original interval.
func RepeatWithJitter(interval time.Duration, ctx context.Context, jitterFactor float64, f func() error) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ctx should be always first parameter. https://pkg.go.dev/context#pkg-overview

for {
select {
case <-ctx.Done():
return nil
default:
if err := f(); err != nil {
return err
}

jitter := time.Duration(float64(interval) * jitterFactor)

jitteredInterval := interval + time.Duration(rand.Float64()*float64(jitter))

select {
case <-ctx.Done():
return nil
case <-time.After(jitteredInterval):
}
}
}
}

// Retry executes f every interval seconds until timeout or no error is returned from f.
func Retry(interval time.Duration, stopc <-chan struct{}, f func() error) error {
return RetryWithLog(log.NewNopLogger(), interval, stopc, f)
Expand Down