From be85cdada7bfccf4797a659958f67d2ecd6634b8 Mon Sep 17 00:00:00 2001 From: kzinglzy Date: Sun, 23 Aug 2020 16:22:35 +0800 Subject: [PATCH 1/4] feat[balance_strategy]: announcing a new round robin balance strategy --- balance_strategy.go | 92 ++++++++++++++++++++++++++++++++++++++++ balance_strategy_test.go | 91 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 183 insertions(+) diff --git a/balance_strategy.go b/balance_strategy.go index 0ce7fea1f..6c5bb12c9 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -2,6 +2,8 @@ package sarama import ( "container/heap" + "errors" + "fmt" "math" "sort" "strings" @@ -14,6 +16,10 @@ const ( // RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy RoundRobinBalanceStrategyName = "roundrobin" + // EvenRoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy base on all topic's partitions, + // which assigns partitions more evenly in comparison to the`RoundRobinBalanceStrategyName` strategy + EvenRoundRobinBalanceStrategyName = "even_roundrobin" + // StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy StickyBalanceStrategyName = "sticky" @@ -353,6 +359,92 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart } } +// BalanceStrategyEvenRoundRobin assigns partitions to members in alternating order. +// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): +// M0: [t0p0, t0p2, t1p1] +// M1: [t0p1, t1p0, t1p2] +var BalanceStrategyEvenRoundRobin = new(evenRoundRobinBalancer) + +type evenRoundRobinBalancer struct{} + +func (b *evenRoundRobinBalancer) Name() string { + return EvenRoundRobinBalanceStrategyName +} + +func (b *evenRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { + if len(memberAndMetadata) == 0 || len(topics) == 0 { + return nil, errors.New("members and topics are not provided") + } + // sort partitions + var topicPartitions []topicAndPartition + for topic, partitions := range topics { + for _, partition := range partitions { + topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition}) + } + } + sort.SliceStable(topicPartitions, func(i, j int) bool { + pi := topicPartitions[i] + pj := topicPartitions[j] + return pi.comparedValue() < pj.comparedValue() + }) + + // sort members + var members []memberAndTopic + for memberID, meta := range memberAndMetadata { + m := memberAndTopic{ + memberID: memberID, + topics: make(map[string]struct{}), + } + for _, t := range meta.Topics { + m.topics[t] = struct{}{} + } + members = append(members, m) + } + sort.SliceStable(members, func(i, j int) bool { + mi := members[i] + mj := members[j] + return mi.memberID < mj.memberID + }) + + // assign partitions + plan := make(BalanceStrategyPlan, len(members)) + i := 0 + n := len(members) + for _, tp := range topicPartitions { + m := members[i%n] + for !m.hasTopic(tp.topic) { + i++ + m = members[i%n] + } + plan.Add(m.memberID, tp.topic, tp.partition) + i++ + } + return plan, nil +} + +func (b *evenRoundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { + return nil, nil // do nothing for now +} + +type topicAndPartition struct { + topic string + partition int32 +} + +func (tp *topicAndPartition) comparedValue() string { + return fmt.Sprintf("%s-%d", tp.topic, tp.partition) +} + +type memberAndTopic struct { + memberID string + topics map[string]struct{} +} + +func (m *memberAndTopic) hasTopic(topic string) bool { + _, isExist := m.topics[topic] + return isExist +} + // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs. // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0. // Lower balance score indicates a more balanced assignment. diff --git a/balance_strategy_test.go b/balance_strategy_test.go index 25a51adaa..c99f4d109 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -105,6 +105,15 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { "M2": map[string][]int32{"T1": {1}, "T2": {0, 2}}, }, }, + { + // case that partitions are assigned unevenly: + // there are three members and topics, however all the topics and partitions are assign to a single member + members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}}, + expected: BalanceStrategyPlan{ + "M": map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}}, + }, + }, } strategy := BalanceStrategyRoundRobin @@ -127,6 +136,88 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { } } +func TestBalanceStrategyEvenRoundRobin(t *testing.T) { + tests := []struct { + members map[string][]string + topics map[string][]int32 + expected BalanceStrategyPlan + }{ + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T3": {0}}, + "M2": map[string][]int32{"T2": {0}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0, 1}, "T3": {0, 1, 2, 3}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {1}, "T3": {1, 3}}, + "M2": map[string][]int32{"T2": {0}, "T3": {0, 2}}, + }, + }, + { + members: map[string][]string{"M1": {"T1"}, "M2": {"T1"}}, + topics: map[string][]int32{"T1": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {0}}, + "M2": map[string][]int32{"T3": {0}}, + }, + }, + { + members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}}, + expected: BalanceStrategyPlan{ + "M": map[string][]int32{"T1": {0}}, + "M2": map[string][]int32{"T2": {0}}, + "M3": map[string][]int32{"TT2": {0}}, + }, + }, + } + + strategy := BalanceStrategyEvenRoundRobin + if strategy.Name() != "even_roundrobin" { + t.Errorf("Unexpected stategy name\nexpected: even_roundrobin\nactual: %v", strategy.Name()) + } + + for _, test := range tests { + members := make(map[string]ConsumerGroupMemberMetadata) + for memberID, topics := range test.members { + members[memberID] = ConsumerGroupMemberMetadata{Topics: topics} + } + + actual, err := strategy.Plan(members, test.topics) + if err != nil { + t.Errorf("Unexpected error %v", err) + } else if !reflect.DeepEqual(actual, test.expected) { + t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual) + } + } +} + func Test_deserializeTopicPartitionAssignment(t *testing.T) { type args struct { userDataBytes []byte From 7df2cf0db526b2b47cb71f8ec023d7bf941b3145 Mon Sep 17 00:00:00 2001 From: kzinglzy Date: Sun, 23 Aug 2020 16:37:23 +0800 Subject: [PATCH 2/4] polish strategy name --- balance_strategy.go | 18 +++++++++--------- balance_strategy_test.go | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index 6c5bb12c9..6c5e842f1 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -16,9 +16,9 @@ const ( // RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy RoundRobinBalanceStrategyName = "roundrobin" - // EvenRoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy base on all topic's partitions, + // FairRoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy base on all topic's partitions, // which assigns partitions more evenly in comparison to the`RoundRobinBalanceStrategyName` strategy - EvenRoundRobinBalanceStrategyName = "even_roundrobin" + FairRoundRobinBalanceStrategyName = "fair_roundrobin" // StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy StickyBalanceStrategyName = "sticky" @@ -359,19 +359,19 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart } } -// BalanceStrategyEvenRoundRobin assigns partitions to members in alternating order. +// BalanceStrategyFairRoundRound assigns partitions to members in alternating order. // For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): // M0: [t0p0, t0p2, t1p1] // M1: [t0p1, t1p0, t1p2] -var BalanceStrategyEvenRoundRobin = new(evenRoundRobinBalancer) +var BalanceStrategyFairRoundRound = new(fairRoundRobinBalancer) -type evenRoundRobinBalancer struct{} +type fairRoundRobinBalancer struct{} -func (b *evenRoundRobinBalancer) Name() string { - return EvenRoundRobinBalanceStrategyName +func (b *fairRoundRobinBalancer) Name() string { + return FairRoundRobinBalanceStrategyName } -func (b *evenRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { +func (b *fairRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { if len(memberAndMetadata) == 0 || len(topics) == 0 { return nil, errors.New("members and topics are not provided") } @@ -422,7 +422,7 @@ func (b *evenRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroup return plan, nil } -func (b *evenRoundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { +func (b *fairRoundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { return nil, nil // do nothing for now } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index c99f4d109..d5d455440 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -136,7 +136,7 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { } } -func TestBalanceStrategyEvenRoundRobin(t *testing.T) { +func TestBalanceStrategyFairRoundRobin(t *testing.T) { tests := []struct { members map[string][]string topics map[string][]int32 @@ -198,9 +198,9 @@ func TestBalanceStrategyEvenRoundRobin(t *testing.T) { }, } - strategy := BalanceStrategyEvenRoundRobin - if strategy.Name() != "even_roundrobin" { - t.Errorf("Unexpected stategy name\nexpected: even_roundrobin\nactual: %v", strategy.Name()) + strategy := BalanceStrategyFairRoundRound + if strategy.Name() != "fair_roundrobin" { + t.Errorf("Unexpected stategy name\nexpected: fail_roundrobin\nactual: %v", strategy.Name()) } for _, test := range tests { From f2674320a9074e58a836365f44cd08e33822f39e Mon Sep 17 00:00:00 2001 From: kzinglzy Date: Thu, 24 Dec 2020 00:51:43 +0800 Subject: [PATCH 3/4] fix and replace original implementation --- balance_strategy.go | 32 +++++----------------- balance_strategy_test.go | 57 ++-------------------------------------- 2 files changed, 9 insertions(+), 80 deletions(-) diff --git a/balance_strategy.go b/balance_strategy.go index 6c5e842f1..8f7634f94 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -16,10 +16,6 @@ const ( // RoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy RoundRobinBalanceStrategyName = "roundrobin" - // FairRoundRobinBalanceStrategyName identifies strategies that use the round-robin partition assignment strategy base on all topic's partitions, - // which assigns partitions more evenly in comparison to the`RoundRobinBalanceStrategyName` strategy - FairRoundRobinBalanceStrategyName = "fair_roundrobin" - // StickyBalanceStrategyName identifies strategies that use the sticky-partition assignment strategy StickyBalanceStrategyName = "sticky" @@ -79,20 +75,6 @@ var BalanceStrategyRange = &balanceStrategy{ }, } -// BalanceStrategyRoundRobin assigns partitions to members in alternating order. -// Example with topic T with six partitions (0..5) and two members (M1, M2): -// M1: {T: [0, 2, 4]} -// M2: {T: [1, 3, 5]} -var BalanceStrategyRoundRobin = &balanceStrategy{ - name: RoundRobinBalanceStrategyName, - coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { - for i, part := range partitions { - memberID := memberIDs[i%len(memberIDs)] - plan.Add(memberID, topic, part) - } - }, -} - // BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments // while maintain a balanced partition distribution. // Example with topic T with six partitions (0..5) and two members (M1, M2): @@ -359,19 +341,19 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart } } -// BalanceStrategyFairRoundRound assigns partitions to members in alternating order. +// BalanceStrategyRoundRobin assigns partitions to members in alternating order. // For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): // M0: [t0p0, t0p2, t1p1] // M1: [t0p1, t1p0, t1p2] -var BalanceStrategyFairRoundRound = new(fairRoundRobinBalancer) +var BalanceStrategyRoundRobin = new(roundRobinBalancer) -type fairRoundRobinBalancer struct{} +type roundRobinBalancer struct{} -func (b *fairRoundRobinBalancer) Name() string { - return FairRoundRobinBalanceStrategyName +func (b *roundRobinBalancer) Name() string { + return RoundRobinBalanceStrategyName } -func (b *fairRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { +func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { if len(memberAndMetadata) == 0 || len(topics) == 0 { return nil, errors.New("members and topics are not provided") } @@ -422,7 +404,7 @@ func (b *fairRoundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroup return plan, nil } -func (b *fairRoundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { +func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { return nil, nil // do nothing for now } diff --git a/balance_strategy_test.go b/balance_strategy_test.go index d5d455440..1dc5aecfa 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -84,59 +84,6 @@ func TestBalanceStrategyRangeAssignmentData(t *testing.T) { } func TestBalanceStrategyRoundRobin(t *testing.T) { - tests := []struct { - members map[string][]string - topics map[string][]int32 - expected BalanceStrategyPlan - }{ - { - members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}}, - topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}}, - expected: BalanceStrategyPlan{ - "M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}}, - "M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}}, - }, - }, - { - members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}}, - topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}}, - expected: BalanceStrategyPlan{ - "M1": map[string][]int32{"T1": {0, 2}, "T2": {1}}, - "M2": map[string][]int32{"T1": {1}, "T2": {0, 2}}, - }, - }, - { - // case that partitions are assigned unevenly: - // there are three members and topics, however all the topics and partitions are assign to a single member - members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}}, - topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}}, - expected: BalanceStrategyPlan{ - "M": map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}}, - }, - }, - } - - strategy := BalanceStrategyRoundRobin - if strategy.Name() != "roundrobin" { - t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name()) - } - - for _, test := range tests { - members := make(map[string]ConsumerGroupMemberMetadata) - for memberID, topics := range test.members { - members[memberID] = ConsumerGroupMemberMetadata{Topics: topics} - } - - actual, err := strategy.Plan(members, test.topics) - if err != nil { - t.Errorf("Unexpected error %v", err) - } else if !reflect.DeepEqual(actual, test.expected) { - t.Errorf("Plan does not match expectation\nexpected: %#v\nactual: %#v", test.expected, actual) - } - } -} - -func TestBalanceStrategyFairRoundRobin(t *testing.T) { tests := []struct { members map[string][]string topics map[string][]int32 @@ -198,8 +145,8 @@ func TestBalanceStrategyFairRoundRobin(t *testing.T) { }, } - strategy := BalanceStrategyFairRoundRound - if strategy.Name() != "fair_roundrobin" { + strategy := BalanceStrategyRoundRobin + if strategy.Name() != "roundrobin" { t.Errorf("Unexpected stategy name\nexpected: fail_roundrobin\nactual: %v", strategy.Name()) } From 2caa4f747521f3d6859cc2344f3a8ac946a663ce Mon Sep 17 00:00:00 2001 From: kzinglzy Date: Thu, 24 Dec 2020 00:54:26 +0800 Subject: [PATCH 4/4] fix test typo --- balance_strategy_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balance_strategy_test.go b/balance_strategy_test.go index 1dc5aecfa..66af3d1c9 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -147,7 +147,7 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { strategy := BalanceStrategyRoundRobin if strategy.Name() != "roundrobin" { - t.Errorf("Unexpected stategy name\nexpected: fail_roundrobin\nactual: %v", strategy.Name()) + t.Errorf("Unexpected strategy name\nexpected: roundrobin\nactual: %v", strategy.Name()) } for _, test := range tests {