Stores: respect replica labels in LabelValues and LabelNames #7310

Merged
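This change threads the WithoutReplicaLabels field of LabelNamesRequest and LabelValuesRequest through the querier, the proxy store, the bucket store, and the Prometheus store, mirroring what the Series call already supports: when deduplication is enabled the querier forwards its replica labels, LabelNames responses no longer list those label names, and LabelValues for a replica label comes back empty. The sketch below shows the resulting behavior from a StoreAPI client's point of view; only the request and response fields come from this diff, while the gRPC wiring, the address, and the time range are illustrative assumptions.

// Sketch: querying a Thanos StoreAPI endpoint with replica labels stripped.
// The gRPC setup, address, and time range are assumptions, not part of the PR.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/thanos-io/thanos/pkg/store/storepb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Any StoreAPI endpoint (store gateway, sidecar, receiver) would do here.
	conn, err := grpc.Dial("localhost:10901", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := storepb.NewStoreClient(conn)
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// LabelNames drops the external label names listed in WithoutReplicaLabels.
	names, err := client.LabelNames(ctx, &storepb.LabelNamesRequest{
		Start:                0,
		End:                  time.Now().UnixMilli(),
		WithoutReplicaLabels: []string{"replica"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(names.Names) // "replica" is no longer listed.

	// LabelValues for a replica label itself returns an empty response.
	values, err := client.LabelValues(ctx, &storepb.LabelValuesRequest{
		Label:                "replica",
		Start:                0,
		End:                  time.Now().UnixMilli(),
		WithoutReplicaLabels: []string{"replica"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(values.Values) // Expected: empty.
}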
21 changes: 16 additions & 5 deletions pkg/query/querier.go
@@ -399,14 +399,19 @@ func (q *querier) LabelValues(ctx context.Context, name string, matchers ...*lab
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

- resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
+ req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
- })
+ }
+
+ if q.isDedupEnabled() {
+ req.WithoutReplicaLabels = q.replicaLabels
+ }
+
+ resp, err := q.proxy.LabelValues(ctx, req)
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}
@@ -433,12 +438,18 @@ func (q *querier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) (
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

- resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
+ req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
- })
+ }
+
+ if q.isDedupEnabled() {
+ req.WithoutReplicaLabels = q.replicaLabels
+ }
+
+ resp, err := q.proxy.LabelNames(ctx, req)
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}
92 changes: 64 additions & 28 deletions pkg/store/acceptance_test.go
@@ -109,6 +109,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
},
labelValuesCalls: []labelValuesCallCase{
{start: timestamp.FromTime(minTime), end: timestamp.FromTime(maxTime), label: "foo", expectedValues: []string{"foovalue1"}},
{start: timestamp.FromTime(minTime), end: timestamp.FromTime(maxTime), label: "replica"},
},
},
{
@@ -722,9 +723,10 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
for _, c := range tc.labelNameCalls {
t.Run("label_names", func(t *testing.T) {
resp, err := store.LabelNames(context.Background(), &storepb.LabelNamesRequest{
- Start: c.start,
- End: c.end,
- Matchers: c.matchers,
+ Start: c.start,
+ End: c.end,
+ Matchers: c.matchers,
+ WithoutReplicaLabels: []string{"replica"},
})
if c.expectErr != nil {
testutil.NotOk(t, err)
@@ -740,12 +742,13 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
})
}
for _, c := range tc.labelValuesCalls {
t.Run("label_name_values", func(t *testing.T) {
t.Run("label_values", func(t *testing.T) {
resp, err := store.LabelValues(context.Background(), &storepb.LabelValuesRequest{
- Start: c.start,
- End: c.end,
- Label: c.label,
- Matchers: c.matchers,
+ Start: c.start,
+ End: c.end,
+ Label: c.label,
+ Matchers: c.matchers,
+ WithoutReplicaLabels: []string{"replica"},
})
if c.expectErr != nil {
testutil.NotOk(t, err)
@@ -764,10 +767,11 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
t.Run("series", func(t *testing.T) {
srv := newStoreSeriesServer(context.Background())
err := store.Series(&storepb.SeriesRequest{
- MinTime: c.start,
- MaxTime: c.end,
- Matchers: c.matchers,
- SkipChunks: c.skipChunks,
+ MinTime: c.start,
+ MaxTime: c.end,
+ Matchers: c.matchers,
+ SkipChunks: c.skipChunks,
+ WithoutReplicaLabels: []string{"replica"},
}, srv)
if c.expectErr != nil {
testutil.NotOk(t, err)
@@ -882,23 +886,24 @@ func TestBucketStore_Acceptance(t *testing.T) {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

- id := createBlockFromHead(tt, auxDir, h)
+ for _, replica := range []string{"r1", "r2"} {
+ id := createBlockFromHead(tt, auxDir, h)

- auxBlockDir := filepath.Join(auxDir, id.String())
- meta, err := metadata.ReadFromDir(auxBlockDir)
- testutil.Ok(t, err)
- stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
- testutil.Ok(t, err)
- _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
- Labels: extLset.Map(),
- Downsample: metadata.ThanosDownsample{Resolution: 0},
- Source: metadata.TestSource,
- IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
- }, nil)
- testutil.Ok(tt, err)
-
- testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
+ auxBlockDir := filepath.Join(auxDir, id.String())
+ meta, err := metadata.ReadFromDir(auxBlockDir)
+ testutil.Ok(t, err)
+ stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
+ testutil.Ok(t, err)
+ _, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
+ Labels: labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
+ Downsample: metadata.ThanosDownsample{Resolution: 0},
+ Source: metadata.TestSource,
+ IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
+ }, nil)
+ testutil.Ok(tt, err)
+
+ testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
+ }

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)
@@ -1026,3 +1031,34 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {

testStoreAPIsAcceptance(t, startStore)
}

func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
db, err := e2eutil.NewTSDB()
testutil.Ok(tt, err)
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)

}

extLset1 := labels.NewBuilder(extLset).Set("replica", "r1").Labels()
extLset2 := labels.NewBuilder(extLset).Set("replica", "r2").Labels()

p1 := startNestedStore(tt, extLset1, appendFn)
p2 := startNestedStore(tt, extLset2, appendFn)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}},
}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
}

testStoreAPIsAcceptance(t, startStore)
}
31 changes: 23 additions & 8 deletions pkg/store/bucket.go
@@ -1749,6 +1749,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request hints labels matchers").Error())
}
}
extLsetToRemove := make(map[string]struct{})
if len(req.WithoutReplicaLabels) > 0 {
for _, l := range req.WithoutReplicaLabels {
extLsetToRemove[l] = struct{}{}
}
}

g, gctx := errgroup.WithContext(ctx)

@@ -1805,15 +1811,18 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
// b.extLset is already sorted by label name, no need to sort it again.
extRes := make([]string, 0, b.extLset.Len())
b.extLset.Range(func(l labels.Label) {
- extRes = append(extRes, l.Name)
+ if _, ok := extLsetToRemove[l.Name]; !ok {
+ extRes = append(extRes, l.Name)
+ }
})

result = strutil.MergeSlices(res, extRes)
} else {
seriesReq := &storepb.SeriesRequest{
- MinTime: req.Start,
- MaxTime: req.End,
- SkipChunks: true,
+ MinTime: req.Start,
+ MaxTime: req.End,
+ SkipChunks: true,
+ WithoutReplicaLabels: req.WithoutReplicaLabels,
}
blockClient := newBlockSeriesClient(
newCtx,
Expand All @@ -1830,7 +1839,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
s.metrics.seriesFetchDurationSum,
nil,
nil,
- nil,
+ extLsetToRemove,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingSizeBytes,
@@ -1932,6 +1941,11 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
if err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}
for i := range req.WithoutReplicaLabels {
if req.Label == req.WithoutReplicaLabels[i] {
return &storepb.LabelValuesResponse{}, nil
}
}

tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx)

@@ -2016,9 +2030,10 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
result = res
} else {
seriesReq := &storepb.SeriesRequest{
- MinTime: req.Start,
- MaxTime: req.End,
- SkipChunks: true,
+ MinTime: req.Start,
+ MaxTime: req.End,
+ SkipChunks: true,
+ WithoutReplicaLabels: req.WithoutReplicaLabels,
}
blockClient := newBlockSeriesClient(
newCtx,
14 changes: 13 additions & 1 deletion pkg/store/prometheus.go
@@ -606,9 +606,16 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR
}
}

extLsetToRemove := map[string]struct{}{}
for _, lbl := range r.WithoutReplicaLabels {
extLsetToRemove[lbl] = struct{}{}
}

if len(lbls) > 0 {
extLset.Range(func(l labels.Label) {
- lbls = append(lbls, l.Name)
+ if _, ok := extLsetToRemove[l.Name]; !ok {
+ lbls = append(lbls, l.Name)
+ }
})
sort.Strings(lbls)
}
Expand All @@ -621,6 +628,11 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if r.Label == "" {
return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty")
}
for i := range r.WithoutReplicaLabels {
if r.Label == r.WithoutReplicaLabels[i] {
return &storepb.LabelValuesResponse{}, nil
}
}

extLset := p.externalLabelsFn()

2 changes: 2 additions & 0 deletions pkg/store/proxy.go
@@ -531,6 +531,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
Start: r.Start,
End: r.End,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
WithoutReplicaLabels: r.WithoutReplicaLabels,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
@@ -633,6 +634,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
Start: r.Start,
End: r.End,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
WithoutReplicaLabels: r.WithoutReplicaLabels,
})
if err != nil {
msg := "fetch label values from store %s"
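The BucketStore and PrometheusStore changes above share one pattern: build a set from WithoutReplicaLabels and skip those names when merging external labels into a LabelNames response, while ProxyStore simply forwards the field to its downstream stores. A standalone sketch of that filtering step follows; the helper name and package are hypothetical, not part of the PR.

package main

import "fmt"

// dropReplicaLabels mirrors the filtering used in BucketStore.LabelNames and
// PrometheusStore.LabelNames above: any external label name listed in
// withoutReplicaLabels is excluded from the result. Hypothetical helper,
// shown for illustration only.
func dropReplicaLabels(extLabelNames, withoutReplicaLabels []string) []string {
	toRemove := make(map[string]struct{}, len(withoutReplicaLabels))
	for _, name := range withoutReplicaLabels {
		toRemove[name] = struct{}{}
	}
	kept := make([]string, 0, len(extLabelNames))
	for _, name := range extLabelNames {
		if _, ok := toRemove[name]; !ok {
			kept = append(kept, name)
		}
	}
	return kept
}

func main() {
	fmt.Println(dropReplicaLabels([]string{"cluster", "replica"}, []string{"replica"})) // [cluster]
}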