Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wrr: improve randomWRR performance #5067

Merged
merged 8 commits into from Jan 12, 2022
Merged
49 changes: 32 additions & 17 deletions internal/wrr/random.go
Expand Up @@ -19,15 +19,16 @@ package wrr

import (
"fmt"
"sort"
"sync"

"google.golang.org/grpc/internal/grpcrand"
)

// weightedItem is a wrapped weighted item that is used to implement weighted random algorithm.
type weightedItem struct {
Item interface{}
Weight int64
item interface{}
accumulatedWeight int64
}

func (w *weightedItem) String() string {
Expand All @@ -36,9 +37,10 @@ func (w *weightedItem) String() string {

// randomWRR is a struct that contains weighted items implement weighted random algorithm.
type randomWRR struct {
mu sync.RWMutex
items []*weightedItem
sumOfWeights int64
mu sync.RWMutex
items []*weightedItem
// Are all item's weights equal
equalWeights bool
huangchong94 marked this conversation as resolved.
Show resolved Hide resolved
}

// NewRandom creates a new WRR with random.
Expand All @@ -51,27 +53,40 @@ var grpcrandInt63n = grpcrand.Int63n
func (rw *randomWRR) Next() (item interface{}) {
rw.mu.RLock()
defer rw.mu.RUnlock()
if rw.sumOfWeights == 0 {
var sumOfWeights int64
huangchong94 marked this conversation as resolved.
Show resolved Hide resolved
if len(rw.items) == 0 {
return nil
}
// Random number in [0, sum).
randomWeight := grpcrandInt63n(rw.sumOfWeights)
for _, item := range rw.items {
randomWeight = randomWeight - item.Weight
if randomWeight < 0 {
return item.Item
}
if rw.equalWeights {
return rw.items[grpcrandInt63n(int64(len(rw.items)))].item
}

return rw.items[len(rw.items)-1].Item
sumOfWeights = rw.items[len(rw.items)-1].accumulatedWeight
// Random number in [0, sumOfWeights).
randomWeight := grpcrandInt63n(sumOfWeights)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this may still fail if sumOfWeights == 0 (this happens when the items added all are weight 0 🤷‍♂️ )

Maybe keep the check if sumOfWeights == 0

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If items added all are weight 0, then rw.equalWeights will be true, Next will return normally at line 61 return rw.items[grpcrandInt63n(int64(len(rw.items)))].item

I considered keeping if sumOfWeights == 0 return nil, but the behaviour will be different from edf.go, in edf, if all
item's weights are zero, Next will return non nil item

// Item's accumulated weights are in ascending order, because item's weight >= 0.
// Binary search rw.items to find first item whose accumulatedWeight > randomWeight
// The return i is guaranteed to be in range [0, len(rw.items)) because randomWeight < last item's accumulatedWeight
i := sort.Search(len(rw.items), func(i int) bool { return rw.items[i].accumulatedWeight > randomWeight })
return rw.items[i].item
}

func (rw *randomWRR) Add(item interface{}, weight int64) {
rw.mu.Lock()
defer rw.mu.Unlock()
rItem := &weightedItem{Item: item, Weight: weight}
accumulatedWeight := weight
equalWeights := true
if len(rw.items) > 0 {
lastItem := rw.items[len(rw.items)-1]
accumulatedWeight = lastItem.accumulatedWeight + weight
lastItemWeight := lastItem.accumulatedWeight
if len(rw.items) > 1 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I now see what you meant in the old comments. weight is used once here.
I would be OK to keep the weight field to simplify the code here.
Either way is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added weight back to weightedItem to simplify code here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By the way, why weight type is int64 not uint32, int64 will allow user add negative weight item to WRR which will result undefined behaviour in Next.

lastItemWeight = lastItem.accumulatedWeight - rw.items[len(rw.items)-2].accumulatedWeight
}
equalWeights = rw.equalWeights && weight == lastItemWeight
}
rw.equalWeights = equalWeights
rItem := &weightedItem{item: item, accumulatedWeight: accumulatedWeight}
rw.items = append(rw.items, rItem)
rw.sumOfWeights += weight
}

func (rw *randomWRR) String() string {
Expand Down
76 changes: 76 additions & 0 deletions internal/wrr/wrr_test.go
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"math"
"math/rand"
"strconv"
"testing"

"github.com/google/go-cmp/cmp"
Expand Down Expand Up @@ -70,6 +71,10 @@ func testWRRNext(t *testing.T, newWRR func() WRR) {
name: "17-23-37",
weights: []int64{17, 23, 37},
},
{
name: "no items",
weights: []int64{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -80,6 +85,13 @@ func testWRRNext(t *testing.T, newWRR func() WRR) {
w.Add(i, weight)
sumOfWeights += weight
}
if len(tt.weights) == 0 {
if w.Next() != nil {
t.Fatalf("w.Next returns non nil value when there is no item")
} else {
huangchong94 marked this conversation as resolved.
Show resolved Hide resolved
return
}
}

results := make(map[int]int)
for i := 0; i < iterCount; i++ {
Expand Down Expand Up @@ -112,6 +124,70 @@ func (s) TestEdfWrrNext(t *testing.T) {
testWRRNext(t, NewEDF)
}

func BenchmarkRandomWRRNext(b *testing.B) {
for _, n := range []int{100, 500, 1000} {
b.Run("equal-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) {
w := NewRandom()
sumOfWeights := n
for i := 0; i < n; i++ {
w.Add(i, 1)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < sumOfWeights; i++ {
w.Next()
}
}
})
}

var maxWeight int64 = 1024
for _, n := range []int{100, 500, 1000} {
b.Run("random-weights-"+strconv.Itoa(n)+"-items", func(b *testing.B) {
w := NewRandom()
var sumOfWeights int64
for i := 0; i < n; i++ {
weight := rand.Int63n(maxWeight + 1)
w.Add(i, weight)
sumOfWeights += weight
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < int(sumOfWeights); i++ {
w.Next()
}
}
})
}

itemsNum := 200
heavyWeight := int64(itemsNum)
lightWeight := int64(1)
heavyIndices := []int{0, itemsNum / 2, itemsNum - 1}
for _, heavyIndex := range heavyIndices {
b.Run("skew-weights-heavy-index-"+strconv.Itoa(heavyIndex), func(b *testing.B) {
w := NewRandom()
var sumOfWeights int64
for i := 0; i < itemsNum; i++ {
var weight int64
if i == heavyIndex {
weight = heavyWeight
} else {
weight = lightWeight
}
sumOfWeights += weight
w.Add(i, weight)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
for i := 0; i < int(sumOfWeights); i++ {
w.Next()
}
}
})
}
}

func init() {
r := rand.New(rand.NewSource(0))
grpcrandInt63n = r.Int63n
Expand Down