diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index f1f788e5c7..bf626dc721 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -120,7 +120,7 @@ func registerRule(app *extkingpin.App) { walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool() cmd.Flag("data-dir", "data directory").Default("data/").StringVar(&conf.dataDir) - cmd.Flag("rule-file", "Rule files that should be used by rule manager. Can be in glob format (repeated)."). + cmd.Flag("rule-file", "Rule files that should be used by rule manager. Can be in glob format (repeated). Note that rules are not automatically detected, use SIGHUP or do HTTP POST /-/reload to re-read them."). Default("rules/").StringsVar(&conf.ruleFiles) cmd.Flag("resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager."). Default("1m").DurationVar(&conf.resendDelay) diff --git a/docs/components/rule.md b/docs/components/rule.md index 1cabaca15c..72e27cd755 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -430,7 +430,10 @@ Flags: --resend-delay=1m Minimum amount of time to wait before resending an alert to Alertmanager. --rule-file=rules/ ... Rule files that should be used by rule manager. - Can be in glob format (repeated). + Can be in glob format (repeated). Note that + rules are not automatically detected, use + SIGHUP or do HTTP POST /-/reload to re-read + them. --shipper.upload-compacted If true shipper will try to upload compacted blocks as well. Useful for migration purposes. diff --git a/pkg/api/query/v1.go b/pkg/api/query/v1.go index 85473015d8..17a17b4849 100644 --- a/pkg/api/query/v1.go +++ b/pkg/api/query/v1.go @@ -185,6 +185,7 @@ func (qapi *QueryAPI) Register(r *route.Router, tracer opentracing.Tracer, logge r.Get("/stores", instr("stores", qapi.stores)) + r.Get("/alerts", instr("alerts", NewAlertsHandler(qapi.ruleGroups, qapi.enableRulePartialResponse))) r.Get("/rules", instr("rules", NewRulesHandler(qapi.ruleGroups, qapi.enableRulePartialResponse))) r.Get("/targets", instr("targets", NewTargetsHandler(qapi.targets, qapi.enableTargetPartialResponse))) @@ -762,6 +763,50 @@ func NewTargetsHandler(client targets.UnaryClient, enablePartialResponse bool) f } } +// NewAlertsHandler created handler compatible with HTTP /api/v1/alerts https://prometheus.io/docs/prometheus/latest/querying/api/#alerts +// which uses gRPC Unary Rules API (Rules API works for both /alerts and /rules). +func NewAlertsHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) { + ps := storepb.PartialResponseStrategy_ABORT + if enablePartialResponse { + ps = storepb.PartialResponseStrategy_WARN + } + + return func(r *http.Request) (interface{}, []error, *api.ApiError) { + span, ctx := tracing.StartSpan(r.Context(), "receive_http_request") + defer span.Finish() + + var ( + groups *rulespb.RuleGroups + warnings storage.Warnings + err error + ) + + // TODO(bwplotka): Allow exactly the same functionality as query API: passing replica, dedup and partial response as HTTP params as well. + req := &rulespb.RulesRequest{ + Type: rulespb.RulesRequest_ALERT, + PartialResponseStrategy: ps, + } + tracing.DoInSpan(ctx, "retrieve_rules", func(ctx context.Context) { + groups, warnings, err = client.Rules(ctx, req) + }) + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: errors.Errorf("error retrieving rules: %v", err)} + } + + var resp struct{ Alerts []*rulespb.AlertInstance } + for _, g := range groups.Groups { + for _, r := range g.Rules { + a := r.GetAlert() + if a == nil { + continue + } + resp.Alerts = append(resp.Alerts, a.Alerts...) + } + } + return resp, warnings, nil + } +} + // NewRulesHandler created handler compatible with HTTP /api/v1/rules https://prometheus.io/docs/prometheus/latest/querying/api/#rules // which uses gRPC Unary Rules API. func NewRulesHandler(client rules.UnaryClient, enablePartialResponse bool) func(*http.Request) (interface{}, []error, *api.ApiError) { diff --git a/pkg/api/query/v1_test.go b/pkg/api/query/v1_test.go index c9f97c0eb4..e7358036ca 100644 --- a/pkg/api/query/v1_test.go +++ b/pkg/api/query/v1_test.go @@ -1713,7 +1713,7 @@ func TestRulesHandler(t *testing.T) { Type: "alerting", }, } - var tests = []test{ + for _, test := range []test{ { response: &testpromcompatibility.RuleDiscovery{ RuleGroups: []*testpromcompatibility.RuleGroup{ @@ -1770,9 +1770,7 @@ func TestRulesHandler(t *testing.T) { }, }, }, - } - - for _, test := range tests { + } { t.Run(fmt.Sprintf("endpoint=%s/method=%s/query=%q", "rules", http.MethodGet, test.query.Encode()), func(t *testing.T) { // Build a context with the correct request params. ctx := context.Background() diff --git a/test/e2e/compatibility_test.go b/test/e2e/compatibility_test.go index f3b65e0b05..9f46849928 100644 --- a/test/e2e/compatibility_test.go +++ b/test/e2e/compatibility_test.go @@ -4,7 +4,11 @@ package e2e_test import ( + "bytes" + "fmt" + "io" "io/ioutil" + "net/http" "os" "path/filepath" "testing" @@ -12,6 +16,8 @@ import ( "github.com/efficientgo/e2e" e2edb "github.com/efficientgo/e2e/db" + "github.com/thanos-io/thanos/pkg/alert" + "github.com/thanos-io/thanos/pkg/httpconfig" "github.com/thanos-io/thanos/pkg/testutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) @@ -110,3 +116,93 @@ query_tweaks: return ret }() } + +// TestAlertCompliance tests Alert compatibility against https://github.com/prometheus/compliance/blob/main/alert_generator. +// NOTE: This requires a dockerization of compliance framework: https://github.com/prometheus/compliance/pull/46 +func TestAlertCompliance(t *testing.T) { + t.Skip("This is an interactive test, using https://github.com/prometheus/compliance/tree/main/alert_generator. This tool is not optimized for CI runs (e.g. it infinitely retries, takes 38 minutes)") + + t.Run("stateful ruler", func(t *testing.T) { + e, err := e2e.NewDockerEnvironment("alert_compatibility") + testutil.Ok(t, err) + t.Cleanup(e.Close) + + // Start receive + Querier. + receive := e2ethanos.NewReceiveBuilder(e, "receive").WithIngestionEnabled().Init() + querierBuilder := e2ethanos.NewQuerierBuilder(e, "query") + + compliance := e.Runnable("alert_generator_compliance_tester").WithPorts(map[string]int{"http": 8080}).Init(e2e.StartOptions{ + Image: "alert_generator_compliance_tester:latest", + Command: e2e.NewCommandRunUntilStop(), + }) + + rFuture := e2ethanos.NewRulerBuilder(e, "1") + ruler := rFuture.WithAlertManagerConfig([]alert.AlertmanagerConfig{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{compliance.InternalEndpoint("http")}, + Scheme: "http", + }, + Timeout: amTimeout, + APIVersion: alert.APIv1, + }, + }). + // Use default resend delay and eval interval, as the compliance spec requires this. + WithResendDelay("1m"). + WithEvalInterval("1m"). + InitTSDB(filepath.Join(rFuture.InternalDir(), "rules"), []httpconfig.Config{ + { + EndpointsConfig: httpconfig.EndpointsConfig{ + StaticAddresses: []string{ + querierBuilder.InternalEndpoint("http"), + }, + Scheme: "http", + }, + }, + }) + + query := querierBuilder. + WithStoreAddresses(receive.InternalEndpoint("grpc")). + WithRuleAddresses(ruler.InternalEndpoint("grpc")). + // We deduplicate by this, since alert compatibility tool requires clean metric without labels + // attached by receivers. + WithReplicaLabels("receive", "tenant_id"). + Init() + testutil.Ok(t, e2e.StartAndWaitReady(receive, query, ruler, compliance)) + + // Pull rules.yaml: + { + var stdout bytes.Buffer + testutil.Ok(t, compliance.Exec(e2e.NewCommand("cat", "/rules.yaml"), e2e.WithExecOptionStdout(&stdout))) + testutil.Ok(t, os.MkdirAll(filepath.Join(ruler.Dir(), "rules"), os.ModePerm)) + testutil.Ok(t, os.WriteFile(filepath.Join(ruler.Dir(), "rules", "rules.yaml"), stdout.Bytes(), os.ModePerm)) + + // Reload ruler. + resp, err := http.Post("http://"+ruler.Endpoint("http")+"/-/reload", "", nil) + testutil.Ok(t, err) + defer func() { + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() + }() + testutil.Equals(t, http.StatusOK, resp.StatusCode) + } + testutil.Ok(t, ioutil.WriteFile(filepath.Join(compliance.Dir(), "test-thanos.yaml"), []byte(alertCompatConfig(receive, query)), os.ModePerm)) + + fmt.Println(alertCompatConfig(receive, query)) + + testutil.Ok(t, compliance.Exec(e2e.NewCommand( + "/alert_generator_compliance_tester", "-config-file", filepath.Join(compliance.InternalDir(), "test-thanos.yaml")), + )) + }) +} + +// nolint (it's still used in skipped test). +func alertCompatConfig(receive e2e.Runnable, query e2e.Runnable) string { + return fmt.Sprintf(`settings: + remote_write_url: '%s' + query_base_url: 'http://%s' + rules_and_alerts_api_base_url: 'http://%s' + alert_reception_server_port: 8080 + alert_message_parser: default +`, e2ethanos.RemoteWriteEndpoint(receive.InternalEndpoint("remote-write")), query.InternalEndpoint("http"), query.InternalEndpoint("http")) +} diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index a2d62f8f09..0f56e3fbcc 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -181,6 +181,7 @@ type QuerierBuilder struct { enableFeatures []string endpoints []string + replicaLabels []string tracingConfig string e2e.Linkable @@ -200,6 +201,7 @@ func NewQuerierBuilder(e e2e.Environment, name string, storeAddresses ...string) name: name, storeAddresses: storeAddresses, image: DefaultImage(), + replicaLabels: []string{replicaLabel}, } } @@ -263,6 +265,12 @@ func (q *QuerierBuilder) WithTracingConfig(tracingConfig string) *QuerierBuilder return q } +// WithReplicaLabels replaces default [replica] replica label configuration for the querier. +func (q *QuerierBuilder) WithReplicaLabels(labels ...string) *QuerierBuilder { + q.replicaLabels = labels + return q +} + func (q *QuerierBuilder) Init() e2e.InstrumentedRunnable { args, err := q.collectArgs() if err != nil { @@ -284,40 +292,36 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { "--grpc-address": ":9091", "--grpc-grace-period": "0s", "--http-address": ":8080", - "--query.replica-label": replicaLabel, "--store.sd-dns-interval": "5s", "--log.level": infoLogLevel, "--query.max-concurrent": "1", "--store.sd-interval": "5s", }) + + for _, repl := range q.replicaLabels { + args = append(args, "--query.replica-label="+repl) + } for _, addr := range q.storeAddresses { args = append(args, "--store="+addr) } - for _, addr := range q.ruleAddresses { args = append(args, "--rule="+addr) } - for _, addr := range q.targetAddresses { args = append(args, "--target="+addr) } - for _, addr := range q.metadataAddresses { args = append(args, "--metadata="+addr) } - for _, addr := range q.exemplarAddresses { args = append(args, "--exemplar="+addr) } - for _, feature := range q.enableFeatures { args = append(args, "--enable-feature="+feature) } - for _, addr := range q.endpoints { args = append(args, "--endpoint="+addr) } - if len(q.fileSDStoreAddresses) > 0 { if err := os.MkdirAll(q.Dir(), 0750); err != nil { return nil, errors.Wrap(err, "create query dir failed") @@ -339,19 +343,15 @@ func (q *QuerierBuilder) collectArgs() ([]string, error) { args = append(args, "--store.sd-files="+filepath.Join(q.InternalDir(), "filesd.yaml")) } - if q.routePrefix != "" { args = append(args, "--web.route-prefix="+q.routePrefix) } - if q.externalPrefix != "" { args = append(args, "--web.external-prefix="+q.externalPrefix) } - if q.tracingConfig != "" { args = append(args, "--tracing.config="+q.tracingConfig) } - return args, nil } @@ -477,6 +477,8 @@ type RulerBuilder struct { amCfg []alert.AlertmanagerConfig replicaLabel string image string + resendDelay string + evalInterval string } // NewRulerBuilder is a Ruler future that allows extra configuration before initialization. @@ -507,6 +509,16 @@ func (r *RulerBuilder) WithReplicaLabel(replicaLabel string) *RulerBuilder { return r } +func (r *RulerBuilder) WithResendDelay(resendDelay string) *RulerBuilder { + r.resendDelay = resendDelay + return r +} + +func (r *RulerBuilder) WithEvalInterval(evalInterval string) *RulerBuilder { + r.evalInterval = evalInterval + return r +} + func (r *RulerBuilder) InitTSDB(internalRuleDir string, queryCfg []httpconfig.Config) e2e.InstrumentedRunnable { return r.initRule(internalRuleDir, queryCfg, nil) } @@ -550,6 +562,15 @@ func (r *RulerBuilder) initRule(internalRuleDir string, queryCfg []httpconfig.Co if r.replicaLabel != "" { ruleArgs["--label"] = fmt.Sprintf(`%s="%s"`, replicaLabel, r.replicaLabel) } + + if r.resendDelay != "" { + ruleArgs["--resend-delay"] = r.resendDelay + } + + if r.evalInterval != "" { + ruleArgs["--eval-interval"] = r.evalInterval + } + if remoteWriteCfg != nil { rwCfgBytes, err := yaml.Marshal(struct { RemoteWriteConfigs []*config.RemoteWriteConfig `yaml:"remote_write,omitempty"`