Return error messages when a reload is triggered via HTTP #1848

Merged · 19 commits · Mar 4, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -29,6 +29,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2049](https://github.com/thanos-io/thanos/pull/2049) Tracing: Support sampling on Elastic APM with new sample_rate setting.
- [#2008](https://github.com/thanos-io/thanos/pull/2008) Querier, Receiver, Sidecar, Store: Add gRPC [health check](https://github.com/grpc/grpc/blob/master/doc/health-checking.md) endpoints.
- [#2145](https://github.com/thanos-io/thanos/pull/2145) Tracing: track query sent to prometheus via remote read api.
- [#1848](https://github.com/thanos-io/thanos/pull/1848) Ruler: Return error messages when a reload is triggered via HTTP.
- [#2113](https://github.com/thanos-io/thanos/pull/2113) Bucket: Added `thanos bucket replicate`.

### Changed
179 changes: 108 additions & 71 deletions cmd/thanos/rule.go
@@ -20,12 +20,14 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage/tsdb"
tsdberrors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/util/strutil"
"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/block/metadata"
@@ -200,6 +202,50 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) {
}
}

// RuleMetrics defines thanos rule metrics.
type RuleMetrics struct {
Member: 👍
Thanks!

configSuccess prometheus.Gauge
configSuccessTime prometheus.Gauge
duplicatedQuery prometheus.Counter
rulesLoaded *prometheus.GaugeVec
ruleEvalWarnings *prometheus.CounterVec
}

func newRuleMetrics(reg *prometheus.Registry) *RuleMetrics {
m := new(RuleMetrics)

factory := promauto.With(reg)
m.configSuccess = factory.NewGauge(prometheus.GaugeOpts{
Name: "thanos_rule_config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.",
})
m.configSuccessTime = factory.NewGauge(prometheus.GaugeOpts{
Name: "thanos_rule_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})
m.duplicatedQuery = factory.NewCounter(prometheus.CounterOpts{
Name: "thanos_rule_duplicated_query_addresses_total",
Help: "The number of times a duplicated query addresses is detected from the different configs in rule.",
})
m.rulesLoaded = factory.NewGaugeVec(
prometheus.GaugeOpts{
Name: "thanos_rule_loaded_rules",
Help: "Loaded rules partitioned by file and group.",
},
[]string{"strategy", "file", "group"},
)
m.ruleEvalWarnings = factory.NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_rule_evaluation_with_warnings_total",
Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
}, []string{"strategy"},
)
m.ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String()))
m.ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String()))

return m
}
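
A note for readers on the constructor change: newRuleMetrics builds every metric through promauto.With(reg), whose factory registers each metric with the registry at creation time. That is why the explicit reg.MustRegister calls are removed from runRule below. A minimal standalone illustration (the metric name is made up for the example):

package main

import (
    "fmt"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
    reg := prometheus.NewRegistry()

    // The factory returned by promauto.With registers the gauge with reg
    // as part of NewGauge, so no separate MustRegister call is needed.
    g := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
        Name: "example_last_reload_successful",
        Help: "Whether the last reload attempt was successful.",
    })
    g.Set(1)

    mfs, _ := reg.Gather()
    fmt.Println(len(mfs)) // 1: the gauge was auto-registered
}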

// runRule runs a rule evaluation component that continuously evaluates alerting and recording
// rules. It sends alert notifications and writes TSDB data for results like a regular Prometheus server.
func runRule(
@@ -239,39 +285,7 @@ func runRule(
dnsSDResolver string,
comp component.Component,
) error {
configSuccess := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_rule_config_last_reload_successful",
Help: "Whether the last configuration reload attempt was successful.",
})
configSuccessTime := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "thanos_rule_config_last_reload_success_timestamp_seconds",
Help: "Timestamp of the last successful configuration reload.",
})
duplicatedQuery := prometheus.NewCounter(prometheus.CounterOpts{
Name: "thanos_rule_duplicated_query_addresses_total",
Help: "The number of times a duplicated query addresses is detected from the different configs in rule",
})
rulesLoaded := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "thanos_rule_loaded_rules",
Help: "Loaded rules partitioned by file and group",
},
[]string{"strategy", "file", "group"},
)
ruleEvalWarnings := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_rule_evaluation_with_warnings_total",
Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
}, []string{"strategy"},
)
ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String()))
ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String()))

reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
reg.MustRegister(duplicatedQuery)
reg.MustRegister(rulesLoaded)
reg.MustRegister(ruleEvalWarnings)
metrics := newRuleMetrics(reg)

var queryCfg []query.Config
if len(queryConfigYAML) > 0 {
@@ -435,7 +449,7 @@ func runRule(
opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, queryClients, duplicatedQuery, ruleEvalWarnings, s)
opts.QueryFunc = queryFunc(logger, queryClients, metrics.duplicatedQuery, metrics.ruleEvalWarnings, s)

mgr := rules.NewManager(&opts)
ruleMgr.SetRuleManager(s, mgr)
@@ -472,52 +486,32 @@ }
}

// Handle reload and termination interrupts.
reload := make(chan struct{}, 1)
reloadWebhandler := make(chan chan error)
{
cancel := make(chan struct{})
reload <- struct{}{} // Initial reload.

ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Initialize rules.
if err := reloadRules(logger, ruleFiles, ruleMgr, evalInterval, metrics); err != nil {
level.Error(logger).Log("msg", "initialize rules failed", "err", err)
Member: For the initial load, we can actually fail and return the combined multierror.

Contributor Author: @kakkoyun The maintainer doesn't agree with stopping everything. Please check the comment here: #1848 (comment)

Member: I don't have strong opinions about it; both work for me. In any case, I've pinged @bwplotka on the thread for the final decision.

Member: Yes, the initial load can fail indeed (:

Contributor Author: @bwplotka @kakkoyun Let's focus on the reload process first in this PR, since failing on initialization is a breaking change. After the reload-related code is merged, I will start a new PR for the initialization failure only. Any suggestions?

Member: Cool, happy with that.
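
For context on the "combined multierror" mentioned in this thread: the reloadRules helper added at the bottom of this diff collects per-pattern failures into a tsdberrors.MultiError and returns the combined result. A standalone sketch of how that type behaves, assuming the tsdb/errors package version vendored by Thanos at the time (it exposes Add and Err); the error strings are invented for illustration:

package main

import (
    "errors"
    "fmt"

    tsdberrors "github.com/prometheus/prometheus/tsdb/errors"
)

func main() {
    var errs tsdberrors.MultiError

    // With nothing added, Err() is nil, so a clean reload reports success.
    fmt.Println(errs.Err()) // <nil>

    // Each failing step adds one error while processing continues.
    errs.Add(errors.New("retrieving rule files failed for pattern rules/[.yaml"))
    errs.Add(errors.New("reloading rules failed: yaml: unmarshal error"))

    // Err() combines everything into a single error value that the reload
    // loop can hand back to the HTTP handler.
    fmt.Println(errs.Err())
}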

}
for {
select {
case <-cancel:
return errors.New("canceled")
case <-reload:
case <-reloadSignal:
}

level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
var files []string
for _, pat := range ruleFiles {
fs, err := filepath.Glob(pat)
if err := reloadRules(logger, ruleFiles, ruleMgr, evalInterval, metrics); err != nil {
level.Error(logger).Log("msg", "reload rules by sighup failed", "err", err)
Member: Not only SIGHUP TBH, it can be HTTP /-/reload too, right? So let's fix the message here (:

Contributor Author: @bwplotka Yes. This code handles SIGHUP only.

Member: Would be nice to treat signal and HTTP reload exactly the same way (as it was before). Why not? We could reuse the reloadSignal channel.

Contributor Author: @bwplotka If so, we need another channel to receive the error message for the web handler, so a new struct would have to wrap reloadSignal and errMsg. Maybe that is redundant for the current implementation. That said, if you insist, I will follow your comment; any suggestion on the "new struct" solution?

Contributor Author: Added a new struct to handle the web handler. Since we have reloadSignal as an input parameter, we always need to select on reloadSignal. Please help review. Thanks. @bwplotka

Member: LGTM!
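
To make the agreed-upon design concrete: the merged code uses a request/response channel. The HTTP handler sends a fresh error channel into the reload loop and blocks until the reload result comes back, while the signal path stays fire-and-forget. A minimal self-contained sketch of that pattern; the names mirror the diff but this is illustrative, not the Thanos code:

package main

import (
    "errors"
    "fmt"
    "time"
)

func doReload() error {
    return errors.New("bad rule file pattern") // simulated failure
}

func main() {
    reloadSignal := make(chan struct{})       // e.g. SIGHUP
    reloadWebhandler := make(chan chan error) // e.g. POST /-/reload

    // Worker: performs the reload for both triggers.
    go func() {
        for {
            select {
            case <-reloadSignal:
                if err := doReload(); err != nil {
                    fmt.Println("reload by signal failed:", err) // only logged
                }
            case respCh := <-reloadWebhandler:
                respCh <- doReload() // result goes back to the HTTP handler
            }
        }
    }()

    // What the HTTP handler does: trigger a reload and wait for its outcome.
    respCh := make(chan error)
    reloadWebhandler <- respCh
    if err := <-respCh; err != nil {
        fmt.Println("would respond with HTTP 500:", err)
    }

    // Signal-style trigger: fire and forget.
    reloadSignal <- struct{}{}
    time.Sleep(100 * time.Millisecond) // give the worker time to log
}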

}
case reloadMsg := <-reloadWebhandler:
err := reloadRules(logger, ruleFiles, ruleMgr, evalInterval, metrics)
if err != nil {
// The only error can be a bad pattern.
level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err)
continue
level.Error(logger).Log("msg", "reload rules by webhandler failed", "err", err)
}

files = append(files, fs...)
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))

if err := ruleMgr.Update(evalInterval, files); err != nil {
configSuccess.Set(0)
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
continue
}

configSuccess.Set(1)
configSuccessTime.SetToCurrentTime()

rulesLoaded.Reset()
for _, group := range ruleMgr.RuleGroups() {
rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
reloadMsg <- err
case <-ctx.Done():
return ctx.Err()
}

}
}, func(error) {
close(cancel)
cancel()
})
}

@@ -564,7 +558,11 @@ func runRule(
}

router.WithPrefix(webRoutePrefix).Post("/-/reload", func(w http.ResponseWriter, r *http.Request) {
reload <- struct{}{}
reloadMsg := make(chan error)
reloadWebhandler <- reloadMsg
if err := <-reloadMsg; err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
})

flagsMap := map[string]string{
@@ -758,3 +756,42 @@ func addDiscoveryGroups(g *run.Group, c *http_util.Client, interval time.Duratio
cancel()
})
}

func reloadRules(logger log.Logger,
Member: This function has too many parameters, which makes it harder to read. Most of the parameters are metrics; consider using a struct to collect them, as was done in

type DownsampleMetrics struct {
    downsamples        *prometheus.CounterVec
    downsampleFailures *prometheus.CounterVec
}

and pass it around.

Contributor Author: @kakkoyun Committed. Please help review. Thanks.

Member: Looks good now 👍

ruleFiles []string,
ruleMgr *thanosrule.Manager,
evalInterval time.Duration,
metrics *RuleMetrics) error {
level.Debug(logger).Log("msg", "configured rule files", "files", strings.Join(ruleFiles, ","))
var (
errs tsdberrors.MultiError
files []string
)
for _, pat := range ruleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
errs.Add(errors.Wrapf(err, "retrieving rule files failed. Ignoring file. pattern %s", pat))
continue
}

files = append(files, fs...)
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))

if err := ruleMgr.Update(evalInterval, files); err != nil {
metrics.configSuccess.Set(0)
errs.Add(errors.Wrap(err, "reloading rules failed"))
return errs.Err()
}

metrics.configSuccess.Set(1)
metrics.configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

metrics.rulesLoaded.Reset()
for _, group := range ruleMgr.RuleGroups() {
metrics.rulesLoaded.WithLabelValues(group.PartialResponseStrategy.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}
return errs.Err()
}
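
With this change, a reload that fails when triggered over HTTP surfaces the error to the caller instead of always answering 200. A small client sketch for trying it out; the address and the empty route prefix are assumptions for illustration, so adjust them to your Ruler's --http-address and --web.route-prefix flags:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
)

func main() {
    // Address and route prefix are assumptions; adjust to your deployment.
    resp, err := http.Post("http://localhost:10902/-/reload", "text/plain", nil)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    body, _ := ioutil.ReadAll(resp.Body)

    // Before this PR the endpoint always answered 200; now a failed reload
    // answers 500 and the body carries the error message.
    fmt.Println("status:", resp.StatusCode)
    fmt.Println("body:", string(body))
}

A SIGHUP-triggered reload keeps the previous behaviour and only logs the error.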