Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Faster way to merge remapped postings
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Aug 2, 2019
1 parent c252f96 commit a75fa32
Showing 1 changed file with 71 additions and 37 deletions.
108 changes: 71 additions & 37 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
return c.ctx.Err()
default:
}

ref, lset, chks, dranges := set.At() // The chunks here are not fully deleted.

if overlapping {
Expand Down Expand Up @@ -885,7 +884,7 @@ func (c *LeveledCompactor) writePostings(indexw IndexWriter, totalSeries int, va
idxw.HintPostingsWriteCount(numLabelValues)
}

remapPostings := newRemappedPostings(seriesMap, totalSeries)
remapPostings := newRemappedPostings(seriesMap, indexReaders, totalSeries)
var postBuf index.Postings
for _, n := range names {
labelValuesBuf = labelValuesBuf[:0]
Expand All @@ -895,16 +894,11 @@ func (c *LeveledCompactor) writePostings(indexw IndexWriter, totalSeries int, va
sort.Strings(labelValuesBuf)

for _, v := range labelValuesBuf {
remapPostings.clearPostings()
for i, ir := range indexReaders {
postBuf, err = ir.Postings(n, v, postBuf)
if err != nil {
return errors.Wrap(err, "read postings")
}
remapPostings.add(i, postBuf)
postBuf, err = remapPostings.get(n, v)
if err != nil {
return errors.Wrap(err, "remapping postings")
}

if err := indexw.WritePostings(n, v, remapPostings.get()); err != nil {
if err := indexw.WritePostings(n, v, postBuf); err != nil {
return errors.Wrap(err, "write postings")
}
}
Expand All @@ -917,18 +911,30 @@ func (c *LeveledCompactor) writePostings(indexw IndexWriter, totalSeries int, va
// from given set of maps.
type remappedPostings struct {
postingsMap []map[uint64]uint64
postingBuf []uint64
listPost *index.ListPostings
irs []IndexReader

postObj index.Postings
postingLists [3][]uint64
currBufIdx, appendBufIdx, mergeBufIdx int
}

// newRemappedPostings returns remappedPostings.
// 'postingsMap' is the slice of maps used for remapping.
// 'postingSizeHint' is for preallocation of memory to reduce allocs later.
func newRemappedPostings(postingsMap []map[uint64]uint64, postingSizeHint int) *remappedPostings {
func newRemappedPostings(postingsMap []map[uint64]uint64, irs []IndexReader, postingSizeHint int) *remappedPostings {
return &remappedPostings{
postingsMap: postingsMap,
postingBuf: make([]uint64, 0, postingSizeHint),
listPost: index.NewListPostings(),
irs: irs,
postingLists: [3][]uint64{
make([]uint64, 0, postingSizeHint),
make([]uint64, 0, postingSizeHint),
make([]uint64, 0, postingSizeHint),
},
listPost: index.NewListPostings(),
currBufIdx: 0,
appendBufIdx: 1,
mergeBufIdx: 2,
}
}

Expand All @@ -937,44 +943,72 @@ func newRemappedPostings(postingsMap []map[uint64]uint64, postingSizeHint int) *
// are added to the result buffer.
func (rp *remappedPostings) add(mapIdx int, p index.Postings) {
pMap := rp.postingsMap[mapIdx]
idx, lastIdx := -1, -1

currBuf := rp.postingLists[rp.currBufIdx]
appendBuf := rp.postingLists[rp.appendBufIdx][:0]
mergeBuf := rp.postingLists[rp.mergeBufIdx][:0]

for p.Next() {
newVal, ok := pMap[p.At()]
if !ok {
continue
}
// idx is the index at which newVal exists or index at which we need to insert.
// 'p' consists postings in sorted order w.r.t. the series labels.
// Hence the mapped series will also be in ascending order including the postings.
// So we need not look at/before 'lastIdx' in 'postingBuf'.
for idx = lastIdx + 1; idx < len(rp.postingBuf); idx++ {
if rp.postingBuf[idx] >= newVal {
break
appendBuf = append(appendBuf, newVal)
}

if mapIdx == 0 {
rp.postingLists[rp.currBufIdx] = appendBuf
rp.postingLists[rp.appendBufIdx] = currBuf
} else {
i, j := 0, 0
for i < len(currBuf) && j < len(appendBuf) {
if currBuf[i] < appendBuf[j] {
mergeBuf = append(mergeBuf, currBuf[i])
i++
} else if appendBuf[j] < currBuf[i] {
mergeBuf = append(mergeBuf, appendBuf[j])
j++
} else {
mergeBuf = append(mergeBuf, currBuf[i])
i++
j++
}
}
lastIdx = idx
if idx == len(rp.postingBuf) {
rp.postingBuf = append(rp.postingBuf, newVal)
} else if rp.postingBuf[idx] != newVal {
rp.postingBuf = append(rp.postingBuf[:idx], append([]uint64{newVal}, rp.postingBuf[idx:]...)...)
for i < len(currBuf) {
mergeBuf = append(mergeBuf, currBuf[i])
i++
}
for j < len(appendBuf) {
mergeBuf = append(mergeBuf, appendBuf[j])
j++
}

rp.postingLists[rp.currBufIdx] = mergeBuf
rp.postingLists[rp.mergeBufIdx] = currBuf
}
}

// get returns the remapped postings.
// The returned postings becomes invalid after calling any other exposed
// methods of RemappedPostings (Add, Get, ClearPostings). Hence postings
// should be used right after calling 'Get'.
// This is because of the shared buffer for memory optimizations.
func (rp *remappedPostings) get() index.Postings {
rp.listPost.Reset(rp.postingBuf)
return rp.listPost
// get returns the remapped postings for the given label name.
func (rp *remappedPostings) get(name, value string) (index.Postings, error) {
rp.clearPostings()
var err error
for i, ir := range rp.irs {
rp.postObj, err = ir.Postings(name, value, rp.postObj)
if err != nil {
return nil, errors.Wrap(err, "read postings")
}
rp.add(i, rp.postObj)
}
rp.listPost.Reset(rp.postingLists[rp.currBufIdx])
return rp.listPost, nil
}

// clearPostings only clears the result postings
// buffer and not the map.
func (rp *remappedPostings) clearPostings() {
rp.postingBuf = rp.postingBuf[:0]
rp.postingLists[0] = rp.postingLists[0][:0]
rp.postingLists[1] = rp.postingLists[1][:0]
rp.postingLists[2] = rp.postingLists[2][:0]
}

type compactionSeriesSet struct {
Expand Down

0 comments on commit a75fa32

Please sign in to comment.