From 685bf3989a2ff4300c17b13773e9f59c1f59fcac Mon Sep 17 00:00:00 2001 From: Kyle Hargraves Date: Thu, 8 Feb 2018 21:29:22 -0600 Subject: [PATCH 1/2] Round-robin partition assignments across multiple topics This is closer to the behavior of Kafka's Java consumer, as described here: https://github.com/apache/kafka/blob/15bc405/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java#L31-L55 This is valuable in situations where you have highly disparate partition counts. Given three group members subscribed to many topics, only a few of which have more than one partition, the first member will receive far more assignments than the others. --- balancer.go | 168 +++++++++++++++++++------------- balancer_test.go | 247 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 307 insertions(+), 108 deletions(-) diff --git a/balancer.go b/balancer.go index 3aeaece..cd05f2d 100644 --- a/balancer.go +++ b/balancer.go @@ -1,6 +1,7 @@ package cluster import ( + "container/ring" "math" "sort" @@ -70,101 +71,138 @@ func (n *Notification) success(current map[string][]int32) *Notification { // -------------------------------------------------------------------- +type balancer struct { + client sarama.Client + memberIDs []string + topics map[string]*topicInfo +} + type topicInfo struct { Partitions []int32 MemberIDs []string } -func (info topicInfo) Perform(s Strategy) map[string][]int32 { - if s == StrategyRoundRobin { - return info.RoundRobin() +func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { + balancer := &balancer{ + client: client, + memberIDs: make([]string, 0, len(members)), + topics: make(map[string]*topicInfo), + } + for memberID, meta := range members { + balancer.memberIDs = append(balancer.memberIDs, memberID) + for _, topic := range meta.Topics { + if err := balancer.Topic(memberID, topic); err != nil { + return nil, err + } + } } - return info.Ranges() -} -func (info topicInfo) Ranges() map[string][]int32 { - sort.Strings(info.MemberIDs) + sort.Strings(balancer.memberIDs) + return balancer, nil +} - mlen := len(info.MemberIDs) - plen := len(info.Partitions) - res := make(map[string][]int32, mlen) +func (r *balancer) Topic(memberID string, name string) error { + info, ok := r.topics[name] + if !ok { + nums, err := r.client.Partitions(name) + if err != nil { + return err + } - for pos, memberID := range info.MemberIDs { - n, i := float64(plen)/float64(mlen), float64(pos) - min := int(math.Floor(i*n + 0.5)) - max := int(math.Floor((i+1)*n + 0.5)) - sub := info.Partitions[min:max] - if len(sub) > 0 { - res[memberID] = sub + r.topics[name] = &topicInfo{ + MemberIDs: []string{memberID}, + Partitions: nums, } + + return nil } - return res -} -func (info topicInfo) RoundRobin() map[string][]int32 { - sort.Strings(info.MemberIDs) + info.MemberIDs = append(info.MemberIDs, memberID) + return nil +} - mlen := len(info.MemberIDs) - res := make(map[string][]int32, mlen) - for i, pnum := range info.Partitions { - memberID := info.MemberIDs[i%mlen] - res[memberID] = append(res[memberID], pnum) +func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { + switch s { + case StrategyRoundRobin: + return assignRoundRobin(r.memberIDs, r.topics) + default: + return assignRange(r.memberIDs, r.topics) } - return res } -// -------------------------------------------------------------------- - -type balancer struct { - client sarama.Client - topics map[string]topicInfo -} +func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 { + tlen := len(topics) + res := make(map[string]map[string][]int32) + + for topic, info := range topics { + mlen := len(info.MemberIDs) + plen := len(info.Partitions) + + sort.Strings(info.MemberIDs) + for pos, memberID := range info.MemberIDs { + n, i := float64(plen)/float64(mlen), float64(pos) + min := int(math.Floor(i*n + 0.5)) + max := int(math.Floor((i+1)*n + 0.5)) + sub := info.Partitions[min:max] + if len(sub) <= 0 { + continue + } -func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { - balancer := newBalancer(client) - for memberID, meta := range members { - for _, topic := range meta.Topics { - if err := balancer.Topic(topic, memberID); err != nil { - return nil, err + assigned, ok := res[memberID] + if !ok { + assigned = make(map[string][]int32, tlen) + res[memberID] = assigned } + assigned[topic] = sub } } - return balancer, nil + + return res } -func newBalancer(client sarama.Client) *balancer { - return &balancer{ - client: client, - topics: make(map[string]topicInfo), +func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 { + sort.Strings(memberIDs) + + r := ring.New(len(memberIDs)) + for i := 0; i < r.Len(); i++ { + r.Value = memberIDs[i] + r = r.Next() } -} -func (r *balancer) Topic(name string, memberID string) error { - topic, ok := r.topics[name] - if !ok { - nums, err := r.client.Partitions(name) - if err != nil { - return err + isSubscribed := func(memberID string, topic string) bool { + info, ok := topics[topic] + if !ok { + return false } - topic = topicInfo{ - Partitions: nums, - MemberIDs: make([]string, 0, 1), + + for _, subscriber := range info.MemberIDs { + if memberID == subscriber { + return true + } } + + return false } - topic.MemberIDs = append(topic.MemberIDs, memberID) - r.topics[name] = topic - return nil -} -func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { - res := make(map[string]map[string][]int32, 1) - for topic, info := range r.topics { - for memberID, partitions := range info.Perform(s) { - if _, ok := res[memberID]; !ok { - res[memberID] = make(map[string][]int32, 1) + tlen := len(topics) + res := make(map[string]map[string][]int32, r.Len()) + + for topic, info := range topics { + for i := range info.Partitions { + for ; !isSubscribed(r.Value.(string), topic); r = r.Next() { + continue } - res[memberID][topic] = partitions + + memberID := r.Value.(string) + assigned, ok := res[memberID] + if !ok { + assigned = make(map[string][]int32, tlen) + res[memberID] = assigned + } + assigned[topic] = append(assigned[topic], info.Partitions[i]) + r = r.Next() } } + return res } diff --git a/balancer_test.go b/balancer_test.go index 0334c18..4a88be8 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -56,74 +56,235 @@ var _ = Describe("balancer", func() { }) It("should parse from meta data", func() { + Expect(subject.memberIDs).To(Equal([]string{"a", "b"})) Expect(subject.topics).To(HaveLen(3)) }) - It("should perform", func() { - Expect(subject.Perform(StrategyRange)).To(Equal(map[string]map[string][]int32{ - "a": {"one": {0, 1}, "two": {0, 1, 2}}, - "b": {"one": {2, 3}, "three": {0, 1}}, - })) - - Expect(subject.Perform(StrategyRoundRobin)).To(Equal(map[string]map[string][]int32{ - "a": {"one": {0, 2}, "two": {0, 1, 2}}, - "b": {"one": {1, 3}, "three": {0, 1}}, - })) - }) - }) -var _ = Describe("topicInfo", func() { +var _ = Describe("partition assignment", func() { DescribeTable("Ranges", - func(memberIDs []string, partitions []int32, expected map[string][]int32) { - info := topicInfo{MemberIDs: memberIDs, Partitions: partitions} - Expect(info.Ranges()).To(Equal(expected)) + func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { + assignments := assignRange(memberIDs, topics) + Expect(assignments).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + + Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, + }), + + Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M3": {"t1": {1}}, + }), + + Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M2": {"t1": {0}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{ - "M1": {0}, "M3": {1}, + + Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2, 3}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1, 2}}, + "M3": {"t1": {3}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{ - "M2": {0}, + + Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 2, 4, 6, 8}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0, 2}}, + "M2": {"t1": {4}}, + "M3": {"t1": {6, 8}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{ - "M1": {0}, "M2": {1, 2}, "M3": {3}, + + Entry("multiple topics", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t3": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}, "t2": {0}}, + "M2": {"t1": {1}, "t3": {0}}, + "M3": {"t1": {2}, "t2": {1}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{ - "M1": {0, 2}, "M2": {4}, "M3": {6, 8}, + + Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M2", "M3"}, + }, + "t3": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}, "t3": {0}}, + "M2": {"t1": {1}, "t2": {0}}, + "M3": {"t1": {2}, "t2": {1}}, }), ) DescribeTable("RoundRobin", - func(memberIDs []string, partitions []int32, expected map[string][]int32) { - info := topicInfo{MemberIDs: memberIDs, Partitions: partitions} - Expect(info.RoundRobin()).To(Equal(expected)) + func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { + assignments := assignRoundRobin(memberIDs, topics) + Expect(assignments).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + + Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{ - "M1": {0}, "M2": {1}, + + Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, + "M2": {"t1": {1}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{ - "M1": {0}, + + Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{ - "M1": {0, 3}, "M2": {1}, "M3": {2}, + + Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2, 3}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0, 3}}, + "M2": {"t1": {1}}, + "M3": {"t1": {2}}, + }), + + Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 2, 4, 6, 8}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + }, map[string]map[string][]int32{ + "M1": {"t1": {0, 6}}, + "M2": {"t1": {2, 8}}, + "M3": {"t1": {4}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{ - "M1": {0, 6}, "M2": {2, 8}, "M3": {4}, + + Entry("multiple topics", []string{"M1", "M2"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M1", "M2"}, + }, + }, map[string]map[string][]int32{ + "M1": { + "t1": {0, 2}, + "t2": {1}, + }, + "M2": { + "t1": {1}, + "t2": {0, 2}, + }, + }), + + Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ + "t1": &topicInfo{ + Partitions: []int32{0}, + MemberIDs: []string{"M1", "M2", "M3"}, + }, + "t2": &topicInfo{ + Partitions: []int32{0, 1}, + MemberIDs: []string{"M2", "M3"}, + }, + "t3": &topicInfo{ + Partitions: []int32{0, 1, 2}, + MemberIDs: []string{"M3"}, + }, + }, map[string]map[string][]int32{ + "M1": { + "t1": {0}, + }, + "M2": { + "t2": {0}, + }, + "M3": { + "t2": {1}, + "t3": {0, 1, 2}, + }, }), ) From 3a860e6899defc16061e5d1f5defa5c7bf9fde10 Mon Sep 17 00:00:00 2001 From: Kyle Hargraves Date: Fri, 9 Feb 2018 10:41:15 -0600 Subject: [PATCH 2/2] Pluggable partition assignment strategies This incorporates the round robin changes from #216 (a64a8bed) and exposes the ability for users to implement their own partition assignor. The tests for the two assignors now match the Java consumer's, to validate that the behaviors match up as expected. Reviewing the previous tests, I _think_ all of the same scenarios are still covered, as well. Backwards compatibility is maintained by falling back to the value in `PartitionStrategy` if the `PartitionAssignor` is unset. --- balancer.go | 231 ++++++++++++++++++++----------- balancer_test.go | 348 +++++++++++++++++++++-------------------------- config.go | 7 +- consumer.go | 12 +- 4 files changed, 324 insertions(+), 274 deletions(-) diff --git a/balancer.go b/balancer.go index cd05f2d..4f4de77 100644 --- a/balancer.go +++ b/balancer.go @@ -71,96 +71,181 @@ func (n *Notification) success(current map[string][]int32) *Notification { // -------------------------------------------------------------------- -type balancer struct { - client sarama.Client - memberIDs []string - topics map[string]*topicInfo -} +// Assignor is a function which returns specific partition assignments +// given the set of topic subscriptions of a given group. +type Assignor func(subs *Subscriptions, topics []*TopicPartitions) Assignments -type topicInfo struct { +// TopicPartitions identifies a topic and its partition IDs. +type TopicPartitions struct { + Name string Partitions []int32 - MemberIDs []string +} + +// Subscriptions contains information about all members in a consumer +// group, and which topics they have subscribed to. +type Subscriptions struct { + memberIDs []string + subscribers map[string][]string +} + +// NewSubscriptions returns an empty set of subscriptions. +func NewSubscriptions() *Subscriptions { + return &Subscriptions{ + memberIDs: []string{}, + subscribers: map[string][]string{}, + } +} + +// Members returns the list of all member IDs in the group. +func (m *Subscriptions) Members() []string { + return m.memberIDs +} + +// AddSubscriber registers a member as subscribed to a topic. +// Returns self. +func (m *Subscriptions) AddSubscriber(memberID, topic string) *Subscriptions { + seen := false + for i := range m.memberIDs { + if m.memberIDs[i] == memberID { + seen = true + } + } + + if !seen { + m.memberIDs = append(m.memberIDs, memberID) + } + + m.subscribers[topic] = append(m.subscribers[topic], memberID) + return m +} + +// SubscribedMembers returns the full list of members subscribed +// to a topic. +func (m *Subscriptions) SubscribedMembers(topic string) []string { + return m.subscribers[topic] +} + +// IsSubscribed returns true if a member is subscribed to a topic. +func (m *Subscriptions) IsSubscribed(memberID, topic string) bool { + subs, ok := m.subscribers[topic] + if !ok { + return false + } + + for i := range subs { + if subs[i] == memberID { + return true + } + } + + return false +} + +// Assignments is a mapping of member IDs to the topic partitions that they +// have been assigned. +type Assignments map[string]map[string][]int32 + +// NewAssignments returns an empty set of assignments. +func NewAssignments() Assignments { + return map[string]map[string][]int32{} +} + +// Assign adds a partition to the list of a member's assignments. +func (a Assignments) Assign(memberID, topic string, partition int32) { + topics, ok := a[memberID] + if !ok { + topics = map[string][]int32{} + a[memberID] = topics + } + + topics[topic] = append(topics[topic], partition) +} + +type balancer struct { + client sarama.Client + subs *Subscriptions + topics []*TopicPartitions } func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { balancer := &balancer{ - client: client, - memberIDs: make([]string, 0, len(members)), - topics: make(map[string]*topicInfo), + client: client, + subs: NewSubscriptions(), + topics: []*TopicPartitions{}, } + for memberID, meta := range members { - balancer.memberIDs = append(balancer.memberIDs, memberID) for _, topic := range meta.Topics { - if err := balancer.Topic(memberID, topic); err != nil { + balancer.subs.AddSubscriber(memberID, topic) + if err := balancer.AddTopic(topic); err != nil { return nil, err } } } - sort.Strings(balancer.memberIDs) return balancer, nil } -func (r *balancer) Topic(memberID string, name string) error { - info, ok := r.topics[name] - if !ok { - nums, err := r.client.Partitions(name) - if err != nil { - return err - } - - r.topics[name] = &topicInfo{ - MemberIDs: []string{memberID}, - Partitions: nums, +func (r *balancer) AddTopic(name string) error { + for i := range r.topics { + if r.topics[i].Name == name { + return nil } + } - return nil + nums, err := r.client.Partitions(name) + if err != nil { + return err } - info.MemberIDs = append(info.MemberIDs, memberID) + r.topics = append(r.topics, &TopicPartitions{ + Name: name, + Partitions: nums, + }) return nil } -func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { - switch s { - case StrategyRoundRobin: - return assignRoundRobin(r.memberIDs, r.topics) - default: - return assignRange(r.memberIDs, r.topics) - } +func (r *balancer) Perform(fn Assignor) Assignments { + return fn(r.subs, r.topics) } -func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 { - tlen := len(topics) - res := make(map[string]map[string][]int32) - - for topic, info := range topics { - mlen := len(info.MemberIDs) - plen := len(info.Partitions) - - sort.Strings(info.MemberIDs) - for pos, memberID := range info.MemberIDs { +// RangeAssignor assigns partitions to subscribed group members by +// dividing the number of partitions, per-topic, by the number of +// consumers to determine the number of partitions per consumer that +// should be assigned. If the value does not evenly divide, consumers +// lexicographically earlier will be assigned an extra partition. +func RangeAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments { + assignments := NewAssignments() + + sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name }) + for _, tp := range topics { + members := subs.SubscribedMembers(tp.Name) + mlen := len(members) + plen := len(tp.Partitions) + + sort.Strings(members) + for pos, memberID := range members { n, i := float64(plen)/float64(mlen), float64(pos) min := int(math.Floor(i*n + 0.5)) max := int(math.Floor((i+1)*n + 0.5)) - sub := info.Partitions[min:max] - if len(sub) <= 0 { - continue + sub := tp.Partitions[min:max] + for i := range sub { + assignments.Assign(memberID, tp.Name, sub[i]) } - - assigned, ok := res[memberID] - if !ok { - assigned = make(map[string][]int32, tlen) - res[memberID] = assigned - } - assigned[topic] = sub } } - return res + return assignments } -func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 { +// RoundRobinAssignor assigns partitions by iterating through the +// list of group members and assigning one to each consumer until +// all partitions have been assigned. If a group member is not +// subscribed to a topic, the next subscribed member is assigned +// instead. +func RoundRobinAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments { + assignments := NewAssignments() + memberIDs := subs.Members() sort.Strings(memberIDs) r := ring.New(len(memberIDs)) @@ -169,40 +254,22 @@ func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[stri r = r.Next() } - isSubscribed := func(memberID string, topic string) bool { - info, ok := topics[topic] - if !ok { - return false - } - - for _, subscriber := range info.MemberIDs { - if memberID == subscriber { - return true - } + sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name }) + for _, tp := range topics { + if len(subs.SubscribedMembers(tp.Name)) == 0 { + continue } - return false - } - - tlen := len(topics) - res := make(map[string]map[string][]int32, r.Len()) - - for topic, info := range topics { - for i := range info.Partitions { - for ; !isSubscribed(r.Value.(string), topic); r = r.Next() { + partitions := tp.Partitions + for i := range partitions { + for ; !subs.IsSubscribed(r.Value.(string), tp.Name); r = r.Next() { continue } - memberID := r.Value.(string) - assigned, ok := res[memberID] - if !ok { - assigned = make(map[string][]int32, tlen) - res[memberID] = assigned - } - assigned[topic] = append(assigned[topic], info.Partitions[i]) + assignments.Assign(r.Value.(string), tp.Name, partitions[i]) r = r.Next() } } - return res + return assignments } diff --git a/balancer_test.go b/balancer_test.go index 4a88be8..a650927 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -55,237 +55,205 @@ var _ = Describe("balancer", func() { Expect(err).NotTo(HaveOccurred()) }) - It("should parse from meta data", func() { - Expect(subject.memberIDs).To(Equal([]string{"a", "b"})) + It("should track topic partitions", func() { Expect(subject.topics).To(HaveLen(3)) }) -}) + It("should track group member subscriptions", func() { + subs := subject.subs + Expect(subs.Members()).To(ConsistOf([]string{"a", "b"})) + Expect(subs.SubscribedMembers("one")).To(ConsistOf([]string{"a", "b"})) + Expect(subs.SubscribedMembers("two")).To(Equal([]string{"a"})) + Expect(subs.SubscribedMembers("three")).To(Equal([]string{"b"})) + }) -var _ = Describe("partition assignment", func() { +}) - DescribeTable("Ranges", - func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { - assignments := assignRange(memberIDs, topics) - Expect(assignments).To(Equal(expected)) +var _ = Describe("Assignors", func() { + DescribeTable("Range", + func(subs submap, topics topicparts, expected Assignments) { + result := RangeAssignor(subs.Subscriptions(), topics.TopicPartitions()) + Expect(result).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, no topics", submap{"m0": {"t0"}}, topicparts{}, Assignments{}), + Entry("one consumer, no such topic", submap{"m0": {"t1"}}, topicparts{"t0": {0}}, Assignments{}), + + Entry("one consumer, one topic", submap{"m0": {"t0"}}, topicparts{"t0": {0, 1, 2}}, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, two topics, one subscribed", submap{"m0": {"t0"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M3": {"t1": {1}}, + Entry("one consumer, multiple topics", submap{"m0": {"t0", "t1"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0, 1, 2}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M2": {"t1": {0}}, + Entry("two consumers, one topic, one partition", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0}, + }, Assignments{ + "m0": {"t0": {0}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2, 3}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1, 2}}, - "M3": {"t1": {3}}, + Entry("two consumers, one topic, two partitions", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 2, 4, 6, 8}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0, 2}}, - "M2": {"t1": {4}}, - "M3": {"t1": {6, 8}}, + Entry("multiple consumers, mixed topics", submap{ + "m0": {"t0"}, + "m1": {"t0", "t1"}, + "m2": {"t0"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}, "t1": {0, 1}}, + "m2": {"t0": {2}}, }), - Entry("multiple topics", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t3": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}, "t2": {0}}, - "M2": {"t1": {1}, "t3": {0}}, - "M3": {"t1": {2}, "t2": {1}}, + Entry("two consumers, two topics, six partitions", submap{ + "m0": {"t0", "t1"}, + "m1": {"t0", "t1"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1}, "t1": {0, 1}}, + "m1": {"t0": {2}, "t1": {2}}, }), - Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M2", "M3"}, - }, - "t3": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}, "t3": {0}}, - "M2": {"t1": {1}, "t2": {0}}, - "M3": {"t1": {2}, "t2": {1}}, + Entry("heavily uneven partition counts", submap{ + "m0": {"t0", "t1", "t2"}, + "m1": {"t0", "t1", "t2"}, + }, topicparts{ + "t0": {0, 1, 2, 3, 4}, + "t1": {0, 1}, + "t2": {0}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0}, "t2": {0}}, + "m1": {"t0": {3, 4}, "t1": {1}}, }), ) DescribeTable("RoundRobin", - func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { - assignments := assignRoundRobin(memberIDs, topics) - Expect(assignments).To(Equal(expected)) + func(subs submap, topics topicparts, expected Assignments) { + result := RoundRobinAssignor(subs.Subscriptions(), topics.TopicPartitions()) + Expect(result).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, no topics", submap{"m0": {"t0"}}, topicparts{}, Assignments{}), + Entry("one consumer, no such topic", submap{"m0": {"t1"}}, topicparts{"t0": {0}}, Assignments{}), + + Entry("one consumer, one topic", submap{"m0": {"t0"}}, topicparts{"t0": {0, 1, 2}}, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, two topics, one subscribed", submap{"m0": {"t0"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, + Entry("one consumer, multiple topics", submap{"m0": {"t0", "t1"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0, 1, 2}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, + Entry("two consumers, one topic, two partitions", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2, 3}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0, 3}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("two consumers, one topic, one partition", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0}, + }, Assignments{ + "m0": {"t0": {0}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 2, 4, 6, 8}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0, 6}}, - "M2": {"t1": {2, 8}}, - "M3": {"t1": {4}}, + Entry("multiple consumers, mixed topics", submap{ + "m0": {"t0"}, + "m1": {"t0", "t1"}, + "m2": {"t0"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}, "t1": {0, 1}}, + "m2": {"t0": {2}}, }), - Entry("multiple topics", []string{"M1", "M2"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2"}, - }, - }, map[string]map[string][]int32{ - "M1": { - "t1": {0, 2}, - "t2": {1}, - }, - "M2": { - "t1": {1}, - "t2": {0, 2}, - }, + Entry("two consumers, two topics, six partitions", submap{ + "m0": {"t0", "t1"}, + "m1": {"t0", "t1"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 2}, "t1": {1}}, + "m1": {"t0": {1}, "t1": {0, 2}}, }), - Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M2", "M3"}, - }, - "t3": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M3"}, - }, - }, map[string]map[string][]int32{ - "M1": { - "t1": {0}, - }, - "M2": { - "t2": {0}, - }, - "M3": { - "t2": {1}, - "t3": {0, 1, 2}, - }, + Entry("heavily uneven partition counts", submap{ + "m0": {"t0", "t1", "t2"}, + "m1": {"t0", "t1", "t2"}, + }, topicparts{ + "t0": {0, 1, 2, 3, 4}, + "t1": {0, 1}, + "t2": {0}, + }, Assignments{ + "m0": {"t0": {0, 2, 4}, "t1": {1}}, + "m1": {"t0": {1, 3}, "t1": {0}, "t2": {0}}, }), ) - }) + +type submap map[string][]string +type topicparts map[string][]int32 + +func (s submap) Subscriptions() *Subscriptions { + subs := NewSubscriptions() + for memberID, topics := range s { + for _, topic := range topics { + subs.AddSubscriber(memberID, topic) + } + } + return subs +} + +func (t topicparts) TopicPartitions() []*TopicPartitions { + tps := []*TopicPartitions{} + for topic, partitions := range t { + tps = append(tps, &TopicPartitions{Name: topic, Partitions: partitions}) + } + return tps +} diff --git a/config.go b/config.go index 084b835..205d8a2 100644 --- a/config.go +++ b/config.go @@ -23,7 +23,12 @@ type Config struct { // Group is the namespace for group management properties Group struct { - // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) + // The assignor implementation to use for the allocation of partitions to consumers. + // Defaults to RangeAssignor. + PartitionAssignor Assignor + + // The strategy to use for the allocation of partitions to consumers. Defaults to + // StrategyRange. If PartitionAssignor is set, it takes precedence over this option. PartitionStrategy Strategy // By default, messages and errors from the subscribed topics and partitions are all multiplexed and diff --git a/consumer.go b/consumer.go index e7a67da..0ff438e 100644 --- a/consumer.go +++ b/consumer.go @@ -700,8 +700,18 @@ func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { GenerationId: generationID, } + assignor := c.client.config.Group.PartitionAssignor + if assignor == nil { + switch c.client.config.Group.PartitionStrategy { + case StrategyRoundRobin: + assignor = RoundRobinAssignor + default: + assignor = RangeAssignor + } + } + if strategy != nil { - for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { + for memberID, topics := range strategy.Perform(assignor) { if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ Topics: topics, }); err != nil {