Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat[balance_strategy]: announcing a new round robin balance strategy #1788

Merged
merged 4 commits into from Dec 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
102 changes: 88 additions & 14 deletions balance_strategy.go
Expand Up @@ -2,6 +2,8 @@ package sarama

import (
"container/heap"
"errors"
"fmt"
"math"
"sort"
"strings"
Expand Down Expand Up @@ -73,20 +75,6 @@ var BalanceStrategyRange = &balanceStrategy{
},
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
// M1: {T: [0, 2, 4]}
// M2: {T: [1, 3, 5]}
var BalanceStrategyRoundRobin = &balanceStrategy{
name: RoundRobinBalanceStrategyName,
coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
for i, part := range partitions {
memberID := memberIDs[i%len(memberIDs)]
plan.Add(memberID, topic, part)
}
},
}

// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// while maintain a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
Expand Down Expand Up @@ -353,6 +341,92 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
}
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
// M0: [t0p0, t0p2, t1p1]
// M1: [t0p1, t1p0, t1p2]
var BalanceStrategyRoundRobin = new(roundRobinBalancer)

type roundRobinBalancer struct{}

func (b *roundRobinBalancer) Name() string {
return RoundRobinBalanceStrategyName
}

func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) {
if len(memberAndMetadata) == 0 || len(topics) == 0 {
return nil, errors.New("members and topics are not provided")
}
// sort partitions
var topicPartitions []topicAndPartition
for topic, partitions := range topics {
for _, partition := range partitions {
topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition})
}
}
sort.SliceStable(topicPartitions, func(i, j int) bool {
pi := topicPartitions[i]
pj := topicPartitions[j]
return pi.comparedValue() < pj.comparedValue()
})

// sort members
var members []memberAndTopic
for memberID, meta := range memberAndMetadata {
m := memberAndTopic{
memberID: memberID,
topics: make(map[string]struct{}),
}
for _, t := range meta.Topics {
m.topics[t] = struct{}{}
}
members = append(members, m)
}
sort.SliceStable(members, func(i, j int) bool {
mi := members[i]
mj := members[j]
return mi.memberID < mj.memberID
})

// assign partitions
plan := make(BalanceStrategyPlan, len(members))
i := 0
n := len(members)
for _, tp := range topicPartitions {
m := members[i%n]
for !m.hasTopic(tp.topic) {
i++
m = members[i%n]
}
plan.Add(m.memberID, tp.topic, tp.partition)
i++
}
return plan, nil
}

func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) {
return nil, nil // do nothing for now
}

type topicAndPartition struct {
topic string
partition int32
}

func (tp *topicAndPartition) comparedValue() string {
return fmt.Sprintf("%s-%d", tp.topic, tp.partition)
}

type memberAndTopic struct {
memberID string
topics map[string]struct{}
}

func (m *memberAndTopic) hasTopic(topic string) bool {
_, isExist := m.topics[topic]
return isExist
}

// Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs.
// A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0.
// Lower balance score indicates a more balanced assignment.
Expand Down
56 changes: 47 additions & 9 deletions balance_strategy_test.go
Expand Up @@ -90,26 +90,64 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
expected BalanceStrategyPlan
}{
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}},
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}},
"M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}},
"M1": map[string][]int32{"T1": {0}, "T3": {0}},
"M2": map[string][]int32{"T2": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}},
topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}},
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0, 1}, "T3": {0, 1, 2, 3}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {1}, "T3": {1, 3}},
"M2": map[string][]int32{"T2": {0}, "T3": {0, 2}},
},
},
{
members: map[string][]string{"M1": {"T1"}, "M2": {"T1"}},
topics: map[string][]int32{"T1": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0, 2}, "T2": {1}},
"M2": map[string][]int32{"T1": {1}, "T2": {0, 2}},
"M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
},
},
{
members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T3"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}},
expected: BalanceStrategyPlan{
"M1": map[string][]int32{"T1": {0}, "T2": {0}},
"M2": map[string][]int32{"T3": {0}},
},
},
{
members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}},
topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}},
expected: BalanceStrategyPlan{
"M": map[string][]int32{"T1": {0}},
"M2": map[string][]int32{"T2": {0}},
"M3": map[string][]int32{"TT2": {0}},
},
},
}

strategy := BalanceStrategyRoundRobin
if strategy.Name() != "roundrobin" {
t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
t.Errorf("Unexpected strategy name\nexpected: roundrobin\nactual: %v", strategy.Name())
}

for _, test := range tests {
Expand Down