Skip to content

Commit

Permalink
block: sanity check metas for common corruptions
Browse files Browse the repository at this point in the history
Santiy check metas that are returned from fetcher for common issues. We
currently return metas that will lead to crashes in compactor and store,
this PR marks them as corrupted so that they will get cleaned up and are
visible in metrics.

Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Apr 16, 2024
1 parent 5fb0c69 commit 84ab6a0
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -39,6 +39,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

- [#7123](https://github.com/thanos-io/thanos/pull/7123) Rule: Change default Alertmanager API version to v2.
- [#7223](https://github.com/thanos-io/thanos/pull/7223) Automatic detection of memory limits and configure GOMEMLIMIT to match.
- [#7282](https://github.com/thanos-io/thanos/pull/7282) Fetcher: mark metas with incomplete files as corrupted.

### Removed

Expand Down
48 changes: 47 additions & 1 deletion pkg/block/fetcher.go
Expand Up @@ -11,6 +11,7 @@ import (
"path"
"path/filepath"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -24,10 +25,10 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/thanos-io/objstore"
"golang.org/x/sync/errgroup"
"gopkg.in/yaml.v2"

"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/errutil"
"github.com/thanos-io/thanos/pkg/extprom"
Expand Down Expand Up @@ -434,6 +435,10 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v unmarshal: %v", metaFile, err)
}

if err := sanityCheckFilesForMeta(m.Thanos.Files); err != nil {
return nil, errors.Wrapf(ErrorSyncMetaCorrupted, "meta.json %v not sane: %v", metaFile, err)
}

if m.Version != metadata.TSDBVersion1 {
return nil, errors.Errorf("unexpected meta file: %s version: %d", metaFile, m.Version)
}
Expand All @@ -451,6 +456,47 @@ func (f *BaseFetcher) loadMeta(ctx context.Context, id ulid.ULID) (*metadata.Met
return m, nil
}

func sanityCheckFilesForMeta(files []metadata.File) error {
var (
numChunkFiles int
highestChunkFile int
hasIndex bool
)

// Old metas might not have the Thanos.Files field yet, we dont want to mess with them
if len(files) == 0 {
return nil
}

for _, f := range files {
if f.RelPath == "index" {
hasIndex = true
}
dir, name := path.Split(f.RelPath)
if dir == "chunks/" {
numChunkFiles++
idx, err := strconv.Atoi(name)
if err != nil {
return errors.Wrap(err, "unexpected chunk file name")
}
if idx > highestChunkFile {
highestChunkFile = idx
}
}
}

if !hasIndex {
return errors.New("no index file in meta")
}
if numChunkFiles == 0 {
return errors.New("no chunk files in meta")
}
if numChunkFiles != highestChunkFile {
return errors.New("incomplete chunk files in meta")
}
return nil
}

type response struct {
metas map[ulid.ULID]*metadata.Meta
partial map[ulid.ULID]error
Expand Down
65 changes: 63 additions & 2 deletions pkg/block/fetcher_test.go
Expand Up @@ -23,11 +23,11 @@ import (
"github.com/prometheus/client_golang/prometheus"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/objtesting"

"github.com/efficientgo/core/testutil"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/objtesting"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
Expand Down Expand Up @@ -106,6 +106,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ULID(1)
meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}, metadata.File{RelPath: "chunks/000001"})

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
Expand Down Expand Up @@ -189,6 +190,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ULID(6)
meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}, metadata.File{RelPath: "chunks/000001"})

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
Expand Down Expand Up @@ -224,6 +226,7 @@ func TestMetaFetcher_Fetch(t *testing.T) {
var meta metadata.Meta
meta.Version = 20
meta.ULID = ULID(7)
meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"}, metadata.File{RelPath: "chunks/000001"})

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
Expand All @@ -235,6 +238,64 @@ func TestMetaFetcher_Fetch(t *testing.T) {
expectedNoMeta: ULIDs(4),
expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
},
{
name: "error: incomplete chunks",
do: func() {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ULID(8)
meta.Thanos.Files = append(meta.Thanos.Files,
metadata.File{RelPath: "index"},
metadata.File{RelPath: "chunks/000001"},
metadata.File{RelPath: "chunks/000005"},
)

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
},

expectedMetas: ULIDs(1, 3, 6),
expectedCorruptedMeta: ULIDs(5, 8),
expectedNoMeta: ULIDs(4),
expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
},
{
name: "error: no index",
do: func() {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ULID(9)
meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "chunks/000001"})

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
},

expectedMetas: ULIDs(1, 3, 6),
expectedCorruptedMeta: ULIDs(5, 8, 9),
expectedNoMeta: ULIDs(4),
expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
},
{
name: "error: no chunks",
do: func() {
var meta metadata.Meta
meta.Version = 1
meta.ULID = ULID(10)
meta.Thanos.Files = append(meta.Thanos.Files, metadata.File{RelPath: "index"})

var buf bytes.Buffer
testutil.Ok(t, json.NewEncoder(&buf).Encode(&meta))
testutil.Ok(t, bkt.Upload(ctx, path.Join(meta.ULID.String(), metadata.MetaFilename), &buf))
},

expectedMetas: ULIDs(1, 3, 6),
expectedCorruptedMeta: ULIDs(5, 8, 9, 10),
expectedNoMeta: ULIDs(4),
expectedMetaErr: errors.New("incomplete view: unexpected meta file: 00000000070000000000000000/meta.json version: 20"),
},
} {
if ok := t.Run(tcase.name, func(t *testing.T) {
tcase.do()
Expand Down
1 change: 0 additions & 1 deletion pkg/shipper/shipper.go
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/prometheus/prometheus/tsdb/fileutil"

"github.com/thanos-io/objstore"

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"
Expand Down
30 changes: 16 additions & 14 deletions pkg/testutil/e2eutil/prometheus.go
Expand Up @@ -516,6 +516,7 @@ func createBlockWithDelay(ctx context.Context, dir string, series []labels.Label
logger := log.NewNopLogger()
m.ULID = id
m.Compaction.Sources = []ulid.ULID{id}

if err := m.WriteToDir(logger, path.Join(dir, blockID.String())); err != nil {
return ulid.ULID{}, errors.Wrap(err, "write meta.json file")
}
Expand Down Expand Up @@ -622,28 +623,29 @@ func createBlock(
}

files := []metadata.File{}
if hashFunc != metadata.NoneFunc {
paths := []string{}
if err := filepath.Walk(blockDir, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}
paths = append(paths, path)
paths := []string{}
if err := filepath.Walk(blockDir, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return nil
}); err != nil {
return id, errors.Wrapf(err, "walking %s", dir)
}
paths = append(paths, path)
return nil
}); err != nil {
return id, errors.Wrapf(err, "walking %s", dir)
}

for _, p := range paths {
for _, p := range paths {
f := metadata.File{
RelPath: strings.TrimPrefix(p, blockDir+"/"),
}
if hashFunc != metadata.NoneFunc {
pHash, err := metadata.CalculateHash(p, metadata.SHA256Func, log.NewNopLogger())
if err != nil {
return id, errors.Wrapf(err, "calculating hash of %s", blockDir+p)
}
files = append(files, metadata.File{
RelPath: strings.TrimPrefix(p, blockDir+"/"),
Hash: &pHash,
})
f.Hash = &pHash
}
files = append(files, f)
}

if _, err = metadata.InjectThanos(logger, blockDir, metadata.Thanos{
Expand Down

0 comments on commit 84ab6a0

Please sign in to comment.