Skip to content

Commit

Permalink
tsdb: check for context cancel before regex matching postings (#14096)
Browse files Browse the repository at this point in the history
* tsdb: check for context cancel before regex matching postings

Regex matching can be heavy if the regex takes a lot of cycles to
evaluate and we can get stuck evaluating postings for a long time
without this fix. The constant checkContextEveryNIterations=100
may be changed later.

Signed-off-by: György Krajcsovits <gyorgy.krajcsovits@grafana.com>
  • Loading branch information
krajorama committed May 15, 2024
1 parent e6be424 commit fdaafdb
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 1 deletion.
8 changes: 8 additions & 0 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ const (
indexFilename = "index"

seriesByteAlign = 16

// checkContextEveryNIterations is used in some tight loops to check if the context is done.
checkContextEveryNIterations = 100
)

type indexWriterSeries struct {
Expand Down Expand Up @@ -1797,7 +1800,12 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
}

var its []Postings
count := 1
for val, offset := range e {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
}
count++
if !match(val) {
continue
}
Expand Down
33 changes: 33 additions & 0 deletions tsdb/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,36 @@ func TestChunksTimeOrdering(t *testing.T) {

require.NoError(t, idx.Close())
}

// TestReader_PostingsForLabelMatchingHonorsContextCancel writes an index file
// holding 1000 series that differ only in their __name__ value, opens it with
// a file reader and runs PostingsForLabelMatching with a mock context that
// starts returning an error after a fixed number of Err() calls. The returned
// postings must carry an error and the context must have been consulted
// exactly that many times.
func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
	const (
		numSeries = 1000
		labelName = "__name__"
	)
	indexPath := filepath.Join(t.TempDir(), "index")

	w, err := NewWriter(context.Background(), indexPath)
	require.NoError(t, err)

	// Format every label value once and reuse it for both the symbol table
	// and the series themselves.
	values := make([]string, 0, numSeries)
	for n := 1; n <= numSeries; n++ {
		values = append(values, fmt.Sprintf("%4d", n))
	}

	for _, v := range values {
		require.NoError(t, w.AddSymbol(v))
	}
	require.NoError(t, w.AddSymbol(labelName))

	for pos, v := range values {
		ref := storage.SeriesRef(pos + 1)
		lset := labels.FromStrings(labelName, v)
		meta := chunks.Meta{Ref: 1, MinTime: 0, MaxTime: 10}
		require.NoError(t, w.AddSeries(ref, lset, meta))
	}

	require.NoError(t, w.Close())

	reader, err := NewFileReader(indexPath)
	require.NoError(t, err)
	defer reader.Close()

	// Fail after processing half of the series.
	wantCalls := uint64(numSeries / 2)
	ctx := &testutil.MockContextErrAfter{FailAfter: wantCalls}
	matchAll := func(string) bool { return true }

	got := reader.PostingsForLabelMatching(ctx, labelName, matchAll)
	require.Error(t, got.Err())
	require.Equal(t, wantCalls, ctx.Count())
}
5 changes: 5 additions & 0 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,12 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
}

var its []Postings
count := 1
for _, v := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return ErrPostings(ctx.Err())
}
count++
if match(v) {
its = append(its, NewListPostings(e[v]))
}
Expand Down
17 changes: 17 additions & 0 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/testutil"
)

func TestMemPostings_addFor(t *testing.T) {
Expand Down Expand Up @@ -1282,3 +1283,19 @@ func BenchmarkListPostings(b *testing.B) {
})
}
}

func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations
for i := 1; i <= seriesCount; i++ {
memP.Add(storage.SeriesRef(i), labels.FromStrings("__name__", fmt.Sprintf("%4d", i)))
}

failAfter := uint64(seriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
p := memP.PostingsForLabelMatching(ctx, "__name__", func(string) bool {
return true
})
require.Error(t, p.Err())
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}
13 changes: 13 additions & 0 deletions tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ import (
"github.com/prometheus/prometheus/util/annotations"
)

// checkContextEveryNIterations is the number of loop iterations between
// ctx.Err() checks in some tight loops, so that a done context is noticed
// without consulting it on every single iteration.
const checkContextEveryNIterations = 100

type blockBaseQuerier struct {
blockID ulid.ULID
index IndexReader
Expand Down Expand Up @@ -358,7 +361,12 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexReader, m *labels.Ma
if m.Type == labels.MatchEqual && m.Value == "" {
res = vals
} else {
count := 1
for _, val := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if !m.Matches(val) {
res = append(res, val)
}
Expand Down Expand Up @@ -387,7 +395,12 @@ func labelValuesWithMatchers(ctx context.Context, r IndexReader, name string, ma
// re-use the allValues slice to avoid allocations
// this is safe because the iteration is always ahead of the append
filteredValues := allValues[:0]
count := 1
for _, v := range allValues {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if m.Matches(v) {
filteredValues = append(filteredValues, v)
}
Expand Down
75 changes: 75 additions & 0 deletions tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/testutil"
)

// TODO(bwplotka): Replace those mocks with remote.concreteSeriesSet.
Expand Down Expand Up @@ -3638,3 +3639,77 @@ func TestQueryWithOneChunkCompletelyDeleted(t *testing.T) {
require.NoError(t, css.Err())
require.Equal(t, 1, seriesCount)
}

func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
ir := mockReaderOfLabels{}

failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err := labelValuesWithMatchers(ctx, ir, "__name__", labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))

require.Error(t, err)
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
ir := mockReaderOfLabels{}

failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}
_, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*"))

require.Error(t, err)
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

// mockReaderOfLabels is a stub reader handed to labelValuesWithMatchers and
// inversePostingsForMatcher in the context-cancellation tests. Only
// LabelValues and Close are usable; every other method panics with its own
// name so that an unexpected call is easy to identify.
type mockReaderOfLabels struct{}

// mockReaderOfLabelsSeriesCount is the number of label values returned by
// mockReaderOfLabels.LabelValues.
const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10

// LabelValues returns mockReaderOfLabelsSeriesCount empty strings and a nil
// error, regardless of its arguments.
func (m mockReaderOfLabels) LabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
	return make([]string, mockReaderOfLabelsSeriesCount), nil
}

// LabelValueFor is not expected to be called and panics.
func (m mockReaderOfLabels) LabelValueFor(context.Context, storage.SeriesRef, string) (string, error) {
	panic("LabelValueFor called")
}

// SortedLabelValues is not expected to be called and panics.
func (m mockReaderOfLabels) SortedLabelValues(context.Context, string, ...*labels.Matcher) ([]string, error) {
	panic("SortedLabelValues called")
}

// Close does nothing and returns nil.
func (m mockReaderOfLabels) Close() error {
	return nil
}

// LabelNames is not expected to be called and panics.
func (m mockReaderOfLabels) LabelNames(context.Context, ...*labels.Matcher) ([]string, error) {
	panic("LabelNames called")
}

// LabelNamesFor is not expected to be called and panics.
func (m mockReaderOfLabels) LabelNamesFor(context.Context, ...storage.SeriesRef) ([]string, error) {
	panic("LabelNamesFor called")
}

// PostingsForLabelMatching is not expected to be called and panics.
func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, func(string) bool) index.Postings {
	panic("PostingsForLabelMatching called")
}

// Postings is not expected to be called and panics.
func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) {
	panic("Postings called")
}

// ShardedPostings is not expected to be called and panics.
func (m mockReaderOfLabels) ShardedPostings(index.Postings, uint64, uint64) index.Postings {
	// The message names this method; it previously read "Postings called".
	panic("ShardedPostings called")
}

// SortedPostings is not expected to be called and panics.
func (m mockReaderOfLabels) SortedPostings(index.Postings) index.Postings {
	panic("SortedPostings called")
}

// Series is not expected to be called and panics.
func (m mockReaderOfLabels) Series(storage.SeriesRef, *labels.ScratchBuilder, *[]chunks.Meta) error {
	panic("Series called")
}

// Symbols is not expected to be called and panics.
func (m mockReaderOfLabels) Symbols() index.StringIter {
	// The message names this method; it previously read "Series called".
	panic("Symbols called")
}
27 changes: 26 additions & 1 deletion util/testutil/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@

package testutil

import "time"
import (
"context"
"time"

"go.uber.org/atomic"
)

// A MockContext provides a simple stub implementation of a Context.
type MockContext struct {
Expand All @@ -40,3 +45,23 @@ func (c *MockContext) Err() error {
func (c *MockContext) Value(interface{}) interface{} {
return nil
}

// MockContextErrAfter is a MockContext that will return an error after a certain
// number of calls to Err().
type MockContextErrAfter struct {
	MockContext
	// count is the number of Err() calls made so far.
	count atomic.Uint64
	// FailAfter is the call number from which Err() returns context.Canceled.
	FailAfter uint64
}

// Err records the call and returns context.Canceled once the number of calls,
// including this one, has reached FailAfter. Before that it returns whatever
// the embedded MockContext's Err returns.
func (c *MockContextErrAfter) Err() error {
	// Compare against the value returned by Inc instead of doing a separate
	// Load, so the decision is based on the count produced by this very call
	// even when Err is invoked from several goroutines at once.
	if c.count.Inc() >= c.FailAfter {
		return context.Canceled
	}
	return c.MockContext.Err()
}

// Count returns the number of times Err has been called.
func (c *MockContextErrAfter) Count() uint64 {
	return c.count.Load()
}

0 comments on commit fdaafdb

Please sign in to comment.