diff --git a/CHANGELOG.md b/CHANGELOG.md index 8baa7ac123..5f60ae70c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed - [#5281](https://github.com/thanos-io/thanos/pull/5281) Blocks: Use correct separators for filesystem paths and object storage paths respectively. - [#5300](https://github.com/thanos-io/thanos/pull/5300) Query: Ignore cache on queries with deduplication off. +- [#5324](https://github.com/thanos-io/thanos/pull/5324) Reloader: Force trigger reload when config rollbacked ### Added diff --git a/pkg/reloader/reloader.go b/pkg/reloader/reloader.go index 6303bba564..4dcf06df74 100644 --- a/pkg/reloader/reloader.go +++ b/pkg/reloader/reloader.go @@ -96,6 +96,7 @@ type Reloader struct { lastCfgHash []byte lastWatchedDirsHash []byte + forceReload bool reloads prometheus.Counter reloadErrors prometheus.Counter @@ -352,7 +353,7 @@ func (r *Reloader) apply(ctx context.Context) error { watchedDirsHash = h.Sum(nil) } - if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) { + if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) { // Nothing to do. return nil } @@ -368,6 +369,7 @@ func (r *Reloader) apply(ctx context.Context) error { return errors.Wrap(err, "trigger reload") } + r.forceReload = false r.lastCfgHash = cfgHash r.lastWatchedDirsHash = watchedDirsHash level.Info(r.logger).Log( @@ -379,6 +381,7 @@ func (r *Reloader) apply(ctx context.Context) error { r.lastReloadSuccessTimestamp.SetToCurrentTime() return nil }); err != nil { + r.forceReload = true level.Error(r.logger).Log("msg", "Failed to trigger reload. Retrying.", "err", err) } diff --git a/pkg/reloader/reloader_test.go b/pkg/reloader/reloader_test.go index 9367ebfe3b..0b8412273c 100644 --- a/pkg/reloader/reloader_test.go +++ b/pkg/reloader/reloader_test.go @@ -167,6 +167,119 @@ config: testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2")) } +func TestReloader_ConfigRollback(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + l, err := net.Listen("tcp", "localhost:0") + testutil.Ok(t, err) + + correctConfig := []byte(` +config: + a: 1 +`) + faultyConfig := []byte(` +faulty_config: + a: 1 +`) + + dir, err := ioutil.TempDir("", "reloader-cfg-test") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm)) + testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm)) + + var ( + input = filepath.Join(dir, "in", "cfg.yaml.tmpl") + output = filepath.Join(dir, "out", "cfg.yaml") + ) + + reloads := &atomic.Value{} + reloads.Store(0) + srv := &http.Server{} + + srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) { + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + + if string(f) == string(faultyConfig) { + resp.WriteHeader(http.StatusServiceUnavailable) + return + } + + reloads.Store(reloads.Load().(int) + 1) // The only writer. + resp.WriteHeader(http.StatusOK) + }) + go func() { _ = srv.Serve(l) }() + defer func() { testutil.Ok(t, srv.Close()) }() + + reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String())) + testutil.Ok(t, err) + + reloader := New(nil, nil, &Options{ + ReloadURL: reloadURL, + CfgFile: input, + CfgOutputFile: output, + WatchedDirs: nil, + WatchInterval: 10 * time.Second, // 10 seconds to make the reload of faulty config fail quick + RetryInterval: 100 * time.Millisecond, + DelayInterval: 1 * time.Millisecond, + }) + + testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm)) + + rctx, cancel2 := context.WithCancel(ctx) + g := sync.WaitGroup{} + g.Add(1) + go func() { + defer g.Done() + testutil.Ok(t, reloader.Watch(rctx)) + }() + + reloadsSeen := 0 + faulty := false + + for { + select { + case <-ctx.Done(): + t.Fatalf("Timeout with faulty = %t, reloadsSeen = %d", faulty, reloadsSeen) + case <-time.After(300 * time.Millisecond): + } + + rel := reloads.Load().(int) + reloadsSeen = rel + + if reloadsSeen == 1 && !faulty { + // Initial apply seen (without doing anything). + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + testutil.Equals(t, string(correctConfig), string(f)) + + // Change to a faulty config + testutil.Ok(t, ioutil.WriteFile(input, faultyConfig, os.ModePerm)) + faulty = true + } else if reloadsSeen == 1 && faulty { + // Faulty config will trigger a reload, but reload failed + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + testutil.Equals(t, string(faultyConfig), string(f)) + + // Rollback config + testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm)) + } else if reloadsSeen >= 2 { + // Rollback to previous config should trigger a reload + f, err := ioutil.ReadFile(output) + testutil.Ok(t, err) + testutil.Equals(t, string(correctConfig), string(f)) + + break + } + } + cancel2() + g.Wait() +} + func TestReloader_DirectoriesApply(t *testing.T) { l, err := net.Listen("tcp", "localhost:0") testutil.Ok(t, err)