Skip to content

Commit

Permalink
Merge pull request #1788 from kzinglzy/even-round-robin-balance-strategy
Browse files Browse the repository at this point in the history
feat[balance_strategy]: announcing a new round robin balance strategy
  • Loading branch information
bai committed Dec 24, 2020
2 parents 0d9d572 + 2caa4f7 commit 0cdf4a6
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 23 deletions.
102 changes: 88 additions & 14 deletions balance_strategy.go
Expand Up @@ -2,6 +2,8 @@ package sarama

import (
"container/heap"
"errors"
"fmt"
"math"
"sort"
"strings"
Expand Down Expand Up @@ -73,20 +75,6 @@ var BalanceStrategyRange = &balanceStrategy{
},
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
// M1: {T: [0, 2, 4]}
// M2: {T: [1, 3, 5]}
var BalanceStrategyRoundRobin = &balanceStrategy{
name: RoundRobinBalanceStrategyName,
coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
for i, part := range partitions {
memberID := memberIDs[i%len(memberIDs)]
plan.Add(memberID, topic, part)
}
},
}

// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// while maintain a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
Expand Down Expand Up @@ -353,6 +341,92 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
}
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
// M0: [t0p0, t0p2, t1p1]
// M1: [t0p1, t1p0, t1p2]
var BalanceStrategyRoundRobin = new(roundRobinBalancer)

type roundRobinBalancer struct{}

func (b *roundRobinBalancer) Name() string {
return RoundRobinBalanceStrategyName
}

func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
if len(memberAndMetadata) == 0 || len(topics) == 0 {
return nil, errors.New("members and topics are not provided")
}
// sort partitions
var topicPartitions []topicAndPartition
for topic, partitions := range topics {
for _, partition := range partitions {
topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
}
}
sort.SliceStable(topicPartitions, func(i, j int) bool {
pi := topicPartitions[i]
pj := topicPartitions[j]
return pi.comparedValue() < pj.comparedValue()
})

// sort members
var members []memberAndTopic
for memberID, meta := range memberAndMetadata {
m := memberAndTopic{
memberID: memberID,
topics: make(map[string]struct{}),
}
for _, t := range meta.Topics {
m.topics[t] = struct{}{}
}
members = append(members, m)
}
sort.SliceStable(members, func(i, j int) bool {
mi := members[i]
mj := members[j]
return mi.memberID < mj.memberID
})

// assign partitions
plan := make(BalanceStrategyPlan, len(members))
i := 0
n := len(members)
for _, tp := range topicPartitions {
m := members[i%n]
for !m.hasTopic(tp.topic) {
i++
m = members[i%n]
}
plan.Add(m.memberID, tp.topic, tp.partition)
i++
}
return plan, nil
}

func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return nil, nil // do nothing for now
}

type topicAndPartition struct {
topic string
partition int32
}

func (tp *topicAndPartition) comparedValue() string {
return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
}

type memberAndTopic struct {
memberID string
topics map[string]struct{}
}

func (m *memberAndTopic) hasTopic(topic string) bool {
_, isExist := m.topics[topic]
return isExist
}

// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
// Lower balance score indicates a more balanced assignment.
Expand Down
56 changes: 47 additions & 9 deletions balance_strategy_test.go
Expand Up @@ -90,26 +90,64 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
expected BalanceStrategyPlan
}{
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}},
"M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}},
"M1": map[string][]int32{"T1": {0}, "T3": {0}},
"M2": map[string][]int32{"T2": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0, 1}, "T3": {0, 1, 2, 3}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {1}, "T3": {1, 3}},
"M2": map[string][]int32{"T2": {0}, "T3": {0, 2}},
},
},
{
members: map[string][]string{"M1": {"T1"}, "M2": {"T1"}},
topics: map[string][]int32{"T1": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 2}, "T2": {1}},
"M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
"M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}},
"M2": map[string][]int32{"T3": {0}},
},
},
{
members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}},
expected: BalanceStrategyPlan{
"M": map[string][]int32{"T1": {0}},
"M2": map[string][]int32{"T2": {0}},
"M3": map[string][]int32{"TT2": {0}},
},
},
}

strategy := BalanceStrategyRoundRobin
if strategy.Name() != "roundrobin" {
t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
t.Errorf("Unexpected strategy name\nexpected: roundrobin\nactual: %v", strategy.Name())
}

for _, test := range tests {
Expand Down

0 comments on commit 0cdf4a6

Please sign in to comment.