Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reloader: Force trigger reload when config rollbacked #5324

Merged
merged 1 commit into from May 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -23,6 +23,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
### Fixed
- [#5281](https://github.com/thanos-io/thanos/pull/5281) Blocks: Use correct separators for filesystem paths and object storage paths respectively.
- [#5300](https://github.com/thanos-io/thanos/pull/5300) Query: Ignore cache on queries with deduplication off.
- [#5324](https://github.com/thanos-io/thanos/pull/5324) Reloader: Force trigger reload when config rollbacked

### Added

Expand Down
5 changes: 4 additions & 1 deletion pkg/reloader/reloader.go
Expand Up @@ -96,6 +96,7 @@ type Reloader struct {

lastCfgHash []byte
lastWatchedDirsHash []byte
forceReload bool

reloads prometheus.Counter
reloadErrors prometheus.Counter
Expand Down Expand Up @@ -352,7 +353,7 @@ func (r *Reloader) apply(ctx context.Context) error {
watchedDirsHash = h.Sum(nil)
}

if bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
if !r.forceReload && bytes.Equal(r.lastCfgHash, cfgHash) && bytes.Equal(r.lastWatchedDirsHash, watchedDirsHash) {
// Nothing to do.
return nil
}
Expand All @@ -368,6 +369,7 @@ func (r *Reloader) apply(ctx context.Context) error {
return errors.Wrap(err, "trigger reload")
}

r.forceReload = false
r.lastCfgHash = cfgHash
r.lastWatchedDirsHash = watchedDirsHash
level.Info(r.logger).Log(
Expand All @@ -379,6 +381,7 @@ func (r *Reloader) apply(ctx context.Context) error {
r.lastReloadSuccessTimestamp.SetToCurrentTime()
return nil
}); err != nil {
r.forceReload = true
level.Error(r.logger).Log("msg", "Failed to trigger reload. Retrying.", "err", err)
}

Expand Down
113 changes: 113 additions & 0 deletions pkg/reloader/reloader_test.go
Expand Up @@ -167,6 +167,119 @@ config:
testutil.Ok(t, os.Unsetenv("TEST_RELOADER_THANOS_ENV2"))
}

func TestReloader_ConfigRollback(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)

correctConfig := []byte(`
config:
a: 1
`)
faultyConfig := []byte(`
faulty_config:
a: 1
`)

dir, err := ioutil.TempDir("", "reloader-cfg-test")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, os.RemoveAll(dir)) }()

testutil.Ok(t, os.Mkdir(filepath.Join(dir, "in"), os.ModePerm))
testutil.Ok(t, os.Mkdir(filepath.Join(dir, "out"), os.ModePerm))

var (
input = filepath.Join(dir, "in", "cfg.yaml.tmpl")
output = filepath.Join(dir, "out", "cfg.yaml")
)

reloads := &atomic.Value{}
reloads.Store(0)
srv := &http.Server{}

srv.Handler = http.HandlerFunc(func(resp http.ResponseWriter, r *http.Request) {
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)

if string(f) == string(faultyConfig) {
resp.WriteHeader(http.StatusServiceUnavailable)
return
}

reloads.Store(reloads.Load().(int) + 1) // The only writer.
resp.WriteHeader(http.StatusOK)
})
go func() { _ = srv.Serve(l) }()
defer func() { testutil.Ok(t, srv.Close()) }()

reloadURL, err := url.Parse(fmt.Sprintf("http://%s", l.Addr().String()))
testutil.Ok(t, err)

reloader := New(nil, nil, &Options{
ReloadURL: reloadURL,
CfgFile: input,
CfgOutputFile: output,
WatchedDirs: nil,
WatchInterval: 10 * time.Second, // 10 seconds to make the reload of faulty config fail quick
RetryInterval: 100 * time.Millisecond,
DelayInterval: 1 * time.Millisecond,
})

testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm))

rctx, cancel2 := context.WithCancel(ctx)
g := sync.WaitGroup{}
g.Add(1)
go func() {
defer g.Done()
testutil.Ok(t, reloader.Watch(rctx))
}()

reloadsSeen := 0
faulty := false

for {
select {
case <-ctx.Done():
t.Fatalf("Timeout with faulty = %t, reloadsSeen = %d", faulty, reloadsSeen)
case <-time.After(300 * time.Millisecond):
}

rel := reloads.Load().(int)
reloadsSeen = rel

if reloadsSeen == 1 && !faulty {
// Initial apply seen (without doing anything).
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(correctConfig), string(f))

// Change to a faulty config
testutil.Ok(t, ioutil.WriteFile(input, faultyConfig, os.ModePerm))
faulty = true
} else if reloadsSeen == 1 && faulty {
// Faulty config will trigger a reload, but reload failed
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(faultyConfig), string(f))

// Rollback config
testutil.Ok(t, ioutil.WriteFile(input, correctConfig, os.ModePerm))
} else if reloadsSeen >= 2 {
// Rollback to previous config should trigger a reload
f, err := ioutil.ReadFile(output)
testutil.Ok(t, err)
testutil.Equals(t, string(correctConfig), string(f))

break
}
}
cancel2()
g.Wait()
}

func TestReloader_DirectoriesApply(t *testing.T) {
l, err := net.Listen("tcp", "localhost:0")
testutil.Ok(t, err)
Expand Down