Skip to content

Commit

Permalink
Store: batch tsdb infos (#7308)
Browse files Browse the repository at this point in the history
Batch TSDB Infos for bucket store for blocks with overlapping ranges.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Apr 26, 2024
1 parent 6bf98f9 commit fed2870
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix RemoteWriteAsync was sequentially executed causing high latency in the ingestion path.
- [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types.
- [#7289](https://github.com/thanos-io/thanos/pull/7289) Query Frontend: show warnings from downstream queries.
- [#7308](https://github.com/thanos-io/thanos/pull/7308) Store: Batch TSDB Infos for blocks.

### Added

Expand Down
30 changes: 25 additions & 5 deletions pkg/store/bucket.go
Expand Up @@ -42,7 +42,6 @@ import (
"google.golang.org/grpc/status"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
Expand Down Expand Up @@ -874,18 +873,39 @@ func (s *BucketStore) TSDBInfos() []infopb.TSDBInfo {
s.mtx.RLock()
defer s.mtx.RUnlock()

infos := make([]infopb.TSDBInfo, 0, len(s.blocks))
infoMap := make(map[uint64][]infopb.TSDBInfo, len(s.blocks))
for _, b := range s.blocks {
infos = append(infos, infopb.TSDBInfo{
lbls := labels.FromMap(b.meta.Thanos.Labels)
hash := lbls.Hash()
infoMap[hash] = append(infoMap[hash], infopb.TSDBInfo{
Labels: labelpb.ZLabelSet{
Labels: labelpb.ZLabelsFromPromLabels(labels.FromMap(b.meta.Thanos.Labels)),
Labels: labelpb.ZLabelsFromPromLabels(lbls),
},
MinTime: b.meta.MinTime,
MaxTime: b.meta.MaxTime,
})
}

return infos
// join adjacent blocks so we emit less TSDBInfos
res := make([]infopb.TSDBInfo, 0, len(s.blocks))
for _, infos := range infoMap {
sort.Slice(infos, func(i, j int) bool { return infos[i].MinTime < infos[j].MinTime })

cur := infos[0]
for i, info := range infos {
if info.MinTime > cur.MaxTime {
res = append(res, cur)
cur = info
continue
}
cur.MaxTime = info.MaxTime
if i == len(infos)-1 {
res = append(res, cur)
}
}
}

return res
}

func (s *BucketStore) LabelSet() []labelpb.ZLabelSet {
Expand Down
79 changes: 78 additions & 1 deletion pkg/store/bucket_test.go
Expand Up @@ -48,13 +48,13 @@ import (

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/indexheader"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
"github.com/thanos-io/thanos/pkg/compact/downsample"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/info/infopb"
"github.com/thanos-io/thanos/pkg/pool"
storecache "github.com/thanos-io/thanos/pkg/store/cache"
"github.com/thanos-io/thanos/pkg/store/hintspb"
Expand Down Expand Up @@ -636,6 +636,83 @@ func TestBucketStoreConfig_validate(t *testing.T) {
}
}

func TestBucketStore_TSDBInfo(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

logger := log.NewNopLogger()
dir := t.TempDir()

bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
series := []labels.Labels{labels.FromStrings("a", "1", "b", "1")}

for _, tt := range []struct {
mint, maxt int64
extLabels labels.Labels
}{
{mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "b")},
{mint: 1000, maxt: 2000, extLabels: labels.FromStrings("a", "b")},
{mint: 3000, maxt: 4000, extLabels: labels.FromStrings("a", "b")},
{mint: 3500, maxt: 5000, extLabels: labels.FromStrings("a", "b")},
{mint: 0, maxt: 1000, extLabels: labels.FromStrings("a", "c")},
{mint: 500, maxt: 2000, extLabels: labels.FromStrings("a", "c")},
} {
id1, err := e2eutil.CreateBlock(ctx, dir, series, 10, tt.mint, tt.maxt, tt.extLabels, 0, metadata.NoneFunc)
testutil.Ok(t, err)
testutil.Ok(t, block.Upload(ctx, logger, bkt, filepath.Join(dir, id1.String()), metadata.NoneFunc))
}

baseBlockIDsFetcher := block.NewConcurrentLister(logger, bkt)
metaFetcher, err := block.NewMetaFetcher(logger, 20, bkt, baseBlockIDsFetcher, dir, nil, []block.MetadataFilter{
block.NewTimePartitionMetaFilter(allowAllFilterConf.MinTime, allowAllFilterConf.MaxTime),
})
testutil.Ok(t, err)

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(t, err)

bucketStore, err := NewBucketStore(
objstore.WithNoopInstr(bkt),
metaFetcher,
dir,
NewChunksLimiterFactory(0),
NewSeriesLimiterFactory(0),
NewBytesLimiterFactory(0),
NewGapBasedPartitioner(PartitionerMaxGapSize),
20,
true,
DefaultPostingOffsetInMemorySampling,
false,
false,
0,
WithChunkPool(chunkPool),
WithFilterConfig(allowAllFilterConf),
)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bucketStore.Close()) }()

testutil.Ok(t, bucketStore.SyncBlocks(ctx))
testutil.Equals(t, bucketStore.TSDBInfos(), []infopb.TSDBInfo{
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}},
MinTime: 0,
MaxTime: 2000,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "b"}}},
MinTime: 3000,
MaxTime: 5000,
},
{
Labels: labelpb.ZLabelSet{Labels: []labelpb.ZLabel{{Name: "a", Value: "c"}}},
MinTime: 0,
MaxTime: 2000,
},
})
}

func TestBucketStore_Info(t *testing.T) {
defer custom.TolerantVerifyLeak(t)

Expand Down

0 comments on commit fed2870

Please sign in to comment.