This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Attempt to reduce memory footprint of compaction #627

Closed
wants to merge 49 commits

Changes from 22 commits

Commits (49)
9811a14
Returning series ref in ChunkSeriesSet.At()
codesome Jun 11, 2019
12a10a0
newCompactionMerger takes []ChunkSeriesSet. Calculating (old series i…
codesome Jun 11, 2019
b7839c4
Use (old series id -> new series id) map to write new postings
codesome Jun 11, 2019
5d3bf62
Get rid of postingMap
codesome Jun 12, 2019
76b67fd
NumSeries() for BlockReader. Pre-allocate memory for seriesMap maps.
codesome Jun 12, 2019
64e4487
Set a higher initial value for postingBuf
codesome Jun 14, 2019
3a4a6e6
Efficiently iterate over all sorted label-values
codesome Jun 14, 2019
0a0dbfe
Hint number of postings writes to the index Writer to pre-allocate me…
codesome Jun 14, 2019
c1c39ed
Pre-allocate slice for label values in populateBlock
codesome Jun 14, 2019
3593059
Re-use bigEndianPostings in populateBlock
codesome Jun 17, 2019
e78ec69
Reuse string buffer when swapping stringTuples
codesome Jun 18, 2019
f344fa2
Reuse byte slice for writing chunk meta hash
codesome Jun 18, 2019
9fab207
Fix race in writeHash
codesome Jun 18, 2019
0ff98c0
Fix lint errors
codesome Jun 18, 2019
407fe55
Reuse slice in ReadOffsetTable. Reduces allocs for OpenBlock.
codesome Jun 18, 2019
39b97e5
Reuse ListPostings in populateBlock.
codesome Jun 18, 2019
4ff0ed8
Check for error before wrapping in blockIndexReader.Series
codesome Jun 18, 2019
387705d
Check for error before wrapping in blockIndexReader.Postings
codesome Jun 18, 2019
cff36d4
Use already allocated byte buffer in writeHash
codesome Jun 18, 2019
f9dfe07
WriteChunks takes a byte buffer to reuse
codesome Jun 18, 2019
e812fba
Breakdown generic writeOffsetTable into writeLabelIndexesOffsetTable …
codesome Jun 19, 2019
b421a4b
Fix review comments
codesome Jun 19, 2019
53ccf99
Reset() for Postings interface
codesome Jun 20, 2019
eb79899
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jun 20, 2019
c7af8aa
Fix review comments
codesome Jun 22, 2019
b2b2647
Remove Reset(...) from Postings interface
codesome Jun 28, 2019
b007998
More code comments and fix review comments
codesome Jun 28, 2019
0068b3c
Revert changes to writeOffsetTable
codesome Jul 2, 2019
606fcf1
Revert checking error before wrapping
codesome Jul 2, 2019
0898907
Revert re-use of 'keys' in ReadOffsetTable
codesome Jul 2, 2019
caa715e
Revert changes made to stringTuples
codesome Jul 3, 2019
1d54273
Revert passing byte buffer to WriteChunks and writeHash
codesome Jul 3, 2019
dbec7eb
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 5, 2019
85bdf35
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 10, 2019
89d459e
hashEntry -> postingsHashEntry after merging upstream
codesome Jul 10, 2019
f8ddc03
writePostings perf improvements
codesome Jul 10, 2019
1582ebc
Fix review comments
codesome Jul 12, 2019
5993889
Add RemappedPostings
codesome Jul 12, 2019
4293e37
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 12, 2019
e0693dc
Fix review comments
codesome Jul 15, 2019
048790b
Reset ref in compactionMerger when there are 0 chunks
codesome Jul 16, 2019
605f5b7
compactionMerger doesn't return series with 0 chunks
codesome Jul 23, 2019
40ad33c
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 23, 2019
b72273f
Fix out-of-order series bug
codesome Jul 24, 2019
ffe544e
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 24, 2019
570d5e1
Bug fix in compactionMerger.Next()
codesome Jul 24, 2019
4ed0cac
Don't put deleted series in seriesMap
codesome Jul 25, 2019
c252f96
Better allocation strategy for remapped postings
codesome Jul 28, 2019
a75fa32
Faster way to merge remapped postings
codesome Aug 1, 2019
39 changes: 29 additions & 10 deletions block.go
@@ -75,7 +75,9 @@ type IndexReader interface {
// The Postings here contain the offsets to the series inside the index.
// Found IDs are not strictly required to point to a valid Series, e.g. during
// background garbage collections.
-	Postings(name, value string) (index.Postings, error)
+	// 'reusePos' is the Postings object that can be re-used instead of allocating
+	// a new object. Re-use of this object need not be guaranteed.
+	Postings(name, value string, reusePos index.Postings) (index.Postings, error)

// SortedPostings returns a postings list that is reordered to be sorted
// by the label set of the underlying series.
@@ -109,9 +111,10 @@ type StringTuples interface {
type ChunkWriter interface {
// WriteChunks writes several chunks. The Chunk field of the ChunkMetas
// must be populated.
+	// b is the byte buffer that can be used by WriteChunks.
// After returning successfully, the Ref fields in the ChunkMetas
// are set and can be used to retrieve the chunks from the written data.
-	WriteChunks(chunks ...chunks.Meta) error
+	WriteChunks(b []byte, chunks ...chunks.Meta) ([]byte, error)

// Close writes any required finalization and closes the resources
// associated with the underlying writer.
@@ -143,6 +146,9 @@ type BlockReader interface {

// MaxTime returns the max time of the block.
MaxTime() int64

+	// NumSeries returns the total number of series in the block.
+	NumSeries() uint64
}

// Appendable defines an entity to which data can be appended.
@@ -455,21 +461,29 @@ func (r blockIndexReader) LabelValues(names ...string) (index.StringTuples, erro
return st, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
}

-func (r blockIndexReader) Postings(name, value string) (index.Postings, error) {
-	p, err := r.ir.Postings(name, value)
-	return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
+func (r blockIndexReader) Postings(name, value string, reusePos index.Postings) (index.Postings, error) {
+	p, err := r.ir.Postings(name, value, reusePos)
+	if err != nil {
+		// Checking for the error before wrapping saves some allocs,
+		// which adds up when this function is called many times,
+		// e.g. during compaction.
+		return p, errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
+	}
+	return p, nil
}

func (r blockIndexReader) SortedPostings(p index.Postings) index.Postings {
return r.ir.SortedPostings(p)
}

func (r blockIndexReader) Series(ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
-	return errors.Wrapf(
-		r.ir.Series(ref, lset, chks),
-		"block: %s",
-		r.b.Meta().ULID,
-	)
+	if err := r.ir.Series(ref, lset, chks); err != nil {
+		// Checking for the error before wrapping saves some allocs,
+		// which adds up when this function is called many times,
+		// e.g. during compaction.
+		return errors.Wrapf(err, "block: %s", r.b.Meta().ULID)
+	}
+	return nil
}

func (r blockIndexReader) LabelIndices() ([][]string, error) {
@@ -643,6 +657,11 @@ func (pb *Block) LabelNames() ([]string, error) {
return pb.indexr.LabelNames()
}

+// NumSeries returns number of series in the block.
+func (pb *Block) NumSeries() uint64 {
+	return pb.meta.Stats.NumSeries
+}

func clampInterval(a, b, mint, maxt int64) (int64, int64) {
if a < mint {
a = mint
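
The new reusePos parameter is easiest to read from the caller's side. The following is a minimal, hypothetical sketch (not code from this PR) of the intended pattern: the Postings iterator from the previous lookup is handed back so the reader may recycle it instead of allocating a fresh one; per the contract above, re-use is best-effort. countSeries and pairs are illustrative names.

package main

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/index"
)

// countSeries walks several postings lists, threading one reusable
// Postings iterator through every lookup. Passing nil (as cmd/tsdb
// does further down) simply opts out of re-use.
func countSeries(ir tsdb.IndexReader, pairs map[string]string) (int, error) {
	n := 0
	var reuse index.Postings // nil on the first call
	for name, value := range pairs {
		p, err := ir.Postings(name, value, reuse)
		if err != nil {
			return n, err
		}
		for p.Next() {
			n++
		}
		if err := p.Err(); err != nil {
			return n, err
		}
		reuse = p // hand the iterator back for possible recycling
	}
	return n, nil
}
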
31 changes: 16 additions & 15 deletions chunks/chunks.go
@@ -55,8 +55,9 @@ type Meta struct {
}

// writeHash writes the chunk encoding and raw data into the provided hash.
-func (cm *Meta) writeHash(h hash.Hash) error {
-	if _, err := h.Write([]byte{byte(cm.Chunk.Encoding())}); err != nil {
+func (cm *Meta) writeHash(h hash.Hash, buf []byte) error {
+	buf = append(buf, byte(cm.Chunk.Encoding()))
+	if _, err := h.Write(buf[:1]); err != nil {
return err
}
if _, err := h.Write(cm.Chunk.Bytes()); err != nil {
@@ -280,7 +281,7 @@ func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
return newChunk, nil
}

-func (w *Writer) WriteChunks(chks ...Meta) error {
+func (w *Writer) WriteChunks(b []byte, chks ...Meta) ([]byte, error) {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
@@ -293,14 +294,14 @@ func (w *Writer) WriteChunks(chks ...Meta) error {

if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
if err := w.cut(); err != nil {
-			return err
+			return b, err
}
}

-	var (
-		b   = [binary.MaxVarintLen32]byte{}
-		seq = uint64(w.seq()) << 32
-	)
+	var seq = uint64(w.seq()) << 32
+	for len(b) < binary.MaxVarintLen32 {
+		b = append(b, 0)
+	}
for i := range chks {
chk := &chks[i]

Expand All @@ -309,26 +310,26 @@ func (w *Writer) WriteChunks(chks ...Meta) error {
n := binary.PutUvarint(b[:], uint64(len(chk.Chunk.Bytes())))

if err := w.write(b[:n]); err != nil {
-			return err
+			return b, err
}
b[0] = byte(chk.Chunk.Encoding())
if err := w.write(b[:1]); err != nil {
-			return err
+			return b, err
}
if err := w.write(chk.Chunk.Bytes()); err != nil {
-			return err
+			return b, err
}

w.crc32.Reset()
-		if err := chk.writeHash(w.crc32); err != nil {
-			return err
+		if err := chk.writeHash(w.crc32, b[:0]); err != nil {
+			return b, err
}
if err := w.write(w.crc32.Sum(b[:0])); err != nil {
-			return err
+			return b, err
}
}

-	return nil
+	return b, nil
}

func (w *Writer) seq() int {
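
The calling pattern for the new WriteChunks signature is worth spelling out. The sketch below is hypothetical (writeAll and batches are illustrative names, not code from this PR): the scratch buffer returned by each call is fed into the next, so the varint and CRC32 staging bytes are allocated once and then recycled across the whole compaction.

package main

import (
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunks"
)

// writeAll writes every batch of chunks through one shared scratch buffer.
func writeAll(w tsdb.ChunkWriter, batches [][]chunks.Meta) error {
	var buf []byte // grows on the first call, then is re-used
	for _, chks := range batches {
		var err error
		buf, err = w.WriteChunks(buf, chks...)
		if err != nil {
			return err
		}
	}
	return w.Close()
}
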
6 changes: 4 additions & 2 deletions cmd/tsdb/main.go
@@ -34,6 +34,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
"gopkg.in/alecthomas/kingpin.v2"
)
@@ -456,7 +457,8 @@ func analyzeBlock(b *tsdb.Block, limit int) {
labelpairsUncovered := map[string]uint64{}
labelpairsCount := map[string]uint64{}
entries := 0
p, err := ir.Postings("", "") // The special all key.
apkName, apkValue := index.AllPostingsKey()
p, err := ir.Postings(apkName, apkValue, nil) // The special all key.
if err != nil {
exitWithError(err)
}
@@ -528,7 +530,7 @@ func analyzeBlock(b *tsdb.Block, limit int) {
exitWithError(err)
}
for _, n := range names {
postings, err := ir.Postings("__name__", n)
postings, err := ir.Postings("__name__", n, nil)
if err != nil {
exitWithError(err)
}
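
The "checking for the error before wrapping saves some allocs" comments in block.go deserve a short illustration. errors.Wrapf returns nil when err is nil, but its variadic arguments are still boxed into an interface slice on every call, so the wrapping call allocates even on the happy path; guarding with err != nil keeps the hot path allocation-free. The wrapper below is a hypothetical sketch, not code from this PR; readSeries is an illustrative name.

package main

import (
	"github.com/pkg/errors"
	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/chunks"
	"github.com/prometheus/tsdb/labels"
)

// readSeries only pays the wrapping cost on the error path.
func readSeries(ir tsdb.IndexReader, ref uint64, lset *labels.Labels, chks *[]chunks.Meta) error {
	if err := ir.Series(ref, lset, chks); err != nil {
		// Cold path: fmt work and interface boxing happen only here.
		return errors.Wrapf(err, "series %d", ref)
	}
	return nil // Hot path: no allocations from error wrapping.
}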