Skip to content

Commit

Permalink
compact: wire roaring bitmaps to compactor
Browse files Browse the repository at this point in the history
Wire roaring bitmaps to the compactor. Tests still not passing.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Feb 19, 2024
1 parent f28680c commit 9a504fe
Show file tree
Hide file tree
Showing 17 changed files with 350 additions and 17 deletions.
23 changes: 22 additions & 1 deletion cmd/thanos/compact.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/client"
Expand All @@ -42,6 +43,7 @@ import (
"github.com/thanos-io/thanos/pkg/extprom"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
"github.com/thanos-io/thanos/pkg/postings"
"github.com/thanos-io/thanos/pkg/prober"
"github.com/thanos-io/thanos/pkg/runutil"
httpserver "github.com/thanos-io/thanos/pkg/server/http"
Expand Down Expand Up @@ -328,9 +330,26 @@ func runCompact(
return errors.Errorf("unsupported deduplication func, got %s", conf.dedupFunc)
}

var pe index.PostingsEncoder = index.EncodePostingsRaw
if conf.postingsFormat == postings.RoaringEncoder {
pe = postings.EncodePostingsRoaring
}

// Instantiate the compactor with different time slices. Timestamps in TSDB
// are in milliseconds.
comp, err := tsdb.NewLeveledCompactor(ctx, reg, logger, levels, downsample.NewPool(), mergeFunc)

comp, err := tsdb.NewLeveledCompactorWithOptions(ctx, reg, logger, levels, downsample.NewPool(), tsdb.LeveledCompactorOptions{
MergeFunc: mergeFunc,
PE: pe,
PD: func(meta *tsdb.BlockMeta) index.PostingsDecoder {
for _, h := range meta.Compaction.Hints {
if h == "roaring" {
return postings.DecodePostingsRoaring
}
}
return index.DecodePostingsRaw
},
})
if err != nil {
return errors.Wrap(err, "create compactor")
}
Expand Down Expand Up @@ -715,6 +734,7 @@ type compactConfig struct {
progressCalculateInterval time.Duration
filterConf *store.FilterConfig
disableAdminOperations bool
postingsFormat string
}

func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
Expand Down Expand Up @@ -767,6 +787,7 @@ func (cc *compactConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("compact.progress-interval", "Frequency of calculating the compaction progress in the background when --wait has been enabled. Setting it to \"0s\" disables it. Now compaction, downsampling and retention progress are supported.").
Default("5m").DurationVar(&cc.progressCalculateInterval)

cmd.Flag("compact.postings-format", "Format to use in resulting blocks.").Default(postings.RawEncoder).EnumVar(&cc.postingsFormat, postings.RoaringEncoder, postings.RawEncoder)
cmd.Flag("compact.concurrency", "Number of goroutines to use when compacting groups.").
Default("1").IntVar(&cc.compactionConcurrency)
cmd.Flag("compact.blocks-fetch-concurrency", "Number of goroutines to use when download block during compaction.").
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/downsample.go
Expand Up @@ -375,7 +375,7 @@ func processDownsampling(
pool = downsample.NewPool()
}

b, err := tsdb.OpenBlock(logger, bdir, pool)
b, err := tsdb.OpenBlock(logger, bdir, pool, nil)
if err != nil {
return errors.Wrapf(err, "open block %s", m.ULID)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/thanos/tools_bucket.go
Expand Up @@ -1232,7 +1232,7 @@ func registerBucketRewrite(app extkingpin.AppClause, objStoreConfig *extflag.Pat
if err != nil {
return errors.Wrapf(err, "read meta of %v", id)
}
b, err := tsdb.OpenBlock(logger, filepath.Join(tbc.tmpDir, id.String()), chunkPool)
b, err := tsdb.OpenBlock(logger, filepath.Join(tbc.tmpDir, id.String()), chunkPool, nil)
if err != nil {
return errors.Wrapf(err, "open block %v", id)
}
Expand Down
5 changes: 5 additions & 0 deletions go.mod
Expand Up @@ -116,6 +116,7 @@ require (
)

require (
github.com/RoaringBitmap/roaring v1.9.0
github.com/mitchellh/go-ps v1.0.0
github.com/onsi/gomega v1.27.10
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0
Expand All @@ -126,12 +127,14 @@ require (

require (
github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 // indirect
github.com/bits-and-blooms/bitset v1.12.0 // indirect
github.com/go-openapi/runtime v0.27.1 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/huaweicloud/huaweicloud-sdk-go-obs v3.23.3+incompatible // indirect
github.com/metalmatze/signal v0.0.0-20210307161603-1c9aa721a97a // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/sercand/kuberesolver/v4 v4.0.0 // indirect
Expand Down Expand Up @@ -268,6 +271,8 @@ replace (
// Required by Cortex https://github.com/cortexproject/cortex/pull/3051.
github.com/bradfitz/gomemcache => github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab

github.com/prometheus/prometheus => github.com/yeya24/prometheus v1.8.2-0.20240212194858-9b851e143ae3

github.com/vimeo/galaxycache => github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e

// Override due to https://github.com/weaveworks/common/issues/239
Expand Down
10 changes: 8 additions & 2 deletions go.sum
Expand Up @@ -639,6 +639,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA=
github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/RoaringBitmap/roaring v1.9.0 h1:lwKhr90/j0jVXJyh5X+vQN1VVn77rQFfYnh6RDRGCcE=
github.com/RoaringBitmap/roaring v1.9.0/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
Expand Down Expand Up @@ -723,6 +725,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
Expand Down Expand Up @@ -1302,6 +1306,8 @@ github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjY
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/mozillazg/go-httpheader v0.2.1 h1:geV7TrjbL8KXSyvghnFm+NyTux/hxwueTSrwhe88TQQ=
github.com/mozillazg/go-httpheader v0.2.1/go.mod h1:jJ8xECTlalr6ValeXYdOF8fFUISeBAdw6E61aqQma60=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
Expand Down Expand Up @@ -1449,8 +1455,6 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba h1:bDCs3jd+3KURFIDykicCeNfa573KYVZGhN4F62WHTmI=
github.com/prometheus/prometheus v0.49.2-0.20240126144015-960b6266e2ba/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
Expand Down Expand Up @@ -1565,6 +1569,8 @@ github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/yeya24/prometheus v1.8.2-0.20240212194858-9b851e143ae3 h1:0HLRlryLLRh4rxg0IR4cuLEGRMAO/KDKQWyObE57tiM=
github.com/yeya24/prometheus v1.8.2-0.20240212194858-9b851e143ae3/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
23 changes: 19 additions & 4 deletions pkg/block/index.go
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/prometheus/tsdb/index"

"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/postings"
"github.com/thanos-io/thanos/pkg/runutil"
)

Expand Down Expand Up @@ -214,16 +215,30 @@ func (n *minMaxSumInt64) Avg() int64 {
// It considers https://github.com/prometheus/tsdb/issues/347 as something that Thanos can handle.
// See HealthStats.Issue347OutsideChunks for details.
func GatherIndexHealthStats(ctx context.Context, logger log.Logger, fn string, minTime, maxTime int64) (stats HealthStats, err error) {
r, err := index.NewFileReader(fn)
r, err := index.NewFileReader(fn, postings.DecodePostingsRoaring)
if err != nil {
return stats, errors.Wrap(err, "open index file")
return stats, fmt.Errorf("open index: %w", err)
}

defer runutil.CloseWithErrCapture(&err, r, "gather index issue file reader")

key, value := index.AllPostingsKey()
p, err := r.Postings(ctx, key, value)
if err != nil {
return stats, errors.Wrap(err, "get all postings")
rd, err := index.NewFileReader(fn, nil)
if err != nil {
return stats, fmt.Errorf("open index: %w", err)
}

defer runutil.CloseWithErrCapture(&err, rd, "gather index issue file reader")

r = rd

ps, err := r.Postings(ctx, key, value)
if err != nil {
return stats, fmt.Errorf("postings: %w", err)
}
p = ps
}
var (
lset labels.Labels
Expand Down Expand Up @@ -427,7 +442,7 @@ func Repair(ctx context.Context, logger log.Logger, dir string, id ulid.ULID, so
return resid, errors.New("cannot repair downsampled block")
}

b, err := tsdb.OpenBlock(logger, bdir, nil)
b, err := tsdb.OpenBlock(logger, bdir, nil, nil)
if err != nil {
return resid, errors.Wrap(err, "open block")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/block/index_test.go
Expand Up @@ -37,7 +37,7 @@ func TestRewrite(t *testing.T) {
}, 150, 0, 1000, labels.EmptyLabels(), 124, metadata.NoneFunc)
testutil.Ok(t, err)

ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename))
ir, err := index.NewFileReader(filepath.Join(tmpDir, b.String(), IndexFilename), nil)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, ir.Close()) }()
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestRewrite(t *testing.T) {
testutil.Ok(t, iw.Close())
testutil.Ok(t, cw.Close())

ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename))
ir2, err := index.NewFileReader(filepath.Join(tmpDir, m.ULID.String(), IndexFilename), nil)
testutil.Ok(t, err)

defer func() { testutil.Ok(t, ir2.Close()) }()
Expand Down
4 changes: 2 additions & 2 deletions pkg/compact/downsample/downsample_test.go
Expand Up @@ -1159,7 +1159,7 @@ func TestDownsample(t *testing.T) {
_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename), nil)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, indexr.Close()) }()

Expand Down Expand Up @@ -1288,7 +1288,7 @@ func TestDownsampleAggrAndNonEmptyXORChunks(t *testing.T) {
_, err = metadata.ReadFromDir(filepath.Join(dir, id.String()))
testutil.Ok(t, err)

indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename))
indexr, err := index.NewFileReader(filepath.Join(dir, id.String(), block.IndexFilename), nil)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, indexr.Close()) }()

Expand Down
4 changes: 2 additions & 2 deletions pkg/compactv2/compactor_test.go
Expand Up @@ -612,7 +612,7 @@ func TestCompactor_WriteSeries_e2e(t *testing.T) {
testutil.Ok(t, createBlockSeries(bdir, b))
// Meta does not matter, but let's create for OpenBlock to work.
testutil.Ok(t, metadata.Meta{BlockMeta: tsdb.BlockMeta{Version: 1, ULID: id}}.WriteToDir(logger, bdir))
block, err := tsdb.OpenBlock(logger, bdir, chunkPool)
block, err := tsdb.OpenBlock(logger, bdir, chunkPool, nil)
testutil.Ok(t, err)
blocks = append(blocks, block)
}
Expand Down Expand Up @@ -653,7 +653,7 @@ type seriesSamples struct {
func readBlockSeries(t *testing.T, bDir string) []seriesSamples {
ctx := context.Background()

indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename))
indexr, err := index.NewFileReader(filepath.Join(bDir, block.IndexFilename), nil)
testutil.Ok(t, err)
defer indexr.Close()

Expand Down
78 changes: 78 additions & 0 deletions pkg/postings/roaringbitmap.go
@@ -0,0 +1,78 @@
package postings

import (
"bytes"
"fmt"

"github.com/RoaringBitmap/roaring"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/tsdb/index"
)

const (
RoaringEncoder = "roaringbitmap"
RawEncoder = "raw"
)

var _ index.PostingsEncoder = EncodePostingsRoaring

func EncodePostingsRoaring(e *encoding.Encbuf, in []uint32) error {
bm := roaring.NewBitmap()

bm.AddMany(in)
bm.RunOptimize()

out, err := bm.MarshalBinary()
if err != nil {
return fmt.Errorf("marshaling roaring bitmap: %w", err)
}

e.PutBytes(out)
return nil
}

var _ index.PostingsDecoder = DecodePostingsRoaring

func DecodePostingsRoaring(b []byte) (int, index.Postings, error) {
rb := roaring.New()

_, err := rb.ReadFrom(bytes.NewReader(b))
if err != nil {
return 0, nil, err
}

rbIter := rb.Iterator()

return int(rb.GetCardinality()), &postingsRoaringWrapper{IntPeekable: rbIter}, nil
}

type postingsRoaringWrapper struct {
roaring.IntPeekable

cur storage.SeriesRef
}

func (w *postingsRoaringWrapper) At() storage.SeriesRef {
return w.cur
}

func (w *postingsRoaringWrapper) Next() bool {
if !w.IntPeekable.HasNext() {
return false
}

w.cur = storage.SeriesRef(w.IntPeekable.Next())

return true
}

func (w *postingsRoaringWrapper) Err() error {
return nil
}

func (w *postingsRoaringWrapper) Seek(v storage.SeriesRef) bool {
w.IntPeekable.AdvanceIfNeeded(uint32(v))

return w.IntPeekable.HasNext()
}
20 changes: 20 additions & 0 deletions pkg/postings/roaringbitmap_test.go
@@ -0,0 +1,20 @@
package postings

import (
"testing"

"github.com/prometheus/prometheus/tsdb/encoding"
"github.com/stretchr/testify/require"
)

func TestRoaringEncodeDecode(t *testing.T) {
e := &encoding.Encbuf{}

require.NoError(t, EncodePostingsRoaring(e, []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}))

n, p, err := DecodePostingsRoaring(e.Get())
require.NoError(t, err)

require.Equal(t, 10, n)
require.Equal(t, true, p.Next())
}
2 changes: 1 addition & 1 deletion pkg/store/bucket_test.go
Expand Up @@ -1429,7 +1429,7 @@ func benchBucketSeries(t testutil.TB, sampleType chunkenc.ValueType, skipChunk,
// Histogram chunks are represented differently in memory and on disk. In order to
// have a precise comparison, we need to use the on-disk representation as the expected value
// instead of the in-memory one.
diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil)
diskBlock, err := tsdb.OpenBlock(logger, blockIDDir, nil, nil)
testutil.Ok(t, err)
series = append(series, storetestutil.ReadSeriesFromBlock(t, diskBlock, extLset, skipChunk)...)

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutil/e2eutil/prometheus.go
Expand Up @@ -668,7 +668,7 @@ func createBlock(
}

func gatherMaxSeriesSize(ctx context.Context, fn string) (int64, error) {
r, err := index.NewFileReader(fn)
r, err := index.NewFileReader(fn, nil)
if err != nil {
return 0, errors.Wrap(err, "open index file")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/ui/static/react/static/js/main.09d05bbb.js

Large diffs are not rendered by default.

0 comments on commit 9a504fe

Please sign in to comment.