Skip to content

Commit

Permalink
remove unused block-sync-concurrency flag (#5426)
Browse files Browse the repository at this point in the history
* remove unused block-sync-concurrency flag

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* add changelog

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* update

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix e2e test

Signed-off-by: Ben Ye <ben.ye@bytedance.com>

* fix tests

Signed-off-by: Ben Ye <ben.ye@bytedance.com>
  • Loading branch information
Ben Ye committed Jun 23, 2022
1 parent a0f4181 commit f60b659
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 22 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -42,6 +42,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Removed

- [#5426](https://github.com/thanos-io/thanos/pull/5426) Compactor: Remove an unused flag `--block-sync-concurrency`.

## [v0.26.0](https://github.com/thanos-io/thanos/tree/release-0.26) - 2022.05.05

### Fixed
Expand Down
5 changes: 1 addition & 4 deletions cmd/thanos/compact.go
Expand Up @@ -280,7 +280,7 @@ func runCompact(
ignoreDeletionMarkFilter,
compactMetrics.blocksMarked.WithLabelValues(metadata.DeletionMarkFilename, ""),
compactMetrics.garbageCollectedBlocks,
conf.blockSyncConcurrency)
)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -629,7 +629,6 @@ type compactConfig struct {
wait bool
waitInterval time.Duration
disableDownsampling bool
blockSyncConcurrency int
blockMetaFetchConcurrency int
blockViewerSyncBlockInterval time.Duration
blockViewerSyncBlockTimeout time.Duration
Expand Down Expand Up @@ -687,8 +686,6 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
"as querying long time ranges without non-downsampled data is not efficient and useful e.g it is not possible to render all samples for a human eye anyway").
Default("false").BoolVar(&cc.disableDownsampling)

cmd.Flag("block-sync-concurrency", "Number of goroutines to use when syncing block metadata from object storage.").
Default("20").IntVar(&cc.blockSyncConcurrency)
cmd.Flag("block-meta-fetch-concurrency", "Number of goroutines to use when fetching block metadata from object storage.").
Default("32").IntVar(&cc.blockMetaFetchConcurrency)
cmd.Flag("block-viewer.global.sync-block-interval", "Repeat interval for syncing the blocks between local and remote view for /global Block Viewer UI.").
Expand Down
16 changes: 8 additions & 8 deletions cmd/thanos/tools_bucket.go
Expand Up @@ -792,15 +792,15 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
blocksCleaner := compact.NewBlocksCleaner(logger, bkt, ignoreDeletionMarkFilter, tbc.deleteDelay, stubCounter, stubCounter)

ctx := context.Background()

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand All @@ -821,7 +821,7 @@ func registerBucketCleanup(app extkingpin.AppClause, objStoreConfig *extflag.Pat
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
tbc.blockSyncConcurrency)
)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down Expand Up @@ -1318,13 +1318,13 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
// While fetching blocks, we filter out blocks that were marked for deletion by using IgnoreDeletionMarkFilter.
// The delay of deleteDelay/2 is added to ensure we fetch blocks that are meant to be deleted but do not have a replacement yet.
// This is to make sure compactor will not accidentally perform compactions with gap instead.
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, block.FetcherConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(block.FetcherConcurrency)
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(logger, bkt, tbc.deleteDelay/2, tbc.blockSyncConcurrency)
duplicateBlocksFilter := block.NewDeduplicateFilter(tbc.blockSyncConcurrency)
stubCounter := promauto.With(nil).NewCounter(prometheus.CounterOpts{})

var sy *compact.Syncer
{
baseMetaFetcher, err := block.NewBaseFetcher(logger, block.FetcherConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
baseMetaFetcher, err := block.NewBaseFetcher(logger, tbc.blockSyncConcurrency, bkt, "", extprom.WrapRegistererWithPrefix(extpromPrefix, reg))
if err != nil {
return errors.Wrap(err, "create meta fetcher")
}
Expand All @@ -1345,7 +1345,7 @@ func registerBucketRetention(app extkingpin.AppClause, objStoreConfig *extflag.P
ignoreDeletionMarkFilter,
stubCounter,
stubCounter,
tbc.blockSyncConcurrency)
)
if err != nil {
return errors.Wrap(err, "create syncer")
}
Expand Down
3 changes: 0 additions & 3 deletions docs/components/compact.md
Expand Up @@ -282,9 +282,6 @@ Flags:
--block-meta-fetch-concurrency=32
Number of goroutines to use when fetching block
metadata from object storage.
--block-sync-concurrency=20
Number of goroutines to use when syncing block
metadata from object storage.
--block-viewer.global.sync-block-interval=1m
Repeat interval for syncing the blocks between
local and remote view for /global Block Viewer
Expand Down
4 changes: 1 addition & 3 deletions pkg/compact/compact.go
Expand Up @@ -59,7 +59,6 @@ type Syncer struct {
mtx sync.Mutex
blocks map[ulid.ULID]*metadata.Meta
partial map[ulid.ULID]error
blockSyncConcurrency int
metrics *syncerMetrics
duplicateBlocksFilter *block.DeduplicateFilter
ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter
Expand Down Expand Up @@ -98,7 +97,7 @@ func newSyncerMetrics(reg prometheus.Registerer, blocksMarkedForDeletion, garbag

// NewMetaSyncer returns a new Syncer for the given Bucket and directory.
// Blocks must be at least as old as the sync delay for being considered.
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter, blockSyncConcurrency int) (*Syncer, error) {
func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bucket, fetcher block.MetadataFetcher, duplicateBlocksFilter *block.DeduplicateFilter, ignoreDeletionMarkFilter *block.IgnoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks prometheus.Counter) (*Syncer, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -111,7 +110,6 @@ func NewMetaSyncer(logger log.Logger, reg prometheus.Registerer, bkt objstore.Bu
metrics: newSyncerMetrics(reg, blocksMarkedForDeletion, garbageCollectedBlocks),
duplicateBlocksFilter: duplicateBlocksFilter,
ignoreDeletionMarkFilter: ignoreDeletionMarkFilter,
blockSyncConcurrency: blockSyncConcurrency,
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/compact/compact_e2e_test.go
Expand Up @@ -104,7 +104,7 @@ func TestSyncer_GarbageCollect_e2e(t *testing.T) {
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blockMarkedForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
ignoreDeletionMarkFilter := block.NewIgnoreDeletionMarkFilter(nil, nil, 48*time.Hour, fetcherConcurrency)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

// Do one initial synchronization with the bucket.
Expand Down Expand Up @@ -207,7 +207,7 @@ func testGroupCompactE2e(t *testing.T, mergeFunc storage.VerticalChunkSeriesMerg
blocksMarkedForDeletion := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
blocksMaredForNoCompact := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
garbageCollectedBlocks := promauto.With(nil).NewCounter(prometheus.CounterOpts{})
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 5)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, []int64{1000, 3000}, nil, mergeFunc)
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestGarbageCollectDoesntCreateEmptyBlocksWithDeletionMarksOnly(t *testing.T
})
testutil.Ok(t, err)

sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks, 1)
sy, err := NewMetaSyncer(nil, nil, bkt, metaFetcher, duplicateBlocksFilter, ignoreDeletionMarkFilter, blocksMarkedForDeletion, garbageCollectedBlocks)
testutil.Ok(t, err)

// Do one initial synchronization with the bucket.
Expand Down
1 change: 0 additions & 1 deletion test/e2e/e2ethanos/services.go
Expand Up @@ -709,7 +709,6 @@ func (c *CompactorBuilder) Init(bucketConfig client.BucketConfig, relabelConfig
"--data-dir": c.InternalDir(),
"--objstore.config": string(bktConfigBytes),
"--http-address": ":8080",
"--block-sync-concurrency": "50",
"--compact.cleanup-interval": "15s",
"--selector.relabel-config": string(relabelConfigBytes),
"--wait": "",
Expand Down

0 comments on commit f60b659

Please sign in to comment.