This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Attempt to reduce memory footprint of compaction #627

Closed
wants to merge 49 commits
Changes from 3 commits

49 commits
9811a14
Returning series ref in ChunkSeriesSet.At()
codesome Jun 11, 2019
12a10a0
newCompactionMerger takes []ChunkSeriesSet. Calculating (old series i…
codesome Jun 11, 2019
b7839c4
Use (old series id -> new series id) map to write new postings
codesome Jun 11, 2019
5d3bf62
Get rid of postingMap
codesome Jun 12, 2019
76b67fd
NumSeries() for BlockReader. Pre-allocate memory for seriesMap maps.
codesome Jun 12, 2019
64e4487
Set a higher initial value for postingBuf
codesome Jun 14, 2019
3a4a6e6
Efficiently iterate over all sorted label-values
codesome Jun 14, 2019
0a0dbfe
Hint number of postings writes to the index Writer to pre-allocate me…
codesome Jun 14, 2019
c1c39ed
Pre-allocate slice for label values in populateBlock
codesome Jun 14, 2019
3593059
Re-use bigEndianPostings in populateBlock
codesome Jun 17, 2019
e78ec69
Reuse string buffer when swapping stringTuples
codesome Jun 18, 2019
f344fa2
Reuse byte slice for writing chunk meta hash
codesome Jun 18, 2019
9fab207
Fix race in writeHash
codesome Jun 18, 2019
0ff98c0
Fix lint errors
codesome Jun 18, 2019
407fe55
Reuse slice in ReadOffsetTable. Reduces allocs for OpenBlock.
codesome Jun 18, 2019
39b97e5
Reuse ListPostings in populateBlock.
codesome Jun 18, 2019
4ff0ed8
Check for error before wrapping in blockIndexReader.Series
codesome Jun 18, 2019
387705d
Check for error before wrapping in blockIndexReader.Postings
codesome Jun 18, 2019
cff36d4
Use already allocated byte buffer in writeHash
codesome Jun 18, 2019
f9dfe07
WriteChunks takes a byte buffer to reuse
codesome Jun 18, 2019
e812fba
Breakdown generic writeOffsetTable into writeLabelIndexesOffsetTable …
codesome Jun 19, 2019
b421a4b
Fix review comments
codesome Jun 19, 2019
53ccf99
Reset() for Postings interface
codesome Jun 20, 2019
eb79899
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jun 20, 2019
c7af8aa
Fix review comments
codesome Jun 22, 2019
b2b2647
Remove Reset(...) from Postings interface
codesome Jun 28, 2019
b007998
More code comments and fix review comments
codesome Jun 28, 2019
0068b3c
Revert changes to writeOffsetTable
codesome Jul 2, 2019
606fcf1
Revert checking error before wrapping
codesome Jul 2, 2019
0898907
Revert re-use of 'keys' in ReadOffsetTable
codesome Jul 2, 2019
caa715e
Revert changes made to stringTuples
codesome Jul 3, 2019
1d54273
Revert passing byte buffer to WriteChunks and writeHash
codesome Jul 3, 2019
dbec7eb
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 5, 2019
85bdf35
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 10, 2019
89d459e
hashEntry -> postingsHashEntry after merging upstream
codesome Jul 10, 2019
f8ddc03
writePostings perf improvements
codesome Jul 10, 2019
1582ebc
Fix review comments
codesome Jul 12, 2019
5993889
Add RemappedPostings
codesome Jul 12, 2019
4293e37
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 12, 2019
e0693dc
Fix review comments
codesome Jul 15, 2019
048790b
Reset ref in compactionMerger when there are 0 chunks
codesome Jul 16, 2019
605f5b7
compactionMerger doesn't return series with 0 chunks
codesome Jul 23, 2019
40ad33c
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 23, 2019
b72273f
Fix out-of-order series bug
codesome Jul 24, 2019
ffe544e
Merge remote-tracking branch 'upstream/master' into compact-opt
codesome Jul 24, 2019
570d5e1
Bug fix in compactionMerger.Next()
codesome Jul 24, 2019
4ed0cac
Don't put deleted series in seriesMap
codesome Jul 25, 2019
c252f96
Better allocation strategy for remapped postings
codesome Jul 28, 2019
a75fa32
Faster way to merge remapped postings
codesome Aug 1, 2019
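
The thread running through these commits: rather than rebuilding an in-memory MemPostings index for the merged block, the compactor assigns new series refs during the merge, records a per-input-block map from old refs to new refs, and rewrites each postings list by streaming the input blocks' postings through those maps. Below is a minimal, self-contained sketch of that remapping step, assuming two input blocks and illustrative ref values (`mergePostings` is a hypothetical name, not code from this PR):

```go
package main

import (
	"fmt"
	"sort"
)

// mergePostings collects the postings for one (name, value) label pair
// from every input block, remaps old series refs into the output
// block's ref space, and returns a sorted, deduplicated list. Refs
// missing from a map belong to fully deleted series and are dropped.
func mergePostings(blockPostings [][]uint64, seriesMaps []map[uint64]uint64) []uint64 {
	seen := map[uint64]struct{}{}
	for i, plist := range blockPostings {
		for _, oldRef := range plist {
			if newRef, ok := seriesMaps[i][oldRef]; ok {
				seen[newRef] = struct{}{} // dedup: a series shared by blocks maps to one new ref
			}
		}
	}
	out := make([]uint64, 0, len(seen))
	for ref := range seen {
		out = append(out, ref)
	}
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}

func main() {
	// Series ref 1 exists in both blocks and got new ref 0;
	// block 0's ref 3 was deleted, so it has no mapping.
	seriesMaps := []map[uint64]uint64{{1: 0, 2: 1}, {1: 0, 2: 2}}
	fmt.Println(mergePostings([][]uint64{{1, 2, 3}, {1, 2}}, seriesMaps)) // [0 1 2]
}
```

Holding only the ref maps instead of a full MemPostings is what shrinks the memory footprint: postings lists are re-read from the input blocks one label pair at a time.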
compact.go: 217 changes (133 additions, 84 deletions)
@@ -22,6 +22,7 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"time"

"github.com/go-kit/kit/log"
@@ -646,10 +647,11 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}

var (
set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
overlapping bool
sets = make([]ChunkSeriesSet, 0, len(blocks))
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
indexReaders = make([]IndexReader, 0, len(blocks))
overlapping bool
)
defer func() {
var merr tsdb_errors.MultiError
@@ -684,6 +686,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return errors.Wrapf(err, "open index reader for block %s", b)
}
closers = append(closers, indexr)
indexReaders = append(indexReaders, indexr)

chunkr, err := b.Chunks()
if err != nil {
@@ -711,24 +714,15 @@
}
all = indexr.SortedPostings(all)

s := newCompactionSeriesSet(indexr, chunkr, tombsr, all)
sets = append(sets, newCompactionSeriesSet(indexr, chunkr, tombsr, all))
}

if i == 0 {
set = s
continue
}
set, err = newCompactionMerger(set, s)
if err != nil {
return err
}
set, err := newCompactionMerger(sets...)
if err != nil {
return err
}

// We fully rebuild the postings list index from merged series.
var (
postings = index.NewMemPostings()
values = map[string]stringset{}
i = uint64(0)
)
var values = map[string]stringset{}

if err := indexw.AddSymbols(allSymbols); err != nil {
return errors.Wrap(err, "add symbols")
@@ -741,7 +735,7 @@
default:
}

lset, chks, dranges := set.At() // The chunks here are not fully deleted.
ref, lset, chks, dranges := set.At() // The chunks here are not fully deleted.
if overlapping {
// If blocks are overlapping, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
@@ -792,7 +786,7 @@
return errors.Wrap(err, "write chunks")
}

if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
return errors.Wrap(err, "add series")
}

@@ -816,31 +810,70 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
valset.set(l.Value)
}
postings.Add(i, lset)

i++
}
if set.Err() != nil {
return errors.Wrap(set.Err(), "iterate compaction set")
}

if meta.Stats.NumSamples == 0 {
// No postings to write, exit early.
return nil
}

s := make([]string, 0, 256)

var numLabelValues int
for _, v := range values {
numLabelValues += len(v)
}

keys := make([]labels.Label, 0, numLabelValues+1)
apkName, apkValue := index.AllPostingsKey()
keys = append(keys, labels.Label{Name: apkName, Value: apkValue})
for n, v := range values {
s = s[:0]

for x := range v {
s = append(s, x)
keys = append(keys, labels.Label{Name: n, Value: x})
}
if err := indexw.WriteLabelIndex([]string{n}, s); err != nil {
return errors.Wrap(err, "write label index")
}
}

for _, l := range postings.SortedKeys() {
if err := indexw.WritePostings(l.Name, l.Value, postings.Get(l.Name, l.Value)); err != nil {
sort.Slice(keys, func(i, j int) bool {
if d := strings.Compare(keys[i].Name, keys[j].Name); d != 0 {
return d < 0
}
return keys[i].Value < keys[j].Value
})

// TODO: Decide initial size.
postingBuf := make([]uint64, 0, 1<<16)
seriesMap := set.SeriesMap()
for _, k := range keys {
postingMap := make(map[uint64]struct{}) // TODO: can the map be reused?
for i, ir := range indexReaders {
p, err := ir.Postings(k.Name, k.Value)
if err != nil {
return errors.Wrap(err, "read postings")
}
for p.Next() {
if newVal, ok := seriesMap[i][p.At()]; ok {
postingMap[newVal] = struct{}{}
}
}
}
postingBuf = postingBuf[:0]
for p := range postingMap {
postingBuf = append(postingBuf, p)
}
// Map iteration order is random; sort so postings are written in the
// ascending order the index's delta encoding requires.
sort.Slice(postingBuf, func(i, j int) bool { return postingBuf[i] < postingBuf[j] })
if err := indexw.WritePostings(k.Name, k.Value, index.NewListPostings(postingBuf)); err != nil {
return errors.Wrap(err, "write postings")
}
}

return nil
}

@@ -914,93 +947,109 @@ func (c *compactionSeriesSet) Err() error {
return c.p.Err()
}

func (c *compactionSeriesSet) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals
func (c *compactionSeriesSet) At() (uint64, labels.Labels, []chunks.Meta, Intervals) {
return c.p.At(), c.l, c.c, c.intervals
}

type compactionMerger struct {
a, b ChunkSeriesSet
sets []ChunkSeriesSet
first bool
oks []bool
// TODO(enhancement): If we can know number of series in each block, we can
// replace `map[uint64]uint64` with a slice.
seriesMap []map[uint64]uint64

aok, bok bool
l labels.Labels
c []chunks.Meta
intervals Intervals
ref uint64 // This is 1 based ref. Should return ref-1 for 0 based ref.
}

func newCompactionMerger(a, b ChunkSeriesSet) (*compactionMerger, error) {
c := &compactionMerger{
a: a,
b: b,
func newCompactionMerger(sets ...ChunkSeriesSet) (*compactionMerger, error) {
seriesMap := make([]map[uint64]uint64, len(sets))
for i := range seriesMap {
seriesMap[i] = make(map[uint64]uint64)
}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
c.aok = c.a.Next()
c.bok = c.b.Next()

return c, c.Err()
}

func (c *compactionMerger) compare() int {
if !c.aok {
return 1
}
if !c.bok {
return -1
c := &compactionMerger{
sets: sets,
oks: make([]bool, len(sets)),
seriesMap: seriesMap,
first: true,
}
a, _, _ := c.a.At()
b, _, _ := c.b.At()
return labels.Compare(a, b)
return c, nil
}

func (c *compactionMerger) Next() bool {
if !c.aok && !c.bok || c.Err() != nil {
if c.first {
for i, s := range c.sets {
c.oks[i] = s.Next()
}
c.first = false
}
if c.Err() != nil {
return false
}
// While advancing child iterators the memory used for labels and chunks
// may be reused. When picking a series we have to store the result.
var lset labels.Labels
var chks []chunks.Meta

d := c.compare()
if d > 0 {
lset, chks, c.intervals = c.b.At()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)

c.bok = c.b.Next()
} else if d < 0 {
lset, chks, c.intervals = c.a.At()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
var nextExists bool
for _, ok := range c.oks {
nextExists = nextExists || ok
}
if !nextExists {
return false
}

c.aok = c.a.Next()
} else {
// Both sets contain the current series. Chain them into a single one.
l, ca, ra := c.a.At()
_, cb, rb := c.b.At()
ref, lset, chks, intervals := c.sets[0].At()
idx := 0

for _, r := range rb {
ra = ra.add(r)
for i, s := range c.sets[1:] {
if !c.oks[1+i] {
continue
}
rf, lb, ch, itv := s.At()
if labels.Compare(lset, lb) > 0 {
ref, lset, chks, intervals = rf, lb, ch, itv
idx = i + 1
}
}

c.l = append(c.l[:0], l...)
c.c = append(append(c.c[:0], ca...), cb...)
c.intervals = ra

c.aok = c.a.Next()
c.bok = c.b.Next()
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
c.seriesMap[idx][ref] = c.ref
c.oks[idx] = c.sets[idx].Next()
for i, s := range c.sets[idx+1:] {
if !c.oks[idx+1+i] {
continue
}
rf, lb, ch, itv := s.At()
if labels.Compare(c.l, lb) == 0 {
c.seriesMap[idx+1+i][rf] = c.ref
c.c = append(c.c, ch...)
for _, r := range itv {
intervals = intervals.add(r) // add returns an extended slice; keep the result
}
c.oks[idx+1+i] = c.sets[idx+1+i].Next()
}
}
c.intervals = intervals

c.ref++
return true
}

func (c *compactionMerger) Err() error {
if c.a.Err() != nil {
return c.a.Err()
for _, s := range c.sets {
if s.Err() != nil {
return s.Err()
}
}
return c.b.Err()
return nil
}

func (c *compactionMerger) At() (uint64, labels.Labels, []chunks.Meta, Intervals) {
return c.ref - 1, c.l, c.c, c.intervals
}

func (c *compactionMerger) At() (labels.Labels, []chunks.Meta, Intervals) {
return c.l, c.c, c.intervals
func (c *compactionMerger) SeriesMap() []map[uint64]uint64 {
return c.seriesMap
}
head_test.go: 2 changes (1 addition, 1 deletion)
@@ -435,7 +435,7 @@ Outer:
// Getting the actual samples.
actSamples := make([]sample, 0)
for css.Next() {
lblsAct, chkMetas, intv := css.At()
_, lblsAct, chkMetas, intv := css.At()
testutil.Equals(t, labels.Labels{lblDefault}, lblsAct)
testutil.Equals(t, 0, len(intv))

querier.go: 16 changes (9 additions, 7 deletions)
@@ -670,7 +670,7 @@ func (s *mergedVerticalSeriesSet) Next() bool {
// actual series itself.
type ChunkSeriesSet interface {
Next() bool
At() (labels.Labels, []chunks.Meta, Intervals)
At() (uint64, labels.Labels, []chunks.Meta, Intervals)
Err() error
}

@@ -704,8 +704,8 @@ func LookupChunkSeries(ir IndexReader, tr TombstoneReader, ms ...labels.Matcher)
}, nil
}

func (s *baseChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) {
return s.lset, s.chks, s.intervals
func (s *baseChunkSeries) At() (uint64, labels.Labels, []chunks.Meta, Intervals) {
return s.p.At(), s.lset, s.chks, s.intervals
}

func (s *baseChunkSeries) Err() error { return s.err }
@@ -765,20 +765,21 @@ type populatedChunkSeries struct {
mint, maxt int64

err error
ref uint64
chks []chunks.Meta
lset labels.Labels
intervals Intervals
}

func (s *populatedChunkSeries) At() (labels.Labels, []chunks.Meta, Intervals) {
return s.lset, s.chks, s.intervals
func (s *populatedChunkSeries) At() (uint64, labels.Labels, []chunks.Meta, Intervals) {
return s.ref, s.lset, s.chks, s.intervals
}

func (s *populatedChunkSeries) Err() error { return s.err }

func (s *populatedChunkSeries) Next() bool {
for s.set.Next() {
lset, chks, dranges := s.set.At()
ref, lset, chks, dranges := s.set.At()

for len(chks) > 0 {
if chks[0].MaxTime >= s.mint {
@@ -814,6 +815,7 @@ func (s *populatedChunkSeries) Next() bool {
continue
}

s.ref = ref
s.lset = lset
s.chks = chks
s.intervals = dranges
@@ -837,7 +839,7 @@ type blockSeriesSet struct {

func (s *blockSeriesSet) Next() bool {
for s.set.Next() {
lset, chunks, dranges := s.set.At()
_, lset, chunks, dranges := s.set.At()
s.cur = &chunkSeries{
labels: lset,
chunks: chunks,
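
Finally, a hedged sketch of what consuming the new four-value At() looks like after this PR, with throwaway stand-in types so the snippet compiles on its own (none of these definitions are the repo's; labels and chunk metas are simplified to plain slices):

```go
package main

import "fmt"

type Intervals []struct{ Mint, Maxt int64 }

// ChunkSeriesSet mirrors the updated interface shape from querier.go.
type ChunkSeriesSet interface {
	Next() bool
	At() (uint64, []string, []int, Intervals)
	Err() error
}

// fakeSet yields two series to drive the loop below.
type fakeSet struct{ i int }

func (s *fakeSet) Next() bool { s.i++; return s.i <= 2 }
func (s *fakeSet) At() (uint64, []string, []int, Intervals) {
	return uint64(s.i - 1), []string{"job=a"}, []int{1, 2}, nil
}
func (s *fakeSet) Err() error { return nil }

func main() {
	var css ChunkSeriesSet = &fakeSet{}
	for css.Next() {
		ref, lset, chks, itv := css.At() // ref is the new first return value
		fmt.Println(ref, lset, chks, itv)
	}
	if err := css.Err(); err != nil {
		panic(err)
	}
}
```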