fix reader getting wrong posting offsets when querying multiple values #7301

Merged: 8 commits, merged on May 1, 2024
Changes from all commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types.
- [#7289](https://github.com/thanos-io/thanos/pull/7289) Query Frontend: show warnings from downstream queries.
- [#7308](https://github.com/thanos-io/thanos/pull/7308) Store: Batch TSDB Infos for blocks.
- [#7301](https://github.com/thanos-io/thanos/pull/7301) Store Gateway: fix index header reader `PostingsOffsets` returning wrong values.

### Added

12 changes: 11 additions & 1 deletion pkg/block/indexheader/binary_reader.go
@@ -875,6 +875,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra

// Iterate on the offset table.
newSameRngs = newSameRngs[:0]
Iter:
for d.Err() == nil {
// Posting format entry is as follows:
// │ ┌────────────────────────────────────────┐ │
@@ -916,6 +917,15 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra
break
}
wantedValue = values[valueIndex]
// Only do this if there is no new range added. If there is an existing
// range we want to continue iterating the offset table to get the end.
if len(newSameRngs) == 0 && i+1 < len(e.offsets) {
// We want to limit this loop within e.offsets[i, i+1). So when the wanted value
// is >= e.offsets[i+1], go out of the loop and binary search again.
if wantedValue >= e.offsets[i+1].value {
break Iter
}
}
}

if i+1 == len(e.offsets) {
@@ -942,7 +952,7 @@ func (r *BinaryReader) postingsOffset(name string, values ...string) ([]index.Ra

if len(newSameRngs) > 0 {
// We added some ranges in this iteration. Use next posting offset as the end of our ranges.
// We know it exists as we never go further in this loop than e.offsets[i, i+1].
// We know it exists as we never go further in this loop than e.offsets[i, i+1).

skipNAndName(&d, &buf)
d.UvarintBytes() // Label value.
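To make the pattern enforced by this change easier to see outside the reader, here is a minimal, self-contained sketch. It is not the Thanos implementation; `findOffsets`, `allValues`, `samples`, and `sampleEvery` are illustrative names. Each wanted value is binary-searched against the sampled offsets, and the sequential scan that follows stays inside the region `[offsets[i], offsets[i+1])`, breaking back out to the binary search as soon as the next wanted value belongs to a later region.

```go
package sketch

import "sort"

// findOffsets returns, for each wanted value (assumed sorted), its index in
// allValues, or -1 when the value is absent. samples keeps only every
// sampleEvery-th index of the full table, mimicking the in-memory posting
// offset samples of the index-header reader.
func findOffsets(allValues []string, sampleEvery int, wanted []string) []int {
	var samples []int
	for i := 0; i < len(allValues); i += sampleEvery {
		samples = append(samples, i)
	}

	res := make([]int, len(wanted))
	for k := range res {
		res[k] = -1
	}

	w := 0
	for w < len(wanted) {
		v := wanted[w]
		// Binary search the samples for the region that may contain v,
		// analogous to the binary search over e.offsets in postingsOffset.
		i := sort.Search(len(samples), func(k int) bool { return allValues[samples[k]] > v })
		if i == 0 {
			w++ // v sorts before the first sampled value, so it cannot exist.
			continue
		}
		i--

		regionEnd := len(allValues)
		if i+1 < len(samples) {
			regionEnd = samples[i+1]
		}

		// Scan the region [samples[i], samples[i+1]) sequentially, consuming every
		// wanted value that can still live inside it.
		j := samples[i]
		for w < len(wanted) {
			// The key point of the fix: once the next wanted value belongs to a
			// later region, stop scanning and binary search again instead of
			// matching it against this region.
			if i+1 < len(samples) && wanted[w] >= allValues[samples[i+1]] {
				break
			}
			for j < regionEnd && allValues[j] < wanted[w] {
				j++
			}
			if j < regionEnd && allValues[j] == wanted[w] {
				res[w] = j
			}
			// Otherwise the value is absent and res[w] stays -1.
			w++
		}
	}
	return res
}
```

Without that early break, a batched lookup could keep pairing later wanted values with entries from the wrong region, producing wrong posting offsets and, in unlucky cases, the `invalid index size` error mentioned in the test comment below.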
122 changes: 114 additions & 8 deletions pkg/block/indexheader/header_test.go
@@ -7,9 +7,12 @@ import (
"context"
"fmt"
"math"
"math/rand"
"path/filepath"
"sort"
"strconv"
"testing"
"time"

"github.com/go-kit/log"
"github.com/oklog/ulid"
@@ -18,6 +21,7 @@ import (
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"

@@ -53,6 +57,9 @@ func TestReaders(t *testing.T) {
{{Name: "a", Value: "13"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}},
{{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}},
{{Name: "cluster", Value: "a-eu-west-1"}},
{{Name: "cluster", Value: "b-eu-west-1"}},
{{Name: "cluster", Value: "c-eu-west-1"}},
}, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

@@ -108,15 +115,15 @@ func TestReaders(t *testing.T) {
if id == id1 {
testutil.Equals(t, 1, br.version)
testutil.Equals(t, 2, br.indexVersion)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 70}, br.toc)
testutil.Equals(t, int64(710), br.indexLastPostingEnd)
testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 114}, br.toc)
testutil.Equals(t, int64(905), br.indexLastPostingEnd)
testutil.Equals(t, 8, br.symbols.Size())
testutil.Equals(t, 0, len(br.postingsV1))
testutil.Equals(t, 2, len(br.nameSymbols))
testutil.Equals(t, 3, len(br.nameSymbols))
testutil.Equals(t, map[string]*postingValueOffsets{
"": {
offsets: []postingOffset{{value: "", tableOff: 4}},
lastValOffset: 440,
lastValOffset: 576,
},
"a": {
offsets: []postingOffset{
Expand All @@ -126,14 +133,21 @@ func TestReaders(t *testing.T) {
{value: "7", tableOff: 75},
{value: "9", tableOff: 89},
},
lastValOffset: 640,
lastValOffset: 776,
},
"cluster": {
offsets: []postingOffset{
{value: "a-eu-west-1", tableOff: 96},
{value: "c-eu-west-1", tableOff: 142},
},
lastValOffset: 824,
},
"longer-string": {
offsets: []postingOffset{
{value: "1", tableOff: 96},
{value: "2", tableOff: 115},
{value: "1", tableOff: 165},
{value: "2", tableOff: 184},
},
lastValOffset: 706,
lastValOffset: 901,
},
}, br.postings)

@@ -173,6 +187,17 @@ func TestReaders(t *testing.T) {
testutil.Assert(t, rngs[2].End > rngs[2].Start)
testutil.Equals(t, NotFoundRange, rngs[1])

// 3 values exist and 3 values don't exist.
rngs, err = br.PostingsOffsets("cluster", "a-eu-west-1", "a-us-west-2", "b-eu-west-1", "b-us-east-1", "c-eu-west-1", "c-us-east-2")
testutil.Ok(t, err)
Contributor (author) comment on this assertion: Without this fix, this call fails with an "invalid index size" error.

for i := 0; i < len(rngs); i++ {
if i%2 == 0 {
testutil.Assert(t, rngs[i].End > rngs[i].Start)
} else {
testutil.Equals(t, NotFoundRange, rngs[i])
}
}

// Regression tests for https://github.com/thanos-io/thanos/issues/2213.
// Most of not existing value was working despite bug, except in certain unlucky cases
// it was causing "invalid size" errors.
@@ -521,3 +546,84 @@ func readSymbols(bs index.ByteSlice, version, off int) ([]string, map[uint32]str
}
return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols")
}

// The idea of this test case is to make sure that reader.PostingsOffsets and
// reader.PostingsOffset get the same index ranges for required label values.
func TestReaderPostingsOffsets(t *testing.T) {
ctx := context.Background()

tmpDir := t.TempDir()

possibleClusters := []string{"us-west-2", "us-east-1", "us-east-2", "eu-west-1", "eu-central-1", "ap-southeast-1", "ap-south-1"}
possiblePrefixes := []string{"a", "b", "c", "d", "1", "2", "3", "4"}
totalValues := []string{}
for i := 0; i < len(possibleClusters); i++ {
for j := 0; j < len(possiblePrefixes); j++ {
totalValues = append(totalValues, fmt.Sprintf("%s-%s", possiblePrefixes[j], possibleClusters[i]))
}
}

rnd := rand.New(rand.NewSource(time.Now().Unix()))
// Pick 5 label values to be used in the block.
clusterLbls := make([]labels.Labels, 0)
valueSet := map[int]struct{}{}
for i := 0; i < 5; {
idx := rnd.Intn(len(totalValues))
if _, ok := valueSet[idx]; ok {
continue
}
valueSet[idx] = struct{}{}
clusterLbls = append(clusterLbls, []labels.Label{
{Name: "cluster", Value: totalValues[idx]},
})
i++
}

// Add additional labels.
lbls := append([]labels.Labels{
{{Name: "job", Value: "1"}},
{{Name: "job", Value: "2"}},
{{Name: "job", Value: "3"}},
{{Name: "job", Value: "4"}},
{{Name: "job", Value: "5"}},
{{Name: "job", Value: "6"}},
{{Name: "job", Value: "7"}},
{{Name: "job", Value: "8"}},
{{Name: "job", Value: "9"}}}, clusterLbls...)
bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
testutil.Ok(t, err)
defer func() { testutil.Ok(t, bkt.Close()) }()
id, err := e2eutil.CreateBlock(ctx, tmpDir, lbls, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc)
testutil.Ok(t, err)

testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id.String()), metadata.NoneFunc))

fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename)
_, err = WriteBinary(ctx, bkt, id, fn)
testutil.Ok(t, err)

br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewBinaryReaderMetrics(nil))
testutil.Ok(t, err)

defer func() { testutil.Ok(t, br.Close()) }()

for i := 0; i < 100; i++ {
vals := make([]string, 0, 15)
for j := 0; j < 15; j++ {
vals = append(vals, totalValues[rnd.Intn(len(totalValues))])
}
sort.Strings(vals)
rngs, err := br.PostingsOffsets("cluster", vals...)
require.NoError(t, err)
rngs2 := make([]index.Range, 0)
for _, val := range vals {
rng2, err2 := br.PostingsOffset("cluster", val)
if err2 == NotFoundRangeErr {
rngs2 = append(rngs2, NotFoundRange)
} else {
rngs2 = append(rngs2, rng2)
}
}
require.Equal(t, rngs2, rngs, "Got mismatched results from batched and non-batched API.\nInput cluster labels: %v.\nValues queried: %v", clusterLbls, vals)
}
}
11 changes: 8 additions & 3 deletions pkg/store/lazy_postings.go
@@ -8,6 +8,7 @@ import (
"math"
"strings"

"github.com/go-kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
@@ -54,14 +55,18 @@ func optimizePostingsFetchByDownloadedBytes(r *bucketIndexReader, postingGroups
return nil, false, errors.Wrapf(err, "postings offsets for %s", pg.name)
}

for _, r := range rngs {
if r == indexheader.NotFoundRange {
for _, rng := range rngs {
if rng == indexheader.NotFoundRange {
continue
}
if rng.End <= rng.Start {
level.Error(r.block.logger).Log("msg", "invalid index range, fallback to non lazy posting optimization")
return postingGroups, false, nil
}
// Each range starts from the #entries field which is 4 bytes.
// Need to subtract it when calculating number of postings.
// https://github.com/prometheus/prometheus/blob/v2.46.0/tsdb/docs/format/index.md.
pg.cardinality += (r.End - r.Start - 4) / 4
pg.cardinality += (rng.End - rng.Start - 4) / 4
}
// If the posting group adds keys, 0 cardinality means the posting doesn't exist.
// If the posting group removes keys, no posting ranges found is fine as it is a noop.
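As a rough illustration of the arithmetic in the comment above (a hypothetical helper, not part of the PR): each range covers the 4-byte `#entries` field plus one 4-byte entry per posting, so a range with `Start=100` and `End=144` is estimated as `(144 - 100 - 4) / 4 = 10` postings.

```go
package sketch

import "github.com/prometheus/prometheus/tsdb/index"

// estimatePostings mirrors the cardinality arithmetic used in
// optimizePostingsFetchByDownloadedBytes: subtract the 4-byte #entries field,
// then divide by the 4-byte width of each entry. The boolean is false for an
// invalid range, matching the fallback to the non-lazy path above.
func estimatePostings(rng index.Range) (int64, bool) {
	if rng.End <= rng.Start {
		return 0, false
	}
	return (rng.End - rng.Start - 4) / 4, true
}
```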