Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run downsample tool continuously #5218

Merged
merged 2 commits into from Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -15,7 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Changed

- [#5205](https://github.com/thanos-io/thanos/pull/5205) Rule: Add ruler labels as external labels in stateless ruler mode.
- [#5206](https://github.com/thanos-io/thanos/pull/5206) Cache: add timeout for groupcache's fetch operation
- [#5206](https://github.com/thanos-io/thanos/pull/5206) Cache: add timeout for groupcache's fetch operation.
- [#5218](https://github.com/thanos-io/thanos/pull/5218) Tools: Run bucket downsample tool continuously.

### Removed

Expand Down
48 changes: 25 additions & 23 deletions cmd/thanos/downsample.go
Expand Up @@ -69,6 +69,7 @@ func RunDownsample(
httpTLSConfig string,
httpGracePeriod time.Duration,
dataDir string,
waitInterval time.Duration,
downsampleConcurrency int,
objStoreConfig *extflag.PathOrContent,
comp component.Component,
Expand Down Expand Up @@ -113,31 +114,32 @@ func RunDownsample(
defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
statusProber.Ready()

level.Info(logger).Log("msg", "start first pass of downsampling")
metas, _, err := metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

for _, meta := range metas {
groupKey := meta.Thanos.GroupKey()
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return runutil.Repeat(waitInterval, ctx.Done(), func() error {
level.Info(logger).Log("msg", "start first pass of downsampling")
metas, _, err := metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

level.Info(logger).Log("msg", "start second pass of downsampling")
metas, _, err = metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}
for _, meta := range metas {
groupKey := meta.Thanos.GroupKey()
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}

return nil
level.Info(logger).Log("msg", "start second pass of downsampling")
metas, _, err = metaFetcher.Fetch(ctx)
if err != nil {
return errors.Wrap(err, "sync before second pass of downsampling")
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil {
return errors.Wrap(err, "downsampling failed")
}
return nil
})
}, func(error) {
cancel()
})
Expand Down
6 changes: 5 additions & 1 deletion cmd/thanos/tools_bucket.go
Expand Up @@ -129,6 +129,7 @@ type bucketReplicateConfig struct {
}

type bucketDownsampleConfig struct {
waitInterval time.Duration
downsampleConcurrency int
dataDir string
hashFunc string
Expand Down Expand Up @@ -224,6 +225,8 @@ func (tbc *bucketRewriteConfig) registerBucketRewriteFlag(cmd extkingpin.FlagCla
}

func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.FlagClause) *bucketDownsampleConfig {
cmd.Flag("wait-interval", "Wait interval between downsample runs.").
Default("5m").DurationVar(&tbc.waitInterval)
cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks.").
Default("1").IntVar(&tbc.downsampleConcurrency)
cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings.").
Expand Down Expand Up @@ -747,7 +750,8 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag.
tbc.registerBucketDownsampleFlag(cmd)

cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error {
return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), tbc.dataDir, tbc.downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(tbc.hashFunc))
return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), tbc.dataDir,
tbc.waitInterval, tbc.downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(tbc.hashFunc))
})
}

Expand Down
1 change: 1 addition & 0 deletions docs/components/tools.md
Expand Up @@ -652,6 +652,7 @@ Flags:
format details:
https://thanos.io/tip/thanos/tracing.md/#configuration
--version Show application version.
--wait-interval=5m Wait interval between downsample runs.

```

Expand Down