Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy: acceptance tests for relabel filter #7309

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 14 additions & 5 deletions pkg/store/acceptance_test.go
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/pkg/errors"
"golang.org/x/exp/slices"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
Expand Down Expand Up @@ -991,7 +993,7 @@ func TestTSDBStore_Acceptance(t *testing.T) {
testStoreAPIsSeriesSplitSamplesIntoChunksWithMaxSizeOf120(t, startStore)
}

func TestProxyStore_Acceptance(t *testing.T) {
func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })

startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
Expand All @@ -1002,17 +1004,24 @@ func TestProxyStore_Acceptance(t *testing.T) {
appendFn(db.Appender(context.Background()))

return NewTSDBStore(nil, db, component.Rule, extLset)

}

p1 := startNestedStore(tt, extLset, appendFn)
p2 := startNestedStore(tt, extLset, appendFn)
p2 := startNestedStore(tt, labels.FromStrings("some", "label"), appendFn)

clients := []Client{
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2)},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset}},
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{labels.FromStrings("some", "label")}},
}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
relabelCfgs := []*relabel.Config{{
SourceLabels: model.LabelNames([]model.LabelName{"some"}),
Regex: relabel.MustNewRegexp("label"),
Action: relabel.Drop,
}}

return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval), WithTSDBSelector(NewTSDBSelector(relabelCfgs)))
}

testStoreAPIsAcceptance(t, startStore)
Expand Down
39 changes: 30 additions & 9 deletions pkg/store/proxy.go
Expand Up @@ -274,8 +274,12 @@ func (s *ProxyStore) TimeRange() (int64, int64) {

func (s *ProxyStore) TSDBInfos() []infopb.TSDBInfo {
infos := make([]infopb.TSDBInfo, 0)
for _, store := range s.stores() {
infos = append(infos, store.TSDBInfos()...)
for _, st := range s.stores() {
matches, _ := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
continue
}
infos = append(infos, st.TSDBInfos()...)
}
return infos
}
Expand Down Expand Up @@ -336,13 +340,15 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(ctx, st, s.debugLogging, originalRequest.MinTime, originalRequest.MaxTime, matchers...); !ok {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s filtered out: %v", st, reason))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason))
}
continue
}

matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector"))
}
continue
}
storeLabelSets = append(storeLabelSets, extraMatchers...)
Expand All @@ -360,7 +366,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
for _, st := range stores {
st := st
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}

respSet, err := newAsyncRespSet(ctx, st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
Expand Down Expand Up @@ -503,10 +509,18 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, s.debugLogging, r.Start, r.End); !ok {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason))
}
continue
}
matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector"))
}
continue
}

if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s queried", st))
}
Expand All @@ -516,7 +530,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
PartialResponseDisabled: r.PartialResponseDisabled,
Start: r.Start,
End: r.End,
Matchers: r.Matchers,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
})
if err != nil {
err = errors.Wrapf(err, "fetch label names from store %s", st)
Expand Down Expand Up @@ -590,7 +604,14 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
// We might be able to skip the store if its meta information indicates it cannot have series matching our query.
if ok, reason := storeMatches(gctx, st, s.debugLogging, r.Start, r.End); !ok {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to %v", st, reason))
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, reason))
}
continue
}
matches, extraMatchers := s.tsdbSelector.MatchLabelSets(st.LabelSets()...)
if !matches {
if s.debugLogging {
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("Store %s filtered out due to: %v", st, "tsdb selector"))
}
continue
}
Expand All @@ -611,7 +632,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
PartialResponseDisabled: r.PartialResponseDisabled,
Start: r.Start,
End: r.End,
Matchers: r.Matchers,
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
})
if err != nil {
msg := "fetch label values from store %s"
Expand Down