Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compact: wire roaring bitmaps to compactor #7109

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 22 additions & 1 deletion cmd/thanos/compact.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/postings"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -328,9 +330,26 @@ func runCompact(
return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc)
}

var pe index.PostingsEncoder = index.EncodePostingsRaw
if conf.postingsFormat == postings.RoaringEncoder {
pe = postings.EncodePostingsRoaring
}

// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc)

comp, err := tsdb.NewLeveledCompactorWithOptions(ctx, reg, logger, levels, downsample.NewPool(), tsdb.LeveledCompactorOptions{
MergeFunc: mergeFunc,
PE: pe,
PD: func(meta *tsdb.BlockMeta) index.PostingsDecoder {
for _, h := range meta.Compaction.Hints {
if h == "roaring" {
return postings.DecodePostingsRoaring
}
}
return index.DecodePostingsRaw
},
})
if err != nil {
return errors.Wrap(err, "create compactor")
}
Expand Down Expand Up @@ -715,6 +734,7 @@ type compactConfig struct {
progressCalculateInterval time.Duration
filterConf *store.FilterConfig
disableAdminOperations bool
postingsFormat string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -767,6 +787,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("compact.progress-interval", "Frequency of calculating the compaction progress in the background when --wait has been enabled. Setting it to \"0s\" disables it. Now compaction, downsampling and retention progress are supported.").
Default("5m").DurationVar(&cc.progressCalculateInterval)

cmd.Flag("compact.postings-format", "Format to use in resulting blocks.").Default(postings.RawEncoder).EnumVar(&cc.postingsFormat, postings.RoaringEncoder, postings.RawEncoder)
cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").IntVar(&cc.compactionConcurrency)
cmd.Flag("compact.blocks-fetch-concurrency", "Number of goroutines to use when download block during compaction.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Expand Up @@ -375,7 +375,7 @@ func processDownsampling(
pool = downsample.NewPool()
}

b, err := tsdb.OpenBlock(logger, bdir, pool)
b, err := tsdb.OpenBlock(logger, bdir, pool, nil)
if err != nil {
return errors.Wrapf(err, "open block %s", m.ULID)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/tools_bucket.go
Expand Up @@ -1232,7 +1232,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
if err != nil {
return errors.Wrapf(err, "read meta of %v", id)
}
b, err := tsdb.OpenBlock(logger, filepath.Join(tbc.tmpDir, id.String()), chunkPool)
b, err := tsdb.OpenBlock(logger, filepath.Join(tbc.tmpDir, id.String()), chunkPool, nil)
if err != nil {
return errors.Wrapf(err, "open block %v", id)
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Expand Up @@ -116,6 +116,7 @@ require (
)

require (
github.com/RoaringBitmap/roaring v1.9.0
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
Expand All @@ -126,12 +127,14 @@ require (

require (
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/go-openapi/runtime v0.27.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/sercand/kuberesolver/v4 v4.0.0 // indirect
Expand Down Expand Up @@ -268,6 +271,8 @@ replace (
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

github.com/prometheus/prometheus => github.com/yeya24/prometheus v1.8.2-0.20240212194858-9b851e143ae3

github.com/vimeo/galaxycache => github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e

// Override due to https://github.com/weaveworks/common/issues/239
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Expand Up @@ -639,6 +639,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA=
github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/RoaringBitmap/roaring v1.9.0 h1:lwKhr90/j0jVXJyh5X+vQN1VVn77rQFfYnh6RDRGCcE=
github.com/RoaringBitmap/roaring v1.9.0/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
Expand Down Expand Up @@ -723,6 +725,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
Expand Down Expand Up @@ -1302,6 +1306,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSrwhe88TQQ=
github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
Expand Down Expand Up @@ -1449,8 +1455,6 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba h1:bDCs3jd+3KURFIDykicCeNfa573KYVZGhN4F62WHTmI=
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
Expand Down Expand Up @@ -1565,6 +1569,8 @@ github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yeya24/prometheus v1.8.2-0.20240212194858-9b851e143ae3 h1:0HLRlryLLRh4rxg0IR4cuLEGRMAO/KDKQWyObE57tiM=
github.com/yeya24/prometheus v1.8.2-0.20240212194858-9b851e143ae3/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
23 changes: 19 additions & 4 deletions pkg/block/index.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/postings"
"github.com/thanos-io/thanos/pkg/runutil"
)

Expand Down Expand Up @@ -214,16 +215,30 @@ func (n *minMaxSumInt64) Avg() int64 {
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
// See HealthStats.Issue347OutsideChunks for details.
func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, minTime, maxTime int64) (stats HealthStats, err error) {
r, err := index.NewFileReader(fn)
r, err := index.NewFileReader(fn, postings.DecodePostingsRoaring)
if err != nil {
return stats, errors.Wrap(err, "open index file")
return stats, fmt.Errorf("open index: %w", err)
}

defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader")

key, value := index.AllPostingsKey()
p, err := r.Postings(ctx, key, value)
if err != nil {
return stats, errors.Wrap(err, "get all postings")
rd, err := index.NewFileReader(fn, nil)
if err != nil {
return stats, fmt.Errorf("open index: %w", err)
}

defer runutil.CloseWithErrCapture(&err, rd, "gather index issue file reader")

r = rd

ps, err := r.Postings(ctx, key, value)
if err != nil {
return stats, fmt.Errorf("postings: %w", err)
}
p = ps
}
var (
lset labels.Labels
Expand Down Expand Up @@ -427,7 +442,7 @@ func Repair(ctx context.Context, logger log.Logger, dir string, id ulid.ULID, so
return resid, errors.New("cannot repair downsampled block")
}

b, err := tsdb.OpenBlock(logger, bdir, nil)
b, err := tsdb.OpenBlock(logger, bdir, nil, nil)
if err != nil {
return resid, errors.Wrap(err, "open block")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/index_test.go
Expand Up @@ -37,7 +37,7 @@ func TestRewrite(t *testing.T) {
}, 150, 0, 1000, labels.EmptyLabels(), 124, metadata.NoneFunc)
testutil.Ok(t, err)

ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename))
ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename), nil)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, ir.Close()) }()
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, iw.Close())
testutil.Ok(t, cw.Close())

ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename))
ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename), nil)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, ir2.Close()) }()
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/downsample/downsample_test.go
Expand Up @@ -1159,7 +1159,7 @@ func TestDownsample(t *testing.T) {
_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename), nil)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, indexr.Close()) }()

Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename), nil)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, indexr.Close()) }()

Expand Down
4 changes: 2 additions & 2 deletions pkg/compactv2/compactor_test.go
Expand Up @@ -612,7 +612,7 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
testutil.Ok(t, createBlockSeries(bdir, b))
// Meta does not matter, but let's create for OpenBlock to work.
testutil.Ok(t, metadata.Meta{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: id}}.WriteToDir(logger, bdir))
block, err := tsdb.OpenBlock(logger, bdir, chunkPool)
block, err := tsdb.OpenBlock(logger, bdir, chunkPool, nil)
testutil.Ok(t, err)
blocks = append(blocks, block)
}
Expand Down Expand Up @@ -653,7 +653,7 @@ type seriesSamples struct {
func readBlockSeries(t *testing.T, bDir string) []seriesSamples {
ctx := context.Background()

indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename))
indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename), nil)
testutil.Ok(t, err)
defer indexr.Close()

Expand Down
78 changes: 78 additions & 0 deletions pkg/postings/roaringbitmap.go
@@ -0,0 +1,78 @@
package postings

import (
"bytes"
"fmt"

"github.com/RoaringBitmap/roaring"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
)

// Names of the supported postings formats. They are the accepted values of
// the compactor's "compact.postings-format" flag.
const (
	// RoaringEncoder selects postings serialized as a roaring bitmap
	// (see EncodePostingsRoaring).
	RoaringEncoder = "roaringbitmap"
	// RawEncoder selects the raw postings encoding; it is the flag's
	// default value.
	RawEncoder = "raw"
)

// Compile-time assertion that EncodePostingsRoaring has the
// index.PostingsEncoder function signature.
var _ index.PostingsEncoder = EncodePostingsRoaring

// EncodePostingsRoaring serializes the postings list in as a roaring bitmap
// and appends the resulting bytes to e.
//
// The bitmap is run-optimized before it is marshaled. An error is returned
// only when marshaling the bitmap fails; nothing is written to e in that case.
func EncodePostingsRoaring(e *encoding.Encbuf, in []uint32) error {
	bitmap := roaring.NewBitmap()
	bitmap.AddMany(in)
	// Let the bitmap switch containers to run-length encoding before
	// it is serialized.
	bitmap.RunOptimize()

	serialized, err := bitmap.MarshalBinary()
	if err == nil {
		e.PutBytes(serialized)
		return nil
	}

	return fmt.Errorf("marshaling roaring bitmap: %w", err)
}

// Compile-time assertion that DecodePostingsRoaring has the
// index.PostingsDecoder function signature.
var _ index.PostingsDecoder = DecodePostingsRoaring

// DecodePostingsRoaring parses b as a serialized roaring bitmap and returns
// the bitmap's cardinality together with an index.Postings iterator over its
// values.
//
// If b cannot be read as a roaring bitmap, the underlying error is returned
// wrapped with context (mirroring EncodePostingsRoaring) and the other results
// are zero/nil.
func DecodePostingsRoaring(b []byte) (int, index.Postings, error) {
	rb := roaring.New()

	if _, err := rb.ReadFrom(bytes.NewReader(b)); err != nil {
		return 0, nil, fmt.Errorf("unmarshaling roaring bitmap: %w", err)
	}

	return int(rb.GetCardinality()), &postingsRoaringWrapper{IntPeekable: rb.Iterator()}, nil
}

// postingsRoaringWrapper adapts a roaring bitmap iterator to the postings
// iteration methods (Next, Seek, At, Err) returned by DecodePostingsRoaring.
type postingsRoaringWrapper struct {
	// Embedded roaring iterator that yields the bitmap's values.
	roaring.IntPeekable

	// cur is the value most recently consumed by Next; At returns it.
	cur storage.SeriesRef
}

// At returns the value stored in w.cur, i.e. the series reference most
// recently recorded by the iterator. It is the zero value until a value has
// been recorded.
func (w *postingsRoaringWrapper) At() storage.SeriesRef {
	return w.cur
}

// Next moves to the following value of the underlying roaring iterator and
// records it so that At can return it. It reports false, leaving the current
// value untouched, once the iterator has no more values.
func (w *postingsRoaringWrapper) Next() bool {
	if w.IntPeekable.HasNext() {
		// The embedded iterator's Next is called explicitly because the
		// wrapper's own Next method shadows it.
		w.cur = storage.SeriesRef(w.IntPeekable.Next())
		return true
	}

	return false
}

// Err always returns nil: this wrapper never records an iteration error.
func (w *postingsRoaringWrapper) Err() error {
	return nil
}

// Seek advances the iterator to the first value that is greater than or equal
// to v and reports whether such a value exists. On success the value is
// available through At.
//
// If the current value already satisfies v the iterator is not moved, which
// matches the behaviour of the list-based postings in Prometheus' tsdb/index
// package (NOTE(review): as there, a Seek(0) on a fresh iterator reports true
// without consuming a value — confirm callers never seek to 0).
func (w *postingsRoaringWrapper) Seek(v storage.SeriesRef) bool {
	// Already positioned at or past v: nothing to do.
	if w.cur >= v {
		return true
	}

	// Roaring bitmaps only hold 32-bit values. A larger reference can never
	// be present, and converting it to uint32 would silently wrap around to
	// an unrelated, smaller target.
	if v > storage.SeriesRef(^uint32(0)) {
		return false
	}

	// Position the underlying iterator so that its next value is >= v, then
	// consume that value through Next so w.cur (and therefore At) reflects
	// the position that was found.
	w.IntPeekable.AdvanceIfNeeded(uint32(v))

	return w.Next()
}
20 changes: 20 additions & 0 deletions pkg/postings/roaringbitmap_test.go
@@ -0,0 +1,20 @@
package postings

import (
"testing"

"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/stretchr/testify/require"
)

// TestRoaringEncodeDecode round-trips a postings list through
// EncodePostingsRoaring and DecodePostingsRoaring and checks that the decoded
// iterator reports the right count and yields exactly the encoded values, in
// order, before being exhausted.
func TestRoaringEncodeDecode(t *testing.T) {
	in := []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}

	e := &encoding.Encbuf{}
	require.NoError(t, EncodePostingsRoaring(e, in))

	n, p, err := DecodePostingsRoaring(e.Get())
	require.NoError(t, err)
	require.Equal(t, len(in), n)

	// Every encoded value must come back, in ascending order.
	for _, want := range in {
		require.True(t, p.Next())
		require.Equal(t, uint64(want), uint64(p.At()))
	}

	// Nothing beyond the encoded values, and no iteration error.
	require.False(t, p.Next())
	require.NoError(t, p.Err())
}
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Expand Up @@ -1429,7 +1429,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
// Histogram chunks are represented differently in memory and on disk. In order to
// have a precise comparison, we need to use the on-disk representation as the expected value
// instead of the in-memory one.
diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil)
diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil, nil)
testutil.Ok(t, err)
series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...)

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutil/e2eutil/prometheus.go
Expand Up @@ -668,7 +668,7 @@ func createBlock(
}

func gatherMaxSeriesSize(ctx context.Context, fn string) (int64, error) {
r, err := index.NewFileReader(fn)
r, err := index.NewFileReader(fn, nil)
if err != nil {
return 0, errors.Wrap(err, "open index file")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/ui/static/react/static/js/main.09d05bbb.js

Large diffs are not rendered by default.