Skip to content

Commit

Permalink
switch to prometheus notifier (#5243)
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <ben.ye@bytedance.com>
  • Loading branch information
Ben Ye committed Mar 18, 2022
1 parent cb96cf9 commit d0a51a8
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 60 deletions.
5 changes: 3 additions & 2 deletions cmd/thanos/rule.go
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
Expand Down Expand Up @@ -465,13 +466,13 @@ func runRule(
{
// Run rule evaluation and alert notifications.
notifyFunc := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
res := make([]*notifier.Alert, 0, len(alerts))
for _, alrt := range alerts {
// Only send actually firing alerts.
if alrt.State == rules.StatePending {
continue
}
a := &alert.Alert{
a := &notifier.Alert{
StartsAt: alrt.FiredAt,
Labels: alrt.Labels,
Annotations: alrt.Annotations,
Expand Down
58 changes: 6 additions & 52 deletions pkg/alert/alert.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/notifier"
"go.uber.org/atomic"

"github.com/thanos-io/thanos/pkg/runutil"
Expand All @@ -36,53 +37,6 @@ const (
contentTypeJSON = "application/json"
)

// Alert is a generic representation of an alert in the Prometheus eco-system.
type Alert struct {
// Label value pairs for purpose of aggregation, matching, and disposition
// dispatching. This must minimally include an "alertname" label.
Labels labels.Labels `json:"labels"`

// Extra key/value information which does not define alert identity.
Annotations labels.Labels `json:"annotations"`

// The known time range for this alert. Start and end time are both optional.
StartsAt time.Time `json:"startsAt,omitempty"`
EndsAt time.Time `json:"endsAt,omitempty"`
GeneratorURL string `json:"generatorURL,omitempty"`
}

// Name returns the name of the alert. It is equivalent to the "alertname" label.
func (a *Alert) Name() string {
return a.Labels.Get(labels.AlertName)
}

// Hash returns a hash over the alert. It is equivalent to the alert labels hash.
func (a *Alert) Hash() uint64 {
return a.Labels.Hash()
}

func (a *Alert) String() string {
s := fmt.Sprintf("%s[%s]", a.Name(), fmt.Sprintf("%016x", a.Hash())[:7])
if a.Resolved() {
return s + "[resolved]"
}
return s + "[active]"
}

// Resolved returns true iff the activity interval ended in the past.
func (a *Alert) Resolved() bool {
return a.ResolvedAt(time.Now())
}

// ResolvedAt returns true off the activity interval ended before
// the given timestamp.
func (a *Alert) ResolvedAt(ts time.Time) bool {
if a.EndsAt.IsZero() {
return false
}
return !a.EndsAt.After(ts)
}

// Queue is a queue of alert notifications waiting to be sent. The queue is consumed in batches
// and entries are dropped at the front if it runs full.
type Queue struct {
Expand All @@ -94,7 +48,7 @@ type Queue struct {
alertRelabelConfigs []*relabel.Config

mtx sync.Mutex
queue []*Alert
queue []*notifier.Alert
morec chan struct{}

pushed prometheus.Counter
Expand Down Expand Up @@ -180,7 +134,7 @@ func (q *Queue) Cap() int {
// Pop takes a batch of alerts from the front of the queue. The batch size is limited
// according to the queues maxBatchSize limit.
// It blocks until elements are available or a termination signal is send on termc.
func (q *Queue) Pop(termc <-chan struct{}) []*Alert {
func (q *Queue) Pop(termc <-chan struct{}) []*notifier.Alert {
select {
case <-termc:
return nil
Expand All @@ -190,7 +144,7 @@ func (q *Queue) Pop(termc <-chan struct{}) []*Alert {
q.mtx.Lock()
defer q.mtx.Unlock()

as := make([]*Alert, q.maxBatchSize)
as := make([]*notifier.Alert, q.maxBatchSize)
n := copy(as, q.queue)
q.queue = q.queue[n:]

Expand All @@ -206,7 +160,7 @@ func (q *Queue) Pop(termc <-chan struct{}) []*Alert {
}

// Push adds a list of alerts to the queue.
func (q *Queue) Push(alerts []*Alert) {
func (q *Queue) Push(alerts []*notifier.Alert) {
if len(alerts) == 0 {
return
}
Expand Down Expand Up @@ -332,7 +286,7 @@ func toAPILabels(labels labels.Labels) models.LabelSet {

// Send an alert batch to all given Alertmanager clients.
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
func (s *Sender) Send(ctx context.Context, alerts []*Alert) {
func (s *Sender) Send(ctx context.Context, alerts []*notifier.Alert) {
if len(alerts) == 0 {
return
}
Expand Down
13 changes: 7 additions & 6 deletions pkg/alert/alert_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/notifier"

"github.com/thanos-io/thanos/pkg/testutil"
)
Expand All @@ -28,7 +29,7 @@ func TestQueue_Pop_all_Pushed(t *testing.T) {

q := NewQueue(nil, nil, qcapacity, batchsize, nil, nil, nil)
for i := 0; i < pushes; i++ {
q.Push([]*Alert{
q.Push([]*notifier.Alert{
{},
{},
})
Expand All @@ -47,7 +48,7 @@ func TestQueue_Pop_all_Pushed(t *testing.T) {
func TestQueue_Push_Relabelled(t *testing.T) {
q := NewQueue(nil, nil, 10, 10, labels.FromStrings("a", "1", "replica", "A"), []string{"b", "replica"}, nil)

q.Push([]*Alert{
q.Push([]*notifier.Alert{
{Labels: labels.FromStrings("b", "2", "c", "3")},
{Labels: labels.FromStrings("c", "3")},
{Labels: labels.FromStrings("a", "2")},
Expand All @@ -74,7 +75,7 @@ func TestQueue_Push_Relabelled_Alerts(t *testing.T) {
},
)

q.Push([]*Alert{
q.Push([]*notifier.Alert{
{Labels: labels.FromMap(map[string]string{
"a": "abc",
})},
Expand Down Expand Up @@ -134,7 +135,7 @@ func TestSenderSendsOk(t *testing.T) {
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})
s.Send(context.Background(), []*notifier.Alert{{}, {}})

assertSameHosts(t, poster.urls, poster.seen)

Expand All @@ -161,7 +162,7 @@ func TestSenderSendsOneFails(t *testing.T) {
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})
s.Send(context.Background(), []*notifier.Alert{{}, {}})

assertSameHosts(t, poster.urls, poster.seen)

Expand All @@ -182,7 +183,7 @@ func TestSenderSendsAllFail(t *testing.T) {
}
s := NewSender(nil, nil, []*Alertmanager{NewAlertmanager(nil, poster, time.Minute, APIv1)})

s.Send(context.Background(), []*Alert{{}, {}})
s.Send(context.Background(), []*notifier.Alert{{}, {}})

assertSameHosts(t, poster.urls, poster.seen)

Expand Down

0 comments on commit d0a51a8

Please sign in to comment.