diff --git a/CHANGELOG.md b/CHANGELOG.md index 83e9e0fe63..df11b95e10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed - [#5205](https://github.com/thanos-io/thanos/pull/5205) Rule: Add ruler labels as external labels in stateless ruler mode. -- [#5206](https://github.com/thanos-io/thanos/pull/5206) Cache: add timeout for groupcache's fetch operation +- [#5206](https://github.com/thanos-io/thanos/pull/5206) Cache: add timeout for groupcache's fetch operation. +- [#5218](https://github.com/thanos-io/thanos/pull/5218) Tools: Run bucket downsample tools continuously. ### Removed diff --git a/cmd/thanos/downsample.go b/cmd/thanos/downsample.go index 3537028268..bd299c1e3e 100644 --- a/cmd/thanos/downsample.go +++ b/cmd/thanos/downsample.go @@ -69,6 +69,7 @@ func RunDownsample( httpTLSConfig string, httpGracePeriod time.Duration, dataDir string, + waitInterval time.Duration, downsampleConcurrency int, objStoreConfig *extflag.PathOrContent, comp component.Component, @@ -113,31 +114,32 @@ func RunDownsample( defer runutil.CloseWithLogOnErr(logger, bkt, "bucket client") statusProber.Ready() - level.Info(logger).Log("msg", "start first pass of downsampling") - metas, _, err := metaFetcher.Fetch(ctx) - if err != nil { - return errors.Wrap(err, "sync before first pass of downsampling") - } - - for _, meta := range metas { - groupKey := meta.Thanos.GroupKey() - metrics.downsamples.WithLabelValues(groupKey) - metrics.downsampleFailures.WithLabelValues(groupKey) - } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { - return errors.Wrap(err, "downsampling failed") - } + return runutil.Repeat(waitInterval, ctx.Done(), func() error { + level.Info(logger).Log("msg", "start first pass of downsampling") + metas, _, err := metaFetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "sync before first pass of downsampling") + } - level.Info(logger).Log("msg", "start second pass of downsampling") - metas, _, err = metaFetcher.Fetch(ctx) - if err != nil { - return errors.Wrap(err, "sync before second pass of downsampling") - } - if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { - return errors.Wrap(err, "downsampling failed") - } + for _, meta := range metas { + groupKey := meta.Thanos.GroupKey() + metrics.downsamples.WithLabelValues(groupKey) + metrics.downsampleFailures.WithLabelValues(groupKey) + } + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { + return errors.Wrap(err, "downsampling failed") + } - return nil + level.Info(logger).Log("msg", "start second pass of downsampling") + metas, _, err = metaFetcher.Fetch(ctx) + if err != nil { + return errors.Wrap(err, "sync before second pass of downsampling") + } + if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir, downsampleConcurrency, hashFunc); err != nil { + return errors.Wrap(err, "downsampling failed") + } + return nil + }) }, func(error) { cancel() }) diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index d9d612362e..6964b4c71b 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -129,6 +129,7 @@ type bucketReplicateConfig struct { } type bucketDownsampleConfig struct { + waitInterval time.Duration downsampleConcurrency int dataDir string hashFunc string @@ -224,6 +225,8 @@ func (tbc *bucketRewriteConfig) registerBucketRewriteFlag(cmd extkingpin.FlagCla } func (tbc *bucketDownsampleConfig) registerBucketDownsampleFlag(cmd extkingpin.FlagClause) *bucketDownsampleConfig { + cmd.Flag("wait-interval", "Wait interval between downsample runs."). + Default("5m").DurationVar(&tbc.waitInterval) cmd.Flag("downsample.concurrency", "Number of goroutines to use when downsampling blocks."). Default("1").IntVar(&tbc.downsampleConcurrency) cmd.Flag("data-dir", "Data directory in which to cache blocks and process downsamplings."). @@ -747,7 +750,8 @@ func registerBucketDownsample(app extkingpin.AppClause, objStoreConfig *extflag. tbc.registerBucketDownsampleFlag(cmd) cmd.Setup(func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ <-chan struct{}, _ bool) error { - return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), tbc.dataDir, tbc.downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(tbc.hashFunc)) + return RunDownsample(g, logger, reg, *httpAddr, *httpTLSConfig, time.Duration(*httpGracePeriod), tbc.dataDir, + tbc.waitInterval, tbc.downsampleConcurrency, objStoreConfig, component.Downsample, metadata.HashFunc(tbc.hashFunc)) }) } diff --git a/docs/components/tools.md b/docs/components/tools.md index efa9dc475b..80ceed9585 100644 --- a/docs/components/tools.md +++ b/docs/components/tools.md @@ -652,6 +652,7 @@ Flags: format details: https://thanos.io/tip/thanos/tracing.md/#configuration --version Show application version. + --wait-interval=5m Wait interval between downsample runs. ```