Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wrr: improve randomWRR performance #5067

Merged
merged 8 commits into from Jan 12, 2022
Merged
44 changes: 29 additions & 15 deletions internal/wrr/random.go
Expand Up @@ -19,15 +19,21 @@ package wrr

import (
"fmt"
"sort"
"sync"

"google.golang.org/grpc/internal/grpcrand"
)

// weightedItem is a wrapped weighted item that is used to implement weighted random algorithm.
type weightedItem struct {
Item interface{}
Weight int64
Item interface{}
// TODO Delete Weight? This field is not necessary for randomWRR to work.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this field if it's not used.

And it seems all fields in this struct can be unexported, right? Can you also change that? Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// But without this field, if we want to know an item's weight in randomWRR.Add , we have to
// calculate it (i.e. weight = items.AccumulatedWeight - previousItem.AccumulatedWeight)
// which is a bit less concise than items.Weight
Weight int64
AccumulatedWeight int64
}

func (w *weightedItem) String() string {
Expand All @@ -38,7 +44,7 @@ func (w *weightedItem) String() string {
type randomWRR struct {
mu sync.RWMutex
items []*weightedItem
sumOfWeights int64
equalWeights bool
huangchong94 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewRandom creates a new WRR with random.
Expand All @@ -51,27 +57,35 @@ var grpcrandInt63n = grpcrand.Int63n
func (rw *randomWRR) Next() (item interface{}) {
rw.mu.RLock()
defer rw.mu.RUnlock()
if rw.sumOfWeights == 0 {
sumOfWeights := rw.items[len(rw.items)-1].AccumulatedWeight
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will fail if there's no item, right? (And add a test)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

if sumOfWeights == 0 {
return nil
}
// Random number in [0, sum).
randomWeight := grpcrandInt63n(rw.sumOfWeights)
for _, item := range rw.items {
randomWeight = randomWeight - item.Weight
if randomWeight < 0 {
return item.Item
}
if rw.equalWeights {
return rw.items[grpcrandInt63n(int64(len(rw.items)))].Item
}

return rw.items[len(rw.items)-1].Item
// Random number in [0, sumOfWeights).
randomWeight := grpcrandInt63n(sumOfWeights)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this may still fail if sumOfWeights == 0 (this happens when the items added all are weight 0 🤷‍♂️ )

Maybe keep the check if sumOfWeights == 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If items added all are weight 0, then rw.equalWeights will be true, Next will return normally at line 61 return rw.items[grpcrandInt63n(int64(len(rw.items)))].item

I considered keeping if sumOfWeights == 0 return nil, but the behaviour will be different from edf.go, in edf, if all
item's weights are zero, Next will return non nil item

// Item's accumulated weights are in ascending order, because item's weight >= 0.
// Binary search rw.items to find first item whose AccumulatedWeight > randomWeight
// The return i is guaranteed to be in range [0, len(rw.items)) because randomWeight < last item's AccumulatedWeight
i := sort.Search(len(rw.items), func(i int) bool { return rw.items[i].AccumulatedWeight > randomWeight })
return rw.items[i].Item
}

func (rw *randomWRR) Add(item interface{}, weight int64) {
rw.mu.Lock()
defer rw.mu.Unlock()
rItem := &weightedItem{Item: item, Weight: weight}
accumulatedWeight := weight
equalWeights := true
if len(rw.items) > 0 {
lastItem := rw.items[len(rw.items)-1]
accumulatedWeight = lastItem.AccumulatedWeight + weight
equalWeights = rw.equalWeights && weight == lastItem.Weight
}
rw.equalWeights = equalWeights
rItem := &weightedItem{Item: item, Weight: weight, AccumulatedWeight: accumulatedWeight}
rw.items = append(rw.items, rItem)
rw.sumOfWeights += weight
}

func (rw *randomWRR) String() string {
Expand Down
65 changes: 65 additions & 0 deletions internal/wrr/wrr_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"math"
"math/rand"
"strconv"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -112,6 +113,70 @@ func (s) TestEdfWrrNext(t *testing.T) {
testWRRNext(t, NewEDF)
}

func BenchmarkRandomWRRNext(b *testing.B) {
for _, n := range []int{100, 500, 1000} {
b.Run("equal-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) {
w := NewRandom().(*randomWRR)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this type assertion?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

sumOfWeights := n
for i := 0; i < n; i++ {
w.Add(i, 1)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < sumOfWeights; i++ {
w.Next()
}
}
})
}

var maxWeight int64 = 1024
for _, n := range []int{100, 500, 1000} {
b.Run("random-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) {
w := NewRandom()
var sumOfWeights int64
for i := 0; i < n; i++ {
weight := rand.Int63n(maxWeight + 1)
w.Add(i, weight)
sumOfWeights += weight
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < int(sumOfWeights); i++ {
w.Next()
}
}
})
}

itemsNum := 200
heavyWeight := int64(itemsNum)
lightWeight := int64(1)
heavyIndices := []int{0, itemsNum / 2, itemsNum - 1}
for _, heavyIndex := range heavyIndices {
b.Run("skew-weights-heavy-index-"+strconv.Itoa(heavyIndex), func(b *testing.B) {
w := NewRandom()
var sumOfWeights int64
for i := 0; i < itemsNum; i++ {
var weight int64
if i == heavyIndex {
weight = heavyWeight
} else {
weight = lightWeight
}
sumOfWeights += weight
w.Add(i, weight)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < int(sumOfWeights); i++ {
w.Next()
}
}
})
}
}

func init() {
r := rand.New(rand.NewSource(0))
grpcrandInt63n = r.Int63n
Expand Down