Skip to content

Commit

Permalink
Pop intersected postings heap without popping (#10092)
Browse files Browse the repository at this point in the history
See this comment for detailed explanation:

#9907 (comment)

TL;DR: if we don't call Pop() on the heap implementation, we don't need
to return our param as an `interface{}` so we save an allocation.
This would be popped for every label value, so it can be thousands of
saved allocations here (see benchmarks).

Signed-off-by: Oleg Zaytsev <mail@olegzaytsev.com>
  • Loading branch information
colega committed Jan 5, 2022
1 parent 5a68a41 commit 7015452
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 10 deletions.
41 changes: 32 additions & 9 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,18 +846,17 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
return nil, it.Err()
}
}
if len(h) == 0 {
if h.empty() {
return nil, nil
}
h.Init()

for len(h) > 0 {
for !h.empty() {
if !p.Seek(h.At()) {
return indexes, p.Err()
}
if p.At() == h.At() {
found := heap.Pop(&h).(postingsWithIndex)
indexes = append(indexes, found.index)
indexes = append(indexes, h.popIndex())
} else if err := h.Next(); err != nil {
return nil, err
}
Expand All @@ -869,10 +868,29 @@ func FindIntersectingPostings(p Postings, candidates []Postings) (indexes []int,
type postingsWithIndex struct {
index int
p Postings
// popped means that this postings shouldn't be considered anymore.
// See popIndex() comment to understand why we need this.
popped bool
}

type postingsWithIndexHeap []postingsWithIndex

// empty checks whether the heap is empty, which is true if it has no elements, of if the smallest element is popped.
func (h *postingsWithIndexHeap) empty() bool {
return len(*h) == 0 || (*h)[0].popped
}

// popIndex pops the smallest heap element and returns its index.
// In our implementation we don't actually do heap.Pop(), instead we mark the element as `popped` and fix its position, which
// should be after all the non-popped elements according to our sorting strategy.
// By skipping the `heap.Pop()` call we avoid an extra allocation in this heap's Pop() implementation which returns an interface{}.
func (h *postingsWithIndexHeap) popIndex() int {
index := (*h)[0].index
(*h)[0].popped = true
heap.Fix(h, 0)
return index
}

func (h *postingsWithIndexHeap) Init() { heap.Init(h) }
func (h postingsWithIndexHeap) At() storage.SeriesRef { return h[0].p.At() }
func (h *postingsWithIndexHeap) Next() error {
Expand All @@ -886,21 +904,26 @@ func (h *postingsWithIndexHeap) Next() error {
if err := pi.p.Err(); err != nil {
return errors.Wrapf(err, "postings %d", pi.index)
}

heap.Pop(h)
h.popIndex()
return nil
}

func (h postingsWithIndexHeap) Len() int { return len(h) }
func (h postingsWithIndexHeap) Less(i, j int) bool { return h[i].p.At() < h[j].p.At() }
func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }
func (h postingsWithIndexHeap) Len() int { return len(h) }
func (h postingsWithIndexHeap) Less(i, j int) bool {
if h[i].popped != h[j].popped {
return h[j].popped
}
return h[i].p.At() < h[j].p.At()
}
func (h *postingsWithIndexHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] }

func (h *postingsWithIndexHeap) Push(x interface{}) {
*h = append(*h, x.(postingsWithIndex))
}

// Pop implements heap.Interface and pops the last element, which is NOT the min element,
// so this doesn't return the same heap.Pop()
// Although this method is implemented for correctness, we don't expect it to be used, see popIndex() method for details.
func (h *postingsWithIndexHeap) Pop() interface{} {
old := *h
n := len(old)
Expand Down
2 changes: 1 addition & 1 deletion tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ func TestPostingsWithIndexHeap(t *testing.T) {
require.Equal(t, expected, h.At())
require.NoError(t, h.Next())
}
require.Empty(t, h)
require.True(t, h.empty())
})

t.Run("pop", func(t *testing.T) {
Expand Down

0 comments on commit 7015452

Please sign in to comment.