Merge pull request #13980 from prometheus/gotjosh/restore-only-with-rule-query

Rule Manager: Only query once per alert rule when restoring alert state
gotjosh committed Apr 30, 2024
2 parents e491deb + 379dec9 commit 1dd0bff
Showing 5 changed files with 159 additions and 129 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## unreleased

* [CHANGE] TSDB: Fix the predicate checking for blocks which are beyond the retention period to include the ones right at the retention boundary. #9633
* [CHANGE] Rules: Execute 1 query instead of N (where N is the number of alerts within alert rule) when restoring alerts. #13980
* [ENHANCEMENT] Rules: Add `rule_group_last_restore_duration_seconds` to measure the time it takes to restore a rule group. #13974
* [ENHANCEMENT] OTLP: Improve remote write format translation performance by using label set hashes for metric identifiers instead of string based ones. #14006 #13991

41 changes: 21 additions & 20 deletions rules/alerting.go
@@ -246,13 +246,16 @@ func (r *AlertingRule) sample(alert *Alert, ts time.Time) promql.Sample {
return s
}

// forStateSample returns the sample for ALERTS_FOR_STATE.
// forStateSample returns a promql.Sample with the rule labels, `ALERTS_FOR_STATE` as the metric name and the rule name as the `alertname` label.
// Optionally, if an alert is provided it'll copy the labels of the alert into the sample labels.
func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) promql.Sample {
lb := labels.NewBuilder(r.labels)

alert.Labels.Range(func(l labels.Label) {
lb.Set(l.Name, l.Value)
})
if alert != nil {
alert.Labels.Range(func(l labels.Label) {
lb.Set(l.Name, l.Value)
})
}

lb.Set(labels.MetricName, alertForStateMetricName)
lb.Set(labels.AlertName, r.name)
@@ -265,9 +268,11 @@ func (r *AlertingRule) forStateSample(alert *Alert, ts time.Time, v float64) promql.Sample {
return s
}

// QueryforStateSeries returns the series for ALERTS_FOR_STATE.
func (r *AlertingRule) QueryforStateSeries(ctx context.Context, alert *Alert, q storage.Querier) (storage.Series, error) {
smpl := r.forStateSample(alert, time.Now(), 0)
// QueryForStateSeries returns the series for ALERTS_FOR_STATE of the alert rule.
func (r *AlertingRule) QueryForStateSeries(ctx context.Context, q storage.Querier) (storage.SeriesSet, error) {
// We use a sample to ease the building of matchers.
// Don't provide an alert as we want matchers that match all series for the alert rule.
smpl := r.forStateSample(nil, time.Now(), 0)
var matchers []*labels.Matcher
smpl.Metric.Range(func(l labels.Label) {
mt, err := labels.NewMatcher(labels.MatchEqual, l.Name, l.Value)
@@ -276,20 +281,9 @@ func (r *AlertingRule) QueryforStateSeries(ctx context.Context, alert *Alert, q storage.Querier) (storage.Series, error) {
}
matchers = append(matchers, mt)
})
sset := q.Select(ctx, false, nil, matchers...)

var s storage.Series
for sset.Next() {
// Query assures that smpl.Metric is included in sset.At().Labels(),
// hence just checking the length would act like equality.
// (This is faster than calling labels.Compare again as we already have some info).
if sset.At().Labels().Len() == len(matchers) {
s = sset.At()
break
}
}

return s, sset.Err()
sset := q.Select(ctx, false, nil, matchers...)
return sset, sset.Err()
}

// SetEvaluationDuration updates evaluationDuration to the duration it took to evaluate the rule on its last evaluation.
@@ -557,6 +551,13 @@ func (r *AlertingRule) ForEachActiveAlert(f func(*Alert)) {
}
}

func (r *AlertingRule) ActiveAlertsCount() int {
r.activeMtx.Lock()
defer r.activeMtx.Unlock()

return len(r.active)
}

func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
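
Note on the nil-alert path added above: forStateSample with a nil alert produces only the rule labels plus __name__ and alertname, which is exactly what QueryForStateSeries needs to build matchers that cover every alert of the rule. A small illustrative program follows; buildForStateLabels is a hypothetical stand-in that mirrors the builder logic of the unexported forStateSample, and it assumes the github.com/prometheus/prometheus module is available.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
)

// buildForStateLabels mirrors the label building in forStateSample above:
// start from the rule labels, optionally overlay an alert's labels, then set
// the ALERTS_FOR_STATE metric name and the alertname label.
func buildForStateLabels(ruleLabels, alertLabels labels.Labels, ruleName string) labels.Labels {
	lb := labels.NewBuilder(ruleLabels)
	alertLabels.Range(func(l labels.Label) {
		lb.Set(l.Name, l.Value)
	})
	lb.Set(labels.MetricName, "ALERTS_FOR_STATE")
	lb.Set(labels.AlertName, ruleName)
	return lb.Labels()
}

func main() {
	ruleLabels := labels.FromStrings("severity", "critical")

	// No alert labels: only rule-level labels remain, so equality matchers
	// built from this set select every ALERTS_FOR_STATE series of the rule.
	fmt.Println(buildForStateLabels(ruleLabels, labels.EmptyLabels(), "HTTPRequestRateLow"))

	// With an alert's labels the set narrows to a single series, which is
	// what the per-alert code path queried for before this change.
	fmt.Println(buildForStateLabels(ruleLabels, labels.FromStrings("instance", "localhost:9090"), "HTTPRequestRateLow"))
}
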
43 changes: 31 additions & 12 deletions rules/alerting_test.go
@@ -710,19 +710,17 @@ func TestQueryForStateSeries(t *testing.T) {
labels.EmptyLabels(), labels.EmptyLabels(), "", true, nil,
)

alert := &Alert{
State: 0,
Labels: labels.EmptyLabels(),
Annotations: labels.EmptyLabels(),
Value: 0,
ActiveAt: time.Time{},
FiredAt: time.Time{},
ResolvedAt: time.Time{},
LastSentAt: time.Time{},
ValidUntil: time.Time{},
}
sample := rule.forStateSample(nil, time.Time{}, 0)

seriesSet, err := rule.QueryForStateSeries(context.Background(), querier)

series, err := rule.QueryforStateSeries(context.Background(), alert, querier)
var series storage.Series
for seriesSet.Next() {
if seriesSet.At().Labels().Len() == sample.Metric.Len() {
series = seriesSet.At()
break
}
}

require.Equal(t, tst.expectedSeries, series)
require.Equal(t, tst.expectedError, err)
@@ -1025,3 +1023,24 @@ func TestAlertingRule_SetNoDependencyRules(t *testing.T) {
rule.SetNoDependencyRules(true)
require.True(t, rule.NoDependencyRules())
}

func TestAlertingRule_ActiveAlertsCount(t *testing.T) {
rule := NewAlertingRule(
"TestRule",
nil,
time.Minute,
0,
labels.FromStrings("severity", "critical"),
labels.EmptyLabels(), labels.EmptyLabels(), "", true, nil,
)

require.Equal(t, 0, rule.ActiveAlertsCount())

// Set an active alert.
lbls := labels.FromStrings("a1", "1")
h := lbls.Hash()
al := &Alert{State: StateFiring, Labels: lbls, ActiveAt: time.Now()}
rule.active[h] = al

require.Equal(t, 1, rule.ActiveAlertsCount())
}
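
Because QueryForStateSeries now returns every ALERTS_FOR_STATE series of the rule, a caller that needs the series of one specific alert filters the set itself; the updated TestQueryForStateSeries above does so by comparing label counts, relying on the select guaranteeing that the matcher labels are present in each returned series. A sketch of that filter as a standalone helper (hypothetical name, not part of the Prometheus codebase):

package rulesutil

import (
	"github.com/prometheus/prometheus/storage"
)

// firstSeriesWithLabelLen returns the first series in the set whose label
// count equals want, or nil if none is found. Since the select already
// guarantees the matcher labels appear in every returned series, an equal
// length implies label-set equality, which is the shortcut the updated test
// relies on.
func firstSeriesWithLabelLen(sset storage.SeriesSet, want int) (storage.Series, error) {
	for sset.Next() {
		if s := sset.At(); s.Labels().Len() == want {
			return s, sset.Err()
		}
	}
	return nil, sset.Err()
}
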
39 changes: 25 additions & 14 deletions rules/group.go
@@ -664,25 +664,36 @@ func (g *Group) RestoreForState(ts time.Time) {
continue
}

sset, err := alertRule.QueryForStateSeries(g.opts.Context, q)
if err != nil {
level.Error(g.logger).Log(
"msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(),
"stage", "Select",
"err", err,
)
continue
}

// While not technically the same number of series we expect, it's as good of an approximation as any.
seriesByLabels := make(map[string]storage.Series, alertRule.ActiveAlertsCount())
for sset.Next() {
seriesByLabels[sset.At().Labels().DropMetricName().String()] = sset.At()
}

// No results for this alert rule.
if len(seriesByLabels) == 0 {
level.Debug(g.logger).Log("msg", "Failed to find a series to restore the 'for' state", labels.AlertName, alertRule.Name())
continue
}

alertRule.ForEachActiveAlert(func(a *Alert) {
var s storage.Series

s, err := alertRule.QueryforStateSeries(g.opts.Context, a, q)
if err != nil {
// Querier Warnings are ignored. We do not care unless we have an error.
level.Error(g.logger).Log(
"msg", "Failed to restore 'for' state",
labels.AlertName, alertRule.Name(),
"stage", "Select",
"err", err,
)
s, ok := seriesByLabels[a.Labels.String()]
if !ok {
return
}

if s == nil {
return
}

// Series found for the 'for' state.
var t int64
var v float64
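
The core of the change is visible in the RestoreForState hunk above: one select per rule, an in-memory index of the returned series, and a map lookup per active alert instead of a query per alert. A rough, self-contained sketch of that flow against the exported rules API; illustrative only and assuming the rules package as of this commit (the real logic lives in rules/group.go).

package rulesutil

import (
	"context"

	"github.com/prometheus/prometheus/rules"
	"github.com/prometheus/prometheus/storage"
)

// restoreSeriesByAlert sketches the new restore flow: a single select for the
// rule, then a per-alert map lookup rather than a per-alert query.
func restoreSeriesByAlert(ctx context.Context, r *rules.AlertingRule, q storage.Querier) (map[*rules.Alert]storage.Series, error) {
	sset, err := r.QueryForStateSeries(ctx, q)
	if err != nil {
		return nil, err
	}

	// The active-alert count is a reasonable size hint for the map.
	seriesByLabels := make(map[string]storage.Series, r.ActiveAlertsCount())
	for sset.Next() {
		// Dropping __name__ makes the key comparable with an alert's label
		// set, which never carries the ALERTS_FOR_STATE metric name.
		seriesByLabels[sset.At().Labels().DropMetricName().String()] = sset.At()
	}
	if err := sset.Err(); err != nil {
		return nil, err
	}

	out := make(map[*rules.Alert]storage.Series)
	r.ForEachActiveAlert(func(a *rules.Alert) {
		if s, ok := seriesByLabels[a.Labels.String()]; ok {
			out[a] = s
		}
	})
	return out, nil
}
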
164 changes: 81 additions & 83 deletions rules/manager_test.go
@@ -397,122 +397,120 @@ func TestForStateRestore(t *testing.T) {
group.Eval(context.TODO(), evalTime)
}

exp := rule.ActiveAlerts()
for _, aa := range exp {
require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
sort.Slice(exp, func(i, j int) bool {
return labels.Compare(exp[i].Labels, exp[j].Labels) < 0
})

// Prometheus goes down here. We create new rules and groups.
type testInput struct {
name string
restoreDuration time.Duration
alerts []*Alert
expectedAlerts []*Alert

num int
noRestore bool
gracePeriod bool
downDuration time.Duration
before func()
}

tests := []testInput{
{
// Normal restore (alerts were not firing).
name: "normal restore (alerts were not firing)",
restoreDuration: 15 * time.Minute,
alerts: rule.ActiveAlerts(),
expectedAlerts: rule.ActiveAlerts(),
downDuration: 10 * time.Minute,
},
{
// Testing Outage Tolerance.
name: "outage tolerance",
restoreDuration: 40 * time.Minute,
noRestore: true,
num: 2,
},
{
// No active alerts.
name: "no active alerts",
restoreDuration: 50 * time.Minute,
alerts: []*Alert{},
expectedAlerts: []*Alert{},
},
{
name: "test the grace period",
restoreDuration: 25 * time.Minute,
expectedAlerts: []*Alert{},
gracePeriod: true,
before: func() {
for _, duration := range []time.Duration{10 * time.Minute, 15 * time.Minute, 20 * time.Minute} {
evalTime := baseTime.Add(duration)
group.Eval(context.TODO(), evalTime)
}
},
num: 2,
},
}

testFunc := func(tst testInput) {
newRule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
alertForDuration,
0,
labels.FromStrings("severity", "critical"),
labels.EmptyLabels(), labels.EmptyLabels(), "", false, nil,
)
newGroup := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{newRule},
ShouldRestore: true,
Opts: opts,
})
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.before != nil {
tt.before()
}

newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup
newRule := NewAlertingRule(
"HTTPRequestRateLow",
expr,
alertForDuration,
0,
labels.FromStrings("severity", "critical"),
labels.EmptyLabels(), labels.EmptyLabels(), "", false, nil,
)
newGroup := NewGroup(GroupOptions{
Name: "default",
Interval: time.Second,
Rules: []Rule{newRule},
ShouldRestore: true,
Opts: opts,
})

restoreTime := baseTime.Add(tst.restoreDuration)
// First eval before restoration.
newGroup.Eval(context.TODO(), restoreTime)
// Restore happens here.
newGroup.RestoreForState(restoreTime)
newGroups := make(map[string]*Group)
newGroups["default;"] = newGroup

got := newRule.ActiveAlerts()
for _, aa := range got {
require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
sort.Slice(got, func(i, j int) bool {
return labels.Compare(got[i].Labels, got[j].Labels) < 0
})
restoreTime := baseTime.Add(tt.restoreDuration)
// First eval before restoration.
newGroup.Eval(context.TODO(), restoreTime)
// Restore happens here.
newGroup.RestoreForState(restoreTime)

// Checking if we have restored it correctly.
switch {
case tst.noRestore:
require.Len(t, got, tst.num)
for _, e := range got {
require.Equal(t, e.ActiveAt, restoreTime)
}
case tst.gracePeriod:
require.Len(t, got, tst.num)
for _, e := range got {
require.Equal(t, opts.ForGracePeriod, e.ActiveAt.Add(alertForDuration).Sub(restoreTime))
got := newRule.ActiveAlerts()
for _, aa := range got {
require.Zero(t, aa.Labels.Get(model.MetricNameLabel), "%s label set on active alert: %s", model.MetricNameLabel, aa.Labels)
}
default:
exp := tst.alerts
require.Equal(t, len(exp), len(got))
sortAlerts(exp)
sortAlerts(got)
for i, e := range exp {
require.Equal(t, e.Labels, got[i].Labels)

// Difference in time should be within 1e6 ns, i.e. 1ms
// (due to conversion between ns & ms, float64 & int64).
activeAtDiff := float64(e.ActiveAt.Unix() + int64(tst.downDuration/time.Second) - got[i].ActiveAt.Unix())
require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong")
}
}
}
sort.Slice(got, func(i, j int) bool {
return labels.Compare(got[i].Labels, got[j].Labels) < 0
})

for _, tst := range tests {
testFunc(tst)
}
// Checking if we have restored it correctly.
switch {
case tt.noRestore:
require.Len(t, got, tt.num)
for _, e := range got {
require.Equal(t, e.ActiveAt, restoreTime)
}
case tt.gracePeriod:

// Testing the grace period.
for _, duration := range []time.Duration{10 * time.Minute, 15 * time.Minute, 20 * time.Minute} {
evalTime := baseTime.Add(duration)
group.Eval(context.TODO(), evalTime)
require.Len(t, got, tt.num)
for _, e := range got {
require.Equal(t, opts.ForGracePeriod, e.ActiveAt.Add(alertForDuration).Sub(restoreTime))
}
default:
exp := tt.expectedAlerts
require.Equal(t, len(exp), len(got))
sortAlerts(exp)
sortAlerts(got)
for i, e := range exp {
require.Equal(t, e.Labels, got[i].Labels)

// Difference in time should be within 1e6 ns, i.e. 1ms
// (due to conversion between ns & ms, float64 & int64).
activeAtDiff := float64(e.ActiveAt.Unix() + int64(tt.downDuration/time.Second) - got[i].ActiveAt.Unix())
require.Equal(t, 0.0, math.Abs(activeAtDiff), "'for' state restored time is wrong")
}
}
})
}
testFunc(testInput{
restoreDuration: 25 * time.Minute,
alerts: []*Alert{},
gracePeriod: true,
num: 2,
})
}

func TestStaleness(t *testing.T) {
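
The manager_test.go diff above mostly folds the old testFunc helper and hand-rolled loop into standard table-driven subtests with a per-case before hook and a name per case. A generic sketch of that pattern, unrelated to the Prometheus code itself:

package example

import "testing"

// TestTableDriven shows the t.Run pattern the refactored TestForStateRestore
// now uses: each case gets a name, optional setup via a before hook, and runs
// as its own subtest so failures are reported per case.
func TestTableDriven(t *testing.T) {
	tests := []struct {
		name   string
		input  int
		want   int
		before func()
	}{
		{name: "identity", input: 1, want: 1},
		{name: "with setup", input: 2, want: 2, before: func() { /* per-case setup */ }},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if tt.before != nil {
				tt.before()
			}
			if got := tt.input; got != tt.want {
				t.Fatalf("got %d, want %d", got, tt.want)
			}
		})
	}
}
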
