Skip to content

Commit

Permalink
Run downsample tool continuously (#5218)
Browse files Browse the repository at this point in the history
* run downsample tool continuously

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* update changelog

Signed-off-by: Ben Ye <ben.ye@bytedance.com>
  • Loading branch information
Ben Ye committed Mar 8, 2022
1 parent e60ca7e commit 0c067eb
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 25 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -15,7 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Changed

- [#5205](https://github.com/thanos-io/thanos/pull/5205) Rule: Add ruler labels as external labels in stateless ruler mode.
- [#5206](https://github.com/thanos-io/thanos/pull/5206) Cache: add timeout for groupcache's fetch operation
- [#5206](https://github.com/thanos-io/thanos/pull/5206) Cache: add timeout for groupcache's fetch operation.
- [#5218](https://github.com/thanos-io/thanos/pull/5218) Tools: Run bucket downsample tools continuously.

### Removed

Expand Down
48 changes: 25 additions & 23 deletions cmd/thanos/downsample.go
Expand Up @@ -69,6 +69,7 @@ func RunDownsample(
httpTLSConfig string,
httpGracePeriod time.Duration,
dataDir string,
waitInterval time.Duration,
downsampleConcurrency int,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
Expand Down Expand Up @@ -113,31 +114,32 @@ func RunDownsample(
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
statusProber.Ready()

level.Info(logger).Log("msg", "start first pass of downsampling")
metas, _, err := metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

for _, meta := range metas {
groupKey := meta.Thanos.GroupKey()
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return runutil.Repeat(waitInterval, ctx.Done(), func() error {
level.Info(logger).Log("msg", "start first pass of downsampling")
metas, _, err := metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
metas, _, err = metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}
for _, meta := range metas {
groupKey := meta.Thanos.GroupKey()
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

return nil
level.Info(logger).Log("msg", "start second pass of downsampling")
metas, _, err = metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return nil
})
}, func(error) {
cancel()
})
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/tools_bucket.go
Expand Up @@ -129,6 +129,7 @@ type bucketReplicateConfig struct {
}

type bucketDownsampleConfig struct {
waitInterval time.Duration
downsampleConcurrency int
dataDir string
hashFunc string
Expand Down Expand Up @@ -224,6 +225,8 @@ func (tbc *bucketRewriteConfig) registerBucketRewriteFlag(cmd extkingpin.FlagCla
}

func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.FlagClause) *bucketDownsampleConfig {
cmd.Flag("wait-interval", "Wait interval between downsample runs.").
Default("5m").DurationVar(&tbc.waitInterval)
cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks.").
Default("1").IntVar(&tbc.downsampleConcurrency)
cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Expand Down Expand Up @@ -747,7 +750,8 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.
tbc.registerBucketDownsampleFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), tbc.dataDir, tbc.downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(tbc.hashFunc))
return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), tbc.dataDir,
tbc.waitInterval, tbc.downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(tbc.hashFunc))
})
}

Expand Down
1 change: 1 addition & 0 deletions docs/components/tools.md
Expand Up @@ -652,6 +652,7 @@ Flags:
format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--version Show application version.
--wait-interval=5m Wait interval between downsample runs.
```

Expand Down

0 comments on commit 0c067eb

Please sign in to comment.