Skip to content

Commit

Permalink
Stores: respect replica labels in LabelValues and LabelNames (#7310)
Browse files Browse the repository at this point in the history
* Proxy: acceptance test for proxy store with replica labels

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>

* Stores: handle replica labels in label_value and label_names grpcs

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>

---------

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Apr 29, 2024
1 parent 4145f03 commit bd74665
Show file tree
Hide file tree
Showing 8 changed files with 320 additions and 127 deletions.
21 changes: 16 additions & 5 deletions pkg/query/querier.go
Expand Up @@ -399,14 +399,19 @@ func (q *querier) LabelValues(ctx context.Context, name string, matchers ...*lab
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
})
}

if q.isDedupEnabled() {
req.WithoutReplicaLabels = q.replicaLabels
}

resp, err := q.proxy.LabelValues(ctx, req)
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}
Expand All @@ -433,12 +438,18 @@ func (q *querier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) (
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
})
}

if q.isDedupEnabled() {
req.WithoutReplicaLabels = q.replicaLabels
}

resp, err := q.proxy.LabelNames(ctx, req)
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}
Expand Down
92 changes: 64 additions & 28 deletions pkg/store/acceptance_test.go
Expand Up @@ -109,6 +109,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
},
labelValuesCalls: []labelValuesCallCase{
{start: timestamp.FromTime(minTime), end: timestamp.FromTime(maxTime), label: "foo", expectedValues: []string{"foovalue1"}},
{start: timestamp.FromTime(minTime), end: timestamp.FromTime(maxTime), label: "replica"},
},
},
{
Expand Down Expand Up @@ -722,9 +723,10 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
for _, c := range tc.labelNameCalls {
t.Run("label_names", func(t *testing.T) {
resp, err := store.LabelNames(context.Background(), &storepb.LabelNamesRequest{
Start: c.start,
End: c.end,
Matchers: c.matchers,
Start: c.start,
End: c.end,
Matchers: c.matchers,
WithoutReplicaLabels: []string{"replica"},
})
if c.expectErr != nil {
testutil.NotOk(t, err)
Expand All @@ -740,12 +742,13 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
})
}
for _, c := range tc.labelValuesCalls {
t.Run("label_name_values", func(t *testing.T) {
t.Run("label_values", func(t *testing.T) {
resp, err := store.LabelValues(context.Background(), &storepb.LabelValuesRequest{
Start: c.start,
End: c.end,
Label: c.label,
Matchers: c.matchers,
Start: c.start,
End: c.end,
Label: c.label,
Matchers: c.matchers,
WithoutReplicaLabels: []string{"replica"},
})
if c.expectErr != nil {
testutil.NotOk(t, err)
Expand All @@ -764,10 +767,11 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
t.Run("series", func(t *testing.T) {
srv := newStoreSeriesServer(context.Background())
err := store.Series(&storepb.SeriesRequest{
MinTime: c.start,
MaxTime: c.end,
Matchers: c.matchers,
SkipChunks: c.skipChunks,
MinTime: c.start,
MaxTime: c.end,
Matchers: c.matchers,
SkipChunks: c.skipChunks,
WithoutReplicaLabels: []string{"replica"},
}, srv)
if c.expectErr != nil {
testutil.NotOk(t, err)
Expand Down Expand Up @@ -882,23 +886,24 @@ func TestBucketStore_Acceptance(t *testing.T) {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

id := createBlockFromHead(tt, auxDir, h)
for _, replica := range []string{"r1", "r2"} {
id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
}

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)
Expand Down Expand Up @@ -1026,3 +1031,34 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {

testStoreAPIsAcceptance(t, startStore)
}

func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
db, err := e2eutil.NewTSDB()
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)

}

extLset1 := labels.NewBuilder(extLset).Set("replica", "r1").Labels()
extLset2 := labels.NewBuilder(extLset).Set("replica", "r2").Labels()

p1 := startNestedStore(tt, extLset1, appendFn)
p2 := startNestedStore(tt, extLset2, appendFn)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}},
}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
}

testStoreAPIsAcceptance(t, startStore)
}
31 changes: 23 additions & 8 deletions pkg/store/bucket.go
Expand Up @@ -1749,6 +1749,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request hints labels matchers").Error())
}
}
extLsetToRemove := make(map[string]struct{})
if len(req.WithoutReplicaLabels) > 0 {
for _, l := range req.WithoutReplicaLabels {
extLsetToRemove[l] = struct{}{}
}
}

g, gctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -1805,15 +1811,18 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
// b.extLset is already sorted by label name, no need to sort it again.
extRes := make([]string, 0, b.extLset.Len())
b.extLset.Range(func(l labels.Label) {
extRes = append(extRes, l.Name)
if _, ok := extLsetToRemove[l.Name]; !ok {
extRes = append(extRes, l.Name)
}
})

result = strutil.MergeSlices(res, extRes)
} else {
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
WithoutReplicaLabels: req.WithoutReplicaLabels,
}
blockClient := newBlockSeriesClient(
newCtx,
Expand All @@ -1830,7 +1839,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
s.metrics.seriesFetchDurationSum,
nil,
nil,
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingSizeBytes,
Expand Down Expand Up @@ -1932,6 +1941,11 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
if err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}
for i := range req.WithoutReplicaLabels {
if req.Label == req.WithoutReplicaLabels[i] {
return &storepb.LabelValuesResponse{}, nil
}
}

tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx)

Expand Down Expand Up @@ -2016,9 +2030,10 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
result = res
} else {
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
WithoutReplicaLabels: req.WithoutReplicaLabels,
}
blockClient := newBlockSeriesClient(
newCtx,
Expand Down
14 changes: 13 additions & 1 deletion pkg/store/prometheus.go
Expand Up @@ -606,9 +606,16 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR
}
}

extLsetToRemove := map[string]struct{}{}
for _, lbl := range r.WithoutReplicaLabels {
extLsetToRemove[lbl] = struct{}{}
}

if len(lbls) > 0 {
extLset.Range(func(l labels.Label) {
lbls = append(lbls, l.Name)
if _, ok := extLsetToRemove[l.Name]; !ok {
lbls = append(lbls, l.Name)
}
})
sort.Strings(lbls)
}
Expand All @@ -621,6 +628,11 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if r.Label == "" {
return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty")
}
for i := range r.WithoutReplicaLabels {
if r.Label == r.WithoutReplicaLabels[i] {
return &storepb.LabelValuesResponse{}, nil
}
}

extLset := p.externalLabelsFn()

Expand Down
2 changes: 2 additions & 0 deletions pkg/store/proxy.go
Expand Up @@ -531,6 +531,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
Start: r.Start,
End: r.End,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
WithoutReplicaLabels: r.WithoutReplicaLabels,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
Expand Down Expand Up @@ -633,6 +634,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
Start: r.Start,
End: r.End,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
WithoutReplicaLabels: r.WithoutReplicaLabels,
})
if err != nil {
msg := "fetch label values from store %s"
Expand Down

0 comments on commit bd74665

Please sign in to comment.