Skip to content

Commit

Permalink
Stores: handle replica labels in label_value and label_names grpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaHoffmann committed Apr 27, 2024
1 parent 044e5ed commit c2c9de3
Show file tree
Hide file tree
Showing 8 changed files with 282 additions and 122 deletions.
21 changes: 16 additions & 5 deletions pkg/query/querier.go
Expand Up @@ -399,14 +399,19 @@ func (q *querier) LabelValues(ctx context.Context, name string, matchers ...*lab
if err != nil {
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
req := &storepb.LabelValuesRequest{
Label: name,
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
})
}

if q.isDedupEnabled() {
req.WithoutReplicaLabels = q.replicaLabels
}

resp, err := q.proxy.LabelValues(ctx, req)
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
}
Expand All @@ -433,12 +438,18 @@ func (q *querier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) (
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
}

resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
req := &storepb.LabelNamesRequest{
PartialResponseStrategy: q.partialResponseStrategy,
Start: q.mint,
End: q.maxt,
Matchers: pbMatchers,
})
}

if q.isDedupEnabled() {
req.WithoutReplicaLabels = q.replicaLabels
}

resp, err := q.proxy.LabelNames(ctx, req)
if err != nil {
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
}
Expand Down
49 changes: 26 additions & 23 deletions pkg/store/acceptance_test.go
Expand Up @@ -721,9 +721,10 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
for _, c := range tc.labelNameCalls {
t.Run("label_names", func(t *testing.T) {
resp, err := store.LabelNames(context.Background(), &storepb.LabelNamesRequest{
Start: c.start,
End: c.end,
Matchers: c.matchers,
Start: c.start,
End: c.end,
Matchers: c.matchers,
WithoutReplicaLabels: []string{"replica"},
})
if c.expectErr != nil {
testutil.NotOk(t, err)
Expand All @@ -741,10 +742,11 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
for _, c := range tc.labelValuesCalls {
t.Run("label_values", func(t *testing.T) {
resp, err := store.LabelValues(context.Background(), &storepb.LabelValuesRequest{
Start: c.start,
End: c.end,
Label: c.label,
Matchers: c.matchers,
Start: c.start,
End: c.end,
Label: c.label,
Matchers: c.matchers,
WithoutReplicaLabels: []string{"replica"},
})
if c.expectErr != nil {
testutil.NotOk(t, err)
Expand Down Expand Up @@ -882,23 +884,24 @@ func TestBucketStore_Acceptance(t *testing.T) {
tt.Skip("Bucket Store cannot handle empty HEAD")
}

id := createBlockFromHead(tt, auxDir, h)
for _, replica := range []string{"r1", "r2"} {
id := createBlockFromHead(tt, auxDir, h)

auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: extLset.Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
auxBlockDir := filepath.Join(auxDir, id.String())
meta, err := metadata.ReadFromDir(auxBlockDir)
testutil.Ok(t, err)
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
testutil.Ok(t, err)
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
Labels: labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
Downsample: metadata.ThanosDownsample{Resolution: 0},
Source: metadata.TestSource,
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
}, nil)
testutil.Ok(tt, err)

testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
}

chunkPool, err := NewDefaultChunkBytesPool(2e5)
testutil.Ok(tt, err)
Expand Down
31 changes: 23 additions & 8 deletions pkg/store/bucket.go
Expand Up @@ -1734,6 +1734,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request hints labels matchers").Error())
}
}
extLsetToRemove := make(map[string]struct{})
if len(req.WithoutReplicaLabels) > 0 {
for _, l := range req.WithoutReplicaLabels {
extLsetToRemove[l] = struct{}{}
}
}

g, gctx := errgroup.WithContext(ctx)

Expand Down Expand Up @@ -1790,15 +1796,18 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
// b.extLset is already sorted by label name, no need to sort it again.
extRes := make([]string, 0, b.extLset.Len())
b.extLset.Range(func(l labels.Label) {
extRes = append(extRes, l.Name)
if _, ok := extLsetToRemove[l.Name]; !ok {
extRes = append(extRes, l.Name)
}
})

result = strutil.MergeSlices(res, extRes)
} else {
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
WithoutReplicaLabels: req.WithoutReplicaLabels,
}
blockClient := newBlockSeriesClient(
newCtx,
Expand All @@ -1815,7 +1824,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
s.metrics.seriesFetchDurationSum,
nil,
nil,
nil,
extLsetToRemove,
s.enabledLazyExpandedPostings,
s.metrics.lazyExpandedPostingsCount,
s.metrics.lazyExpandedPostingSizeBytes,
Expand Down Expand Up @@ -1917,6 +1926,11 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
if err != nil {
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
}
for i := range req.WithoutReplicaLabels {
if req.Label == req.WithoutReplicaLabels[i] {
return &storepb.LabelValuesResponse{}, nil
}
}

tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx)

Expand Down Expand Up @@ -2001,9 +2015,10 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
result = res
} else {
seriesReq := &storepb.SeriesRequest{
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
MinTime: req.Start,
MaxTime: req.End,
SkipChunks: true,
WithoutReplicaLabels: req.WithoutReplicaLabels,
}
blockClient := newBlockSeriesClient(
newCtx,
Expand Down
14 changes: 13 additions & 1 deletion pkg/store/prometheus.go
Expand Up @@ -606,9 +606,16 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR
}
}

extLsetToRemove := map[string]struct{}{}
for _, lbl := range r.WithoutReplicaLabels {
extLsetToRemove[lbl] = struct{}{}
}

if len(lbls) > 0 {
extLset.Range(func(l labels.Label) {
lbls = append(lbls, l.Name)
if _, ok := extLsetToRemove[l.Name]; !ok {
lbls = append(lbls, l.Name)
}
})
sort.Strings(lbls)
}
Expand All @@ -621,6 +628,11 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if r.Label == "" {
return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty")
}
for i := range r.WithoutReplicaLabels {
if r.Label == r.WithoutReplicaLabels[i] {
return &storepb.LabelValuesResponse{}, nil
}
}

extLset := p.externalLabelsFn()

Expand Down
2 changes: 2 additions & 0 deletions pkg/store/proxy.go
Expand Up @@ -517,6 +517,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
Start: r.Start,
End: r.End,
Matchers: r.Matchers,
WithoutReplicaLabels: r.WithoutReplicaLabels,
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
Expand Down Expand Up @@ -612,6 +613,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
Start: r.Start,
End: r.End,
Matchers: r.Matchers,
WithoutReplicaLabels: r.WithoutReplicaLabels,
})
if err != nil {
msg := "fetch label values from store %s"
Expand Down

0 comments on commit c2c9de3

Please sign in to comment.