diff --git a/balance_strategy.go b/balance_strategy.go index 0ce7fea1f..8f7634f94 100644 --- a/balance_strategy.go +++ b/balance_strategy.go @@ -2,6 +2,8 @@ package sarama import ( "container/heap" + "errors" + "fmt" "math" "sort" "strings" @@ -73,20 +75,6 @@ var BalanceStrategyRange = &balanceStrategy{ }, } -// BalanceStrategyRoundRobin assigns partitions to members in alternating order. -// Example with topic T with six partitions (0..5) and two members (M1, M2): -// M1: {T: [0, 2, 4]} -// M2: {T: [1, 3, 5]} -var BalanceStrategyRoundRobin = &balanceStrategy{ - name: RoundRobinBalanceStrategyName, - coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) { - for i, part := range partitions { - memberID := memberIDs[i%len(memberIDs)] - plan.Add(memberID, topic, part) - } - }, -} - // BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments // while maintain a balanced partition distribution. // Example with topic T with six partitions (0..5) and two members (M1, M2): @@ -353,6 +341,92 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart } } +// BalanceStrategyRoundRobin assigns partitions to members in alternating order. +// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2): +// M0: [t0p0, t0p2, t1p1] +// M1: [t0p1, t1p0, t1p2] +var BalanceStrategyRoundRobin = new(roundRobinBalancer) + +type roundRobinBalancer struct{} + +func (b *roundRobinBalancer) Name() string { + return RoundRobinBalanceStrategyName +} + +func (b *roundRobinBalancer) Plan(memberAndMetadata map[string]ConsumerGroupMemberMetadata, topics map[string][]int32) (BalanceStrategyPlan, error) { + if len(memberAndMetadata) == 0 || len(topics) == 0 { + return nil, errors.New("members and topics are not provided") + } + // sort partitions + var topicPartitions []topicAndPartition + for topic, partitions := range topics { + for _, partition := range partitions { + topicPartitions = append(topicPartitions, topicAndPartition{topic: topic, partition: partition}) + } + } + sort.SliceStable(topicPartitions, func(i, j int) bool { + pi := topicPartitions[i] + pj := topicPartitions[j] + return pi.comparedValue() < pj.comparedValue() + }) + + // sort members + var members []memberAndTopic + for memberID, meta := range memberAndMetadata { + m := memberAndTopic{ + memberID: memberID, + topics: make(map[string]struct{}), + } + for _, t := range meta.Topics { + m.topics[t] = struct{}{} + } + members = append(members, m) + } + sort.SliceStable(members, func(i, j int) bool { + mi := members[i] + mj := members[j] + return mi.memberID < mj.memberID + }) + + // assign partitions + plan := make(BalanceStrategyPlan, len(members)) + i := 0 + n := len(members) + for _, tp := range topicPartitions { + m := members[i%n] + for !m.hasTopic(tp.topic) { + i++ + m = members[i%n] + } + plan.Add(m.memberID, tp.topic, tp.partition) + i++ + } + return plan, nil +} + +func (b *roundRobinBalancer) AssignmentData(memberID string, topics map[string][]int32, generationID int32) ([]byte, error) { + return nil, nil // do nothing for now +} + +type topicAndPartition struct { + topic string + partition int32 +} + +func (tp *topicAndPartition) comparedValue() string { + return fmt.Sprintf("%s-%d", tp.topic, tp.partition) +} + +type memberAndTopic struct { + memberID string + topics map[string]struct{} +} + +func (m *memberAndTopic) hasTopic(topic string) bool { + _, isExist := m.topics[topic] + return isExist +} + // Calculate the balance score of the given assignment, as the sum of assigned partitions size difference of all consumer pairs. // A perfectly balanced assignment (with all consumers getting the same number of partitions) has a balance score of 0. // Lower balance score indicates a more balanced assignment. diff --git a/balance_strategy_test.go b/balance_strategy_test.go index 25a51adaa..66af3d1c9 100644 --- a/balance_strategy_test.go +++ b/balance_strategy_test.go @@ -90,26 +90,64 @@ func TestBalanceStrategyRoundRobin(t *testing.T) { expected BalanceStrategyPlan }{ { - members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}}, - topics: map[string][]int32{"T1": {0, 1, 2, 3}, "T2": {0, 1, 2, 3}}, + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, expected: BalanceStrategyPlan{ - "M1": map[string][]int32{"T1": {0, 2}, "T2": {1, 3}}, - "M2": map[string][]int32{"T1": {1, 3}, "T2": {0, 2}}, + "M1": map[string][]int32{"T1": {0}, "T3": {0}}, + "M2": map[string][]int32{"T2": {0}}, }, }, { - members: map[string][]string{"M1": {"T1", "T2"}, "M2": {"T1", "T2"}}, - topics: map[string][]int32{"T1": {0, 1, 2}, "T2": {0, 1, 2}}, + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T2", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0, 1}, "T3": {0, 1, 2, 3}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {1}, "T3": {1, 3}}, + "M2": map[string][]int32{"T2": {0}, "T3": {0, 2}}, + }, + }, + { + members: map[string][]string{"M1": {"T1"}, "M2": {"T1"}}, + topics: map[string][]int32{"T1": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}}, expected: BalanceStrategyPlan{ - "M1": map[string][]int32{"T1": {0, 2}, "T2": {1}}, - "M2": map[string][]int32{"T1": {1}, "T2": {0, 2}}, + "M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0, 1, 2}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + }, + }, + { + members: map[string][]string{"M1": {"T1", "T2", "T3"}, "M2": {"T1", "T3"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "T3": {0}}, + expected: BalanceStrategyPlan{ + "M1": map[string][]int32{"T1": {0}, "T2": {0}}, + "M2": map[string][]int32{"T3": {0}}, + }, + }, + { + members: map[string][]string{"M": {"T1", "T2", "TT2"}, "M2": {"T1", "T2", "TT2"}, "M3": {"T1", "T2", "TT2"}}, + topics: map[string][]int32{"T1": {0}, "T2": {0}, "TT2": {0}}, + expected: BalanceStrategyPlan{ + "M": map[string][]int32{"T1": {0}}, + "M2": map[string][]int32{"T2": {0}}, + "M3": map[string][]int32{"TT2": {0}}, }, }, } strategy := BalanceStrategyRoundRobin if strategy.Name() != "roundrobin" { - t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name()) + t.Errorf("Unexpected strategy name\nexpected: roundrobin\nactual: %v", strategy.Name()) } for _, test := range tests {