Skip to content

Commit

Permalink
Proxy: acceptance tests for relabel filter
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaHoffmann committed Apr 26, 2024
1 parent e6fc833 commit 1953fdb
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 14 deletions.
19 changes: 14 additions & 5 deletions pkg/store/acceptance_test.go
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/slices"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -991,7 +993,7 @@ func TestTSDBStore_Acceptance(t *testing.T) {
testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore)
}

func TestProxyStore_Acceptance(t *testing.T) {
func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
Expand All @@ -1002,17 +1004,24 @@ func TestProxyStore_Acceptance(t *testing.T) {
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)

}

p1 := startNestedStore(tt, extLset, appendFn)
p2 := startNestedStore(tt, extLset, appendFn)
p2 := startNestedStore(tt, labels.FromStrings("some", "label"), appendFn)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{labels.FromStrings("some", "label")}},
}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
relabelCfgs := []*relabel.Config{{
SourceLabels: model.LabelNames([]model.LabelName{"some"}),
Regex: relabel.MustNewRegexp("label"),
Action: relabel.Drop,
}}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), WithTSDBSelector(NewTSDBSelector(relabelCfgs)))
}

testStoreAPIsAcceptance(t, startStore)
Expand Down
39 changes: 30 additions & 9 deletions pkg/store/proxy.go
Expand Up @@ -274,8 +274,12 @@ func (s *ProxyStore) TimeRange() (int64, int64) {

func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo {
infos := make([]infopb.TSDBInfo, 0)
for _, store := range s.stores() {
infos = append(infos, store.TSDBInfos()...)
for _, st := range s.stores() {
matches, _ := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
continue
}
infos = append(infos, st.TSDBInfos()...)
}
return infos
}
Expand Down Expand Up @@ -336,13 +340,15 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason))
}
continue
}

matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector"))
}
continue
}
storeLabelSets = append(storeLabelSets, extraMatchers...)
Expand All @@ -360,7 +366,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
for _, st := range stores {
st := st
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}

respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
Expand Down Expand Up @@ -503,10 +509,18 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, s.debugLogging, r.Start, r.End); !ok {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason))
}
continue
}
matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector"))
}
continue
}

if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}
Expand All @@ -516,7 +530,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
PartialResponseDisabled: r.PartialResponseDisabled,
Start: r.Start,
End: r.End,
Matchers: r.Matchers,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
Expand Down Expand Up @@ -590,7 +604,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, s.debugLogging, r.Start, r.End); !ok {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason))
}
continue
}
matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector"))
}
continue
}
Expand All @@ -611,7 +632,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
PartialResponseDisabled: r.PartialResponseDisabled,
Start: r.Start,
End: r.End,
Matchers: r.Matchers,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
})
if err != nil {
msg := "fetch label values from store %s"
Expand Down

0 comments on commit 1953fdb

Please sign in to comment.