From a414f0d35a382335f06004bc8fbc74d054d057cf Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 19 Jun 2022 19:05:59 -0700 Subject: [PATCH 1/5] remove unused block-sync-concurrency flag Signed-off-by: Ben Ye --- cmd/thanos/compact.go | 5 +---- cmd/thanos/tools_bucket.go | 16 ++++++++-------- docs/components/compact.md | 3 --- pkg/compact/compact.go | 3 +-- 4 files changed, 10 insertions(+), 17 deletions(-) diff --git a/cmd/thanos/compact.go b/cmd/thanos/compact.go index a17433ca5a..18c2be2104 100644 --- a/cmd/thanos/compact.go +++ b/cmd/thanos/compact.go @@ -280,7 +280,7 @@ func runCompact( ignoreDeletionMarkFilter, compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""), compactMetrics.garbageCollectedBlocks, - conf.blockSyncConcurrency) + ) if err != nil { return errors.Wrap(err, "create syncer") } @@ -629,7 +629,6 @@ type compactConfig struct { wait bool waitInterval time.Duration disableDownsampling bool - blockSyncConcurrency int blockMetaFetchConcurrency int blockViewerSyncBlockInterval time.Duration blockViewerSyncBlockTimeout time.Duration @@ -687,8 +686,6 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) { "as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway"). Default("false").BoolVar(&cc.disableDownsampling) - cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage."). - Default("20").IntVar(&cc.blockSyncConcurrency) cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage."). Default("32").IntVar(&cc.blockMetaFetchConcurrency) cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI."). diff --git a/cmd/thanos/tools_bucket.go b/cmd/thanos/tools_bucket.go index 1559ec5050..f1107706aa 100644 --- a/cmd/thanos/tools_bucket.go +++ b/cmd/thanos/tools_bucket.go @@ -792,15 +792,15 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. 
- ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, block.FetcherConcurrency) - duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency) + duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency) blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter) ctx := context.Background() var sy *compact.Syncer { - baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } @@ -821,7 +821,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat ignoreDeletionMarkFilter, stubCounter, stubCounter, - tbc.blockSyncConcurrency) + ) if err != nil { return errors.Wrap(err, "create syncer") } @@ -1318,13 +1318,13 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P // While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter. // The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet. // This is to make sure compactor will not accidentally perform compactions with gap instead. - ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, block.FetcherConcurrency) - duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency) + ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency) + duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency) stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) var sy *compact.Syncer { - baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) + baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg)) if err != nil { return errors.Wrap(err, "create meta fetcher") } @@ -1345,7 +1345,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P ignoreDeletionMarkFilter, stubCounter, stubCounter, - tbc.blockSyncConcurrency) + ) if err != nil { return errors.Wrap(err, "create syncer") } diff --git a/docs/components/compact.md b/docs/components/compact.md index 8b706e7013..337c38c1bc 100644 --- a/docs/components/compact.md +++ b/docs/components/compact.md @@ -282,9 +282,6 @@ Flags: --block-meta-fetch-concurrency=32 Number of goroutines to use when fetching block metadata from object storage. - --block-sync-concurrency=20 - Number of goroutines to use when syncing block - metadata from object storage. 
--block-viewer.global.sync-block-interval=1m Repeat interval for syncing the blocks between local and remote view for /global Block Viewer diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 5e55262f5e..15a11e9dd3 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -98,7 +98,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag // NewMetaSyncer returns a new Syncer for the given Bucket and directory. // Blocks must be at least as old as the sync delay for being considered. -func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter, blockSyncConcurrency int) (*Syncer, error) { +func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) { if logger == nil { logger = log.NewNopLogger() } @@ -111,7 +111,6 @@ func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bu metrics: newSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks), duplicateBlocksFilter: duplicateBlocksFilter, ignoreDeletionMarkFilter: ignoreDeletionMarkFilter, - blockSyncConcurrency: blockSyncConcurrency, }, nil } From e1e074b1269890bc9fc990329260d72087319081 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 19 Jun 2022 19:07:51 -0700 Subject: [PATCH 2/5] add changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5722c256a4..1ff18e50b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Removed +- [#5426](https://github.com/thanos-io/thanos/pull/5426) Compactor: Remove flag `--block-sync-concurrency`. + ## [v0.26.0](https://github.com/thanos-io/thanos/tree/release-0.26) - 2022.05.05 ### Fixed From 0de2a8d988e16218c1b867269a526ba86e7d4032 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Sun, 19 Jun 2022 20:13:39 -0700 Subject: [PATCH 3/5] update Signed-off-by: Ben Ye --- pkg/compact/compact.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/compact/compact.go b/pkg/compact/compact.go index 15a11e9dd3..6465f16927 100644 --- a/pkg/compact/compact.go +++ b/pkg/compact/compact.go @@ -59,7 +59,6 @@ type Syncer struct { mtx sync.Mutex blocks map[ulid.ULID]*metadata.Meta partial map[ulid.ULID]error - blockSyncConcurrency int metrics *syncerMetrics duplicateBlocksFilter *block.DeduplicateFilter ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter From dd7643502823aadde2494ebc09b16db673b2a08f Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 20 Jun 2022 12:58:21 -0700 Subject: [PATCH 4/5] fix e2e test Signed-off-by: Ben Ye --- CHANGELOG.md | 2 +- test/e2e/e2ethanos/services.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1ff18e50b2..6e4fdc12e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,7 +28,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Removed -- [#5426](https://github.com/thanos-io/thanos/pull/5426) Compactor: Remove flag `--block-sync-concurrency`. 
+- [#5426](https://github.com/thanos-io/thanos/pull/5426) Compactor: Remove an unused flag `--block-sync-concurrency`. ## [v0.26.0](https://github.com/thanos-io/thanos/tree/release-0.26) - 2022.05.05 diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 587d9e05c3..37345cf4f3 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -688,7 +688,6 @@ func (c *CompactorBuilder) Init(bucketConfig client.BucketConfig, relabelConfig "--data-dir": c.InternalDir(), "--objstore.config": string(bktConfigBytes), "--http-address": ":8080", - "--block-sync-concurrency": "50", "--compact.cleanup-interval": "15s", "--selector.relabel-config": string(relabelConfigBytes), "--wait": "", From 8334467c3b09fb6e8a7fb809cb277ae8e165e1f6 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 20 Jun 2022 15:25:19 -0700 Subject: [PATCH 5/5] fix tests Signed-off-by: Ben Ye --- pkg/compact/compact_e2e_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/compact/compact_e2e_test.go b/pkg/compact/compact_e2e_test.go index 2dc783b9e7..9501fc120c 100644 --- a/pkg/compact/compact_e2e_test.go +++ b/pkg/compact/compact_e2e_test.go @@ -104,7 +104,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) { garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency) - sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1) + sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) testutil.Ok(t, err) // Do one initial synchronization with the bucket. @@ -207,7 +207,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{}) - sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5) + sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) testutil.Ok(t, err) comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc) @@ -519,7 +519,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T }) testutil.Ok(t, err) - sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1) + sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks) testutil.Ok(t, err) // Do one initial synchronization with the bucket.
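
Note for reviewers: the sketch below illustrates the caller-side effect of this series, using only the signatures visible in the diffs (`block.NewIgnoreDeletionMarkFilter`, `block.NewDeduplicateFilter`, and `compact.NewMetaSyncer` without the trailing `blockSyncConcurrency int` argument). It is not code from this PR; the package name, the `newSyncer` helper, the metric names, the `objstore.InstrumentedBucket` parameter type, and the import paths are assumptions made for illustration.

```go
package compactsketch

import (
	"time"

	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/compact"
	"github.com/thanos-io/thanos/pkg/objstore"
)

// newSyncer shows the post-change call pattern: compact.NewMetaSyncer no longer
// takes a blockSyncConcurrency argument, so metadata-sync parallelism is decided
// by the fetcher and filters passed in, not by the Syncer itself.
func newSyncer(
	logger log.Logger,
	reg prometheus.Registerer,
	bkt objstore.InstrumentedBucket, // assumed type; satisfies both the filter and syncer parameters
	fetcher block.MetadataFetcher, // e.g. built via block.NewBaseFetcher with --block-meta-fetch-concurrency
	deleteDelay time.Duration,
) (*compact.Syncer, error) {
	// The filters keep their own concurrency knob (block.FetcherConcurrency), as in the diffs.
	ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, deleteDelay/2, block.FetcherConcurrency)
	duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency)

	// Counter names are placeholders for this sketch.
	blocksMarkedForDeletion := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "sketch_blocks_marked_for_deletion_total"})
	garbageCollectedBlocks := promauto.With(reg).NewCounter(prometheus.CounterOpts{Name: "sketch_garbage_collected_blocks_total"})

	// New signature: the final blockSyncConcurrency int parameter is gone.
	return compact.NewMetaSyncer(
		logger,
		reg,
		bkt,
		fetcher,
		duplicateBlocksFilter,
		ignoreDeletionMarkFilter,
		blocksMarkedForDeletion,
		garbageCollectedBlocks,
	)
}
```

As the first patch shows, `thanos tools bucket cleanup` and `retention` keep their own `--block-sync-concurrency` flag (`tbc.blockSyncConcurrency`) and now feed it directly to `NewBaseFetcher` and the filters; only the compactor-level flag and the unused `Syncer.blockSyncConcurrency` field are removed, while the compactor's metadata fetching remains governed by the separate `--block-meta-fetch-concurrency` flag.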