Extended index reader #13983

Open · wants to merge 4 commits into main
44 changes: 42 additions & 2 deletions tsdb/block.go
@@ -107,6 +107,11 @@ type IndexReader interface {
Close() error
}

// ExtendedIndexReader is an IndexReader that can also resolve matchers to
// postings itself, letting the querier bypass the default PostingsForMatchers path.
type ExtendedIndexReader interface {
IndexReader
PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error)
}

// ChunkWriter serializes a time block of chunked series data.
type ChunkWriter interface {
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
@@ -323,12 +328,28 @@ type Block struct {
numBytesMeta int64
}

// IndexReaderWrapFunc wraps a block's on-disk index reader in a custom IndexReader.
type IndexReaderWrapFunc func(reader *index.Reader) IndexReader

type OpenBlockOptions struct {
IndexReaderWrapFunc IndexReaderWrapFunc
}

// OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used
// to instantiate chunk structs.
func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
return OpenBlockWithOptions(logger, dir, pool, OpenBlockOptions{})
}

// OpenBlockWithOptions is like OpenBlock but lets the caller wrap the block's
// index reader through OpenBlockOptions.
func OpenBlockWithOptions(logger log.Logger, dir string, pool chunkenc.Pool, ops OpenBlockOptions) (pb *Block, err error) {
if logger == nil {
logger = log.NewNopLogger()
}
if ops.IndexReaderWrapFunc == nil {
ops.IndexReaderWrapFunc = func(reader *index.Reader) IndexReader {
return reader
}
}

var closers []io.Closer
defer func() {
if err != nil {
@@ -362,7 +383,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) {
dir: dir,
meta: *meta,
chunkr: cr,
indexr: ir,
indexr: ops.IndexReaderWrapFunc(ir),
tombstones: tr,
symbolTableSize: ir.SymbolTableSize(),
logger: logger,
@@ -429,7 +450,13 @@ func (pb *Block) Index() (IndexReader, error) {
if err := pb.startRead(); err != nil {
return nil, err
}
return blockIndexReader{ir: pb.indexr, b: pb}, nil

indexReader := blockIndexReader{ir: pb.indexr, b: pb}

if _, ok := pb.indexr.(ExtendedIndexReader); ok {
return extendedBlockIndexReader{blockIndexReader: indexReader}, nil
}
return indexReader, nil
}

// Chunks returns a new ChunkReader against the block data.
@@ -549,6 +576,19 @@ func (r blockIndexReader) LabelNamesFor(ctx context.Context, ids ...storage.SeriesRef) ([]string, error) {
return r.ir.LabelNamesFor(ctx, ids...)
}

type extendedBlockIndexReader struct {
blockIndexReader
}

func (r extendedBlockIndexReader) PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error) {
extendedReader, ok := r.ir.(ExtendedIndexReader)
if !ok {
return nil, fmt.Errorf("missing methods for ExtendedIndexReader")
}

return extendedReader.PostingsForMatchers(ctx, ms...)
}

type blockTombstoneReader struct {
tombstones.Reader
b *Block
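
Taken together, the block.go changes give an embedder a clean seam: supply an IndexReaderWrapFunc and every opened block serves queries through your reader. The sketch below is a hypothetical illustration, not part of this PR (the matcherAwareReader type, tsdbext package, and openWrapped helper are invented names): the wrapper embeds the stock *index.Reader, which already satisfies IndexReader, and adds the one method that promotes it to an ExtendedIndexReader so Block.Index() returns the extended variant.

package tsdbext // hypothetical example package

import (
	"context"

	"github.com/go-kit/log"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/chunkenc"
	"github.com/prometheus/prometheus/tsdb/index"
)

// matcherAwareReader (hypothetical) embeds the on-disk reader and adds
// PostingsForMatchers, which makes it a tsdb.ExtendedIndexReader.
type matcherAwareReader struct {
	*index.Reader
}

// A real implementation could answer from a cache or an external postings
// source; this sketch just delegates to the stock resolution path.
func (r *matcherAwareReader) PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error) {
	return tsdb.PostingsForMatchers(ctx, r, ms...)
}

func openWrapped(dir string) (*tsdb.Block, error) {
	return tsdb.OpenBlockWithOptions(log.NewNopLogger(), dir, chunkenc.NewPool(), tsdb.OpenBlockOptions{
		IndexReaderWrapFunc: func(reader *index.Reader) tsdb.IndexReader {
			return &matcherAwareReader{Reader: reader}
		},
	})
}
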
22 changes: 22 additions & 0 deletions tsdb/block_test.go
@@ -36,6 +36,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/index"
"github.com/prometheus/prometheus/tsdb/wlog"
)

@@ -89,6 +90,27 @@ func BenchmarkOpenBlock(b *testing.B) {
})
}

func TestOpenBlockWithOptions(t *testing.T) {
tmpdir := t.TempDir()
blockDir := createBlock(t, tmpdir, genSeries(10, 1, 1, 100))
wrapFuncCalled := false

block, err := OpenBlockWithOptions(nil, blockDir, nil, OpenBlockOptions{
IndexReaderWrapFunc: func(reader *index.Reader) IndexReader {
wrapFuncCalled = true
return reader
},
})

require.NoError(t, err)
defer func() { require.NoError(t, block.Close()) }()

indexReader, err := block.Index()
require.NoError(t, err)
require.True(t, wrapFuncCalled)
defer func() { require.NoError(t, indexReader.Close()) }()
}

func TestCorruptedChunk(t *testing.T) {
for _, tc := range []struct {
name string
34 changes: 20 additions & 14 deletions tsdb/db.go
@@ -189,6 +189,9 @@ type Options struct {

// EnableSharding enables query sharding support in TSDB.
EnableSharding bool

// OpenBlockOptions holds the IndexReaderWrapFunc applied to each block's index reader when the block is opened.
OpenBlockOptions OpenBlockOptions
}

type BlocksToDeleteFunc func(blocks []*Block) map[ulid.ULID]struct{}
@@ -239,6 +242,8 @@ type DB struct {
writeNotified wlog.WriteNotified

registerer prometheus.Registerer

openBlockOptions OpenBlockOptions
}

type dbMetrics struct {
@@ -571,7 +576,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
return nil, ErrClosed
default:
}
loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil)
loadable, corrupted, err := openBlocks(db.logger, OpenBlockOptions{}, db.dir, nil, nil)
if err != nil {
return nil, err
}
@@ -787,16 +792,17 @@ func open(dir string, l log.Logger, r prometheus.Registerer, opts *Options, rngs
}

db := &DB{
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
autoCompact: true,
chunkPool: chunkenc.NewPool(),
blocksToDelete: opts.BlocksToDelete,
registerer: r,
dir: dir,
logger: l,
opts: opts,
compactc: make(chan struct{}, 1),
donec: make(chan struct{}),
stopc: make(chan struct{}),
autoCompact: true,
chunkPool: chunkenc.NewPool(),
blocksToDelete: opts.BlocksToDelete,
registerer: r,
openBlockOptions: opts.OpenBlockOptions,
}
defer func() {
// Close files if startup fails somewhere.
@@ -1437,7 +1443,7 @@ func (db *DB) reloadBlocks() (err error) {
db.mtx.Lock()
defer db.mtx.Unlock()

loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
loadable, corrupted, err := openBlocks(db.logger, db.openBlockOptions, db.dir, db.blocks, db.chunkPool)
if err != nil {
return err
}
@@ -1529,7 +1535,7 @@ func (db *DB) reloadBlocks() (err error) {
return nil
}

func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
func openBlocks(l log.Logger, openBlockOptions OpenBlockOptions, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
bDirs, err := blockDirs(dir)
if err != nil {
return nil, nil, fmt.Errorf("find blocks: %w", err)
@@ -1546,7 +1552,7 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
// See if we already have the block in memory or open it otherwise.
block, open := getBlock(loaded, meta.ULID)
if !open {
block, err = OpenBlock(l, bDir, chunkPool)
block, err = OpenBlockWithOptions(l, bDir, chunkPool, openBlockOptions)
if err != nil {
corrupted[meta.ULID] = err
continue
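
Because Options now carries OpenBlockOptions and reloadBlocks threads it into openBlocks, the hook applies uniformly to blocks present at startup and to blocks produced later by compaction. A minimal sketch of wiring it up DB-wide, reusing the hypothetical matcherAwareReader from the block.go example and assuming the five-argument tsdb.Open signature on current main (one extra import: github.com/prometheus/client_golang/prometheus):

func openDBWithHook(dir string) (*tsdb.DB, error) {
	opts := tsdb.DefaultOptions()
	opts.OpenBlockOptions = tsdb.OpenBlockOptions{
		IndexReaderWrapFunc: func(reader *index.Reader) tsdb.IndexReader {
			return &matcherAwareReader{Reader: reader}
		},
	}
	// A nil *DBStats is accepted; the registry exposes the usual TSDB metrics.
	return tsdb.Open(dir, log.NewNopLogger(), prometheus.NewRegistry(), opts, nil)
}
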
21 changes: 19 additions & 2 deletions tsdb/querier.go
@@ -117,7 +117,15 @@ func (q *blockQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.SeriesSet {
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0

p, err := PostingsForMatchers(ctx, q.index, ms...)
var p index.Postings
var err error

if i, ok := q.index.(ExtendedIndexReader); ok {
p, err = i.PostingsForMatchers(ctx, ms...)
} else {
p, err = PostingsForMatchers(ctx, q.index, ms...)
}

if err != nil {
return storage.ErrSeriesSet(err)
}
@@ -166,7 +174,16 @@ func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
maxt = hints.End
disableTrimming = hints.DisableTrimming
}
p, err := PostingsForMatchers(ctx, q.index, ms...)

var p index.Postings
var err error

if i, ok := q.index.(ExtendedIndexReader); ok {
p, err = i.PostingsForMatchers(ctx, ms...)
} else {
p, err = PostingsForMatchers(ctx, q.index, ms...)
}

if err != nil {
return storage.ErrChunkSeriesSet(err)
}
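
The querier changes are the payoff: whenever the block's index reader advertises PostingsForMatchers, both Select paths route matcher resolution through it instead of the package-level PostingsForMatchers. One use case this enables (hypothetical, not in this PR) is memoizing expanded postings per matcher set; blocks are immutable once written, so such a cache never needs invalidation. The sketch assumes the imports from the first example plus sort, strings, sync, and github.com/prometheus/prometheus/storage.

// cachingIndexReader (hypothetical) memoizes postings per matcher set.
// index.Postings is a single-use iterator, so results are expanded once to
// []storage.SeriesRef and replayed via index.NewListPostings.
type cachingIndexReader struct {
	*index.Reader

	mtx   sync.Mutex
	cache map[string][]storage.SeriesRef
}

func newCachingIndexReader(reader *index.Reader) *cachingIndexReader {
	return &cachingIndexReader{Reader: reader, cache: map[string][]storage.SeriesRef{}}
}

func (r *cachingIndexReader) PostingsForMatchers(ctx context.Context, ms ...*labels.Matcher) (index.Postings, error) {
	key := matchersKey(ms)

	r.mtx.Lock()
	refs, ok := r.cache[key]
	r.mtx.Unlock()
	if ok {
		return index.NewListPostings(refs), nil
	}

	p, err := tsdb.PostingsForMatchers(ctx, r, ms...)
	if err != nil {
		return nil, err
	}
	refs, err = index.ExpandPostings(p)
	if err != nil {
		return nil, err
	}

	r.mtx.Lock()
	r.cache[key] = refs
	r.mtx.Unlock()
	return index.NewListPostings(refs), nil
}

// matchersKey builds an order-independent key from the matchers' string forms.
func matchersKey(ms []*labels.Matcher) string {
	parts := make([]string, 0, len(ms))
	for _, m := range ms {
		parts = append(parts, m.String())
	}
	sort.Strings(parts)
	return strings.Join(parts, ";")
}
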
52 changes: 52 additions & 0 deletions tsdb/querier_test.go
@@ -683,6 +683,42 @@ func TestBlockQuerierDelete(t *testing.T) {
}
}

func TestBlockQuerierWithExtendedIndexReader(t *testing.T) {
err := fmt.Errorf("test error")

bq := blockQuerier{
blockBaseQuerier: &blockBaseQuerier{
index: newMockExtendedIndex(err),
chunks: mockChunkReader(make(map[chunks.ChunkRef]chunkenc.Chunk)),
tombstones: tombstones.NewMemTombstones(),

mint: 1,
maxt: 10,
},
}

ss := bq.Select(context.Background(), false, nil, nil)
defer func() { require.NoError(t, bq.Close()) }()

require.Equal(t, err, ss.Err())

cq := blockChunkQuerier{
blockBaseQuerier: &blockBaseQuerier{
index: newMockExtendedIndex(err),
chunks: mockChunkReader(make(map[chunks.ChunkRef]chunkenc.Chunk)),
tombstones: tombstones.NewMemTombstones(),

mint: 1,
maxt: 10,
},
}

cs := cq.Select(context.Background(), false, nil, nil)
defer func() { require.NoError(t, cq.Close()) }()

require.Equal(t, err, cs.Err())
}

type fakeChunksReader struct {
ChunkReader
chks map[chunks.ChunkRef]chunkenc.Chunk
@@ -2388,6 +2424,22 @@ func (m mockIndex) LabelNames(_ context.Context, matchers ...*labels.Matcher) ([]string, error) {
return l, nil
}

type mockExtendedIndex struct {
mockIndex
err error
}

func newMockExtendedIndex(err error) mockExtendedIndex {
return mockExtendedIndex{
mockIndex: newMockIndex(),
err: err,
}
}

func (m mockExtendedIndex) PostingsForMatchers(_ context.Context, _ ...*labels.Matcher) (index.Postings, error) {
return nil, m.err
}

func BenchmarkQueryIterator(b *testing.B) {
cases := []struct {
numBlocks int