Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Reset ref in compactionMerger when there are 0 chunks
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Vernekar <cs15btech11018@iith.ac.in>
  • Loading branch information
codesome committed Jul 16, 2019
1 parent e0693dc commit 048790b
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,18 +739,19 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}

ref, lset, chks, dranges := set.At() // The chunks here are not fully deleted.
// Skip the series with all deleted chunks.
if len(chks) == 0 {
set.RemoveLastRef()
continue
}

if overlapping {
// If blocks are overlapping, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
return chks[i].MinTime < chks[j].MinTime
})
}

// Skip the series with all deleted chunks.
if len(chks) == 0 {
continue
}

for i, chk := range chks {
// Re-encode head chunks that are still open (being appended to) or
// outside the compacted MaxTime range.
Expand Down Expand Up @@ -814,7 +815,6 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
if err := chunkw.WriteChunks(mergedChks...); err != nil {
return errors.Wrap(err, "write chunks")
}

if err := indexw.AddSeries(ref, lset, mergedChks...); err != nil {
return errors.Wrap(err, "add series")
}
Expand Down Expand Up @@ -1100,6 +1100,8 @@ func (c *compactionMerger) Next() bool {
ref, lset, chks, intervals := c.sets[0].At()
idx := 0

// Find the labels with the lowest index when
// sorted in ascending order.
for i, s := range c.sets[1:] {
if !c.oks[1+i] {
continue
Expand All @@ -1111,6 +1113,9 @@ func (c *compactionMerger) Next() bool {
}
}

// Building the seriesMap and gathering the chunks
// from other index readers which have same labels as
// 'lset' described above.
c.l = append(c.l[:0], lset...)
c.c = append(c.c[:0], chks...)
c.seriesMap[idx][ref] = c.ref
Expand All @@ -1135,6 +1140,22 @@ func (c *compactionMerger) Next() bool {
return true
}

// After calling RemoveLastRef(), At() is only valid after calling
// another Next().
func (c *compactionMerger) RemoveLastRef() {
if c.ref == 0 || c.Err() != nil {
return
}
toRemoveRef := c.ref - 1
for key1 := range c.seriesMap {
for key2, rf := range c.seriesMap[key1] {
if rf == toRemoveRef {
delete(c.seriesMap[key1], key2)
}
}
}
}

func (c *compactionMerger) Err() error {
for _, s := range c.sets {
if s.Err() != nil {
Expand Down

0 comments on commit 048790b

Please sign in to comment.