diff --git a/balancer.go b/balancer.go index 3aeaece..4f4de77 100644 --- a/balancer.go +++ b/balancer.go @@ -1,6 +1,7 @@ package cluster import ( + "container/ring" "math" "sort" @@ -70,101 +71,205 @@ func (n *Notification) success(current map[string][]int32) *Notification { // -------------------------------------------------------------------- -type topicInfo struct { +// Assignor is a function which returns specific partition assignments +// given the set of topic subscriptions of a given group. +type Assignor func(subs *Subscriptions, topics []*TopicPartitions) Assignments + +// TopicPartitions identifies a topic and its partition IDs. +type TopicPartitions struct { + Name string Partitions []int32 - MemberIDs []string } -func (info topicInfo) Perform(s Strategy) map[string][]int32 { - if s == StrategyRoundRobin { - return info.RoundRobin() - } - return info.Ranges() +// Subscriptions contains information about all members in a consumer +// group, and which topics they have subscribed to. +type Subscriptions struct { + memberIDs []string + subscribers map[string][]string } -func (info topicInfo) Ranges() map[string][]int32 { - sort.Strings(info.MemberIDs) +// NewSubscriptions returns an empty set of subscriptions. +func NewSubscriptions() *Subscriptions { + return &Subscriptions{ + memberIDs: []string{}, + subscribers: map[string][]string{}, + } +} - mlen := len(info.MemberIDs) - plen := len(info.Partitions) - res := make(map[string][]int32, mlen) +// Members returns the list of all member IDs in the group. +func (m *Subscriptions) Members() []string { + return m.memberIDs +} - for pos, memberID := range info.MemberIDs { - n, i := float64(plen)/float64(mlen), float64(pos) - min := int(math.Floor(i*n + 0.5)) - max := int(math.Floor((i+1)*n + 0.5)) - sub := info.Partitions[min:max] - if len(sub) > 0 { - res[memberID] = sub +// AddSubscriber registers a member as subscribed to a topic. +// Returns self. +func (m *Subscriptions) AddSubscriber(memberID, topic string) *Subscriptions { + seen := false + for i := range m.memberIDs { + if m.memberIDs[i] == memberID { + seen = true } } - return res + + if !seen { + m.memberIDs = append(m.memberIDs, memberID) + } + + m.subscribers[topic] = append(m.subscribers[topic], memberID) + return m +} + +// SubscribedMembers returns the full list of members subscribed +// to a topic. +func (m *Subscriptions) SubscribedMembers(topic string) []string { + return m.subscribers[topic] } -func (info topicInfo) RoundRobin() map[string][]int32 { - sort.Strings(info.MemberIDs) +// IsSubscribed returns true if a member is subscribed to a topic. +func (m *Subscriptions) IsSubscribed(memberID, topic string) bool { + subs, ok := m.subscribers[topic] + if !ok { + return false + } - mlen := len(info.MemberIDs) - res := make(map[string][]int32, mlen) - for i, pnum := range info.Partitions { - memberID := info.MemberIDs[i%mlen] - res[memberID] = append(res[memberID], pnum) + for i := range subs { + if subs[i] == memberID { + return true + } } - return res + + return false } -// -------------------------------------------------------------------- +// Assignments is a mapping of member IDs to the topic partitions that they +// have been assigned. +type Assignments map[string]map[string][]int32 + +// NewAssignments returns an empty set of assignments. +func NewAssignments() Assignments { + return map[string]map[string][]int32{} +} + +// Assign adds a partition to the list of a member's assignments. +func (a Assignments) Assign(memberID, topic string, partition int32) { + topics, ok := a[memberID] + if !ok { + topics = map[string][]int32{} + a[memberID] = topics + } + + topics[topic] = append(topics[topic], partition) +} type balancer struct { client sarama.Client - topics map[string]topicInfo + subs *Subscriptions + topics []*TopicPartitions } func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { - balancer := newBalancer(client) + balancer := &balancer{ + client: client, + subs: NewSubscriptions(), + topics: []*TopicPartitions{}, + } + for memberID, meta := range members { for _, topic := range meta.Topics { - if err := balancer.Topic(topic, memberID); err != nil { + balancer.subs.AddSubscriber(memberID, topic) + if err := balancer.AddTopic(topic); err != nil { return nil, err } } } + return balancer, nil } -func newBalancer(client sarama.Client) *balancer { - return &balancer{ - client: client, - topics: make(map[string]topicInfo), +func (r *balancer) AddTopic(name string) error { + for i := range r.topics { + if r.topics[i].Name == name { + return nil + } + } + + nums, err := r.client.Partitions(name) + if err != nil { + return err } + + r.topics = append(r.topics, &TopicPartitions{ + Name: name, + Partitions: nums, + }) + return nil } -func (r *balancer) Topic(name string, memberID string) error { - topic, ok := r.topics[name] - if !ok { - nums, err := r.client.Partitions(name) - if err != nil { - return err - } - topic = topicInfo{ - Partitions: nums, - MemberIDs: make([]string, 0, 1), +func (r *balancer) Perform(fn Assignor) Assignments { + return fn(r.subs, r.topics) +} + +// RangeAssignor assigns partitions to subscribed group members by +// dividing the number of partitions, per-topic, by the number of +// consumers to determine the number of partitions per consumer that +// should be assigned. If the value does not evenly divide, consumers +// lexicographically earlier will be assigned an extra partition. +func RangeAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments { + assignments := NewAssignments() + + sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name }) + for _, tp := range topics { + members := subs.SubscribedMembers(tp.Name) + mlen := len(members) + plen := len(tp.Partitions) + + sort.Strings(members) + for pos, memberID := range members { + n, i := float64(plen)/float64(mlen), float64(pos) + min := int(math.Floor(i*n + 0.5)) + max := int(math.Floor((i+1)*n + 0.5)) + sub := tp.Partitions[min:max] + for i := range sub { + assignments.Assign(memberID, tp.Name, sub[i]) + } } } - topic.MemberIDs = append(topic.MemberIDs, memberID) - r.topics[name] = topic - return nil + + return assignments } -func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { - res := make(map[string]map[string][]int32, 1) - for topic, info := range r.topics { - for memberID, partitions := range info.Perform(s) { - if _, ok := res[memberID]; !ok { - res[memberID] = make(map[string][]int32, 1) +// RoundRobinAssignor assigns partitions by iterating through the +// list of group members and assigning one to each consumer until +// all partitions have been assigned. If a group member is not +// subscribed to a topic, the next subscribed member is assigned +// instead. +func RoundRobinAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments { + assignments := NewAssignments() + memberIDs := subs.Members() + sort.Strings(memberIDs) + + r := ring.New(len(memberIDs)) + for i := 0; i < r.Len(); i++ { + r.Value = memberIDs[i] + r = r.Next() + } + + sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name }) + for _, tp := range topics { + if len(subs.SubscribedMembers(tp.Name)) == 0 { + continue + } + + partitions := tp.Partitions + for i := range partitions { + for ; !subs.IsSubscribed(r.Value.(string), tp.Name); r = r.Next() { + continue } - res[memberID][topic] = partitions + + assignments.Assign(r.Value.(string), tp.Name, partitions[i]) + r = r.Next() } } - return res + + return assignments } diff --git a/balancer_test.go b/balancer_test.go index 0334c18..a650927 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -55,76 +55,205 @@ var _ = Describe("balancer", func() { Expect(err).NotTo(HaveOccurred()) }) - It("should parse from meta data", func() { + It("should track topic partitions", func() { Expect(subject.topics).To(HaveLen(3)) }) - It("should perform", func() { - Expect(subject.Perform(StrategyRange)).To(Equal(map[string]map[string][]int32{ - "a": {"one": {0, 1}, "two": {0, 1, 2}}, - "b": {"one": {2, 3}, "three": {0, 1}}, - })) - - Expect(subject.Perform(StrategyRoundRobin)).To(Equal(map[string]map[string][]int32{ - "a": {"one": {0, 2}, "two": {0, 1, 2}}, - "b": {"one": {1, 3}, "three": {0, 1}}, - })) + It("should track group member subscriptions", func() { + subs := subject.subs + Expect(subs.Members()).To(ConsistOf([]string{"a", "b"})) + Expect(subs.SubscribedMembers("one")).To(ConsistOf([]string{"a", "b"})) + Expect(subs.SubscribedMembers("two")).To(Equal([]string{"a"})) + Expect(subs.SubscribedMembers("three")).To(Equal([]string{"b"})) }) }) -var _ = Describe("topicInfo", func() { - - DescribeTable("Ranges", - func(memberIDs []string, partitions []int32, expected map[string][]int32) { - info := topicInfo{MemberIDs: memberIDs, Partitions: partitions} - Expect(info.Ranges()).To(Equal(expected)) +var _ = Describe("Assignors", func() { + DescribeTable("Range", + func(subs submap, topics topicparts, expected Assignments) { + result := RangeAssignor(subs.Subscriptions(), topics.TopicPartitions()) + Expect(result).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + Entry("one consumer, no topics", submap{"m0": {"t0"}}, topicparts{}, Assignments{}), + Entry("one consumer, no such topic", submap{"m0": {"t1"}}, topicparts{"t0": {0}}, Assignments{}), + + Entry("one consumer, one topic", submap{"m0": {"t0"}}, topicparts{"t0": {0, 1, 2}}, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + + Entry("one consumer, two topics, one subscribed", submap{"m0": {"t0"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{ - "M1": {0}, "M3": {1}, + + Entry("one consumer, multiple topics", submap{"m0": {"t0", "t1"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0, 1, 2}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{ - "M2": {0}, + + Entry("two consumers, one topic, one partition", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0}, + }, Assignments{ + "m0": {"t0": {0}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{ - "M1": {0}, "M2": {1, 2}, "M3": {3}, + + Entry("two consumers, one topic, two partitions", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}}, + }), + + Entry("multiple consumers, mixed topics", submap{ + "m0": {"t0"}, + "m1": {"t0", "t1"}, + "m2": {"t0"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}, "t1": {0, 1}}, + "m2": {"t0": {2}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{ - "M1": {0, 2}, "M2": {4}, "M3": {6, 8}, + + Entry("two consumers, two topics, six partitions", submap{ + "m0": {"t0", "t1"}, + "m1": {"t0", "t1"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1}, "t1": {0, 1}}, + "m1": {"t0": {2}, "t1": {2}}, + }), + + Entry("heavily uneven partition counts", submap{ + "m0": {"t0", "t1", "t2"}, + "m1": {"t0", "t1", "t2"}, + }, topicparts{ + "t0": {0, 1, 2, 3, 4}, + "t1": {0, 1}, + "t2": {0}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0}, "t2": {0}}, + "m1": {"t0": {3, 4}, "t1": {1}}, }), ) DescribeTable("RoundRobin", - func(memberIDs []string, partitions []int32, expected map[string][]int32) { - info := topicInfo{MemberIDs: memberIDs, Partitions: partitions} - Expect(info.RoundRobin()).To(Equal(expected)) + func(subs submap, topics topicparts, expected Assignments) { + result := RoundRobinAssignor(subs.Subscriptions(), topics.TopicPartitions()) + Expect(result).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + Entry("one consumer, no topics", submap{"m0": {"t0"}}, topicparts{}, Assignments{}), + Entry("one consumer, no such topic", submap{"m0": {"t1"}}, topicparts{"t0": {0}}, Assignments{}), + + Entry("one consumer, one topic", submap{"m0": {"t0"}}, topicparts{"t0": {0, 1, 2}}, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, []int32{0, 1, 2}, map[string][]int32{ - "M1": {0}, "M2": {1}, "M3": {2}, + + Entry("one consumer, two topics, one subscribed", submap{"m0": {"t0"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1}, map[string][]int32{ - "M1": {0}, "M2": {1}, + + Entry("one consumer, multiple topics", submap{"m0": {"t0", "t1"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0, 1, 2}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, []int32{0}, map[string][]int32{ - "M1": {0}, + + Entry("two consumers, one topic, two partitions", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, []int32{0, 1, 2, 3}, map[string][]int32{ - "M1": {0, 3}, "M2": {1}, "M3": {2}, + + Entry("two consumers, one topic, one partition", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0}, + }, Assignments{ + "m0": {"t0": {0}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, []int32{0, 2, 4, 6, 8}, map[string][]int32{ - "M1": {0, 6}, "M2": {2, 8}, "M3": {4}, + + Entry("multiple consumers, mixed topics", submap{ + "m0": {"t0"}, + "m1": {"t0", "t1"}, + "m2": {"t0"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}, "t1": {0, 1}}, + "m2": {"t0": {2}}, + }), + + Entry("two consumers, two topics, six partitions", submap{ + "m0": {"t0", "t1"}, + "m1": {"t0", "t1"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 2}, "t1": {1}}, + "m1": {"t0": {1}, "t1": {0, 2}}, }), - ) + Entry("heavily uneven partition counts", submap{ + "m0": {"t0", "t1", "t2"}, + "m1": {"t0", "t1", "t2"}, + }, topicparts{ + "t0": {0, 1, 2, 3, 4}, + "t1": {0, 1}, + "t2": {0}, + }, Assignments{ + "m0": {"t0": {0, 2, 4}, "t1": {1}}, + "m1": {"t0": {1, 3}, "t1": {0}, "t2": {0}}, + }), + ) }) + +type submap map[string][]string +type topicparts map[string][]int32 + +func (s submap) Subscriptions() *Subscriptions { + subs := NewSubscriptions() + for memberID, topics := range s { + for _, topic := range topics { + subs.AddSubscriber(memberID, topic) + } + } + return subs +} + +func (t topicparts) TopicPartitions() []*TopicPartitions { + tps := []*TopicPartitions{} + for topic, partitions := range t { + tps = append(tps, &TopicPartitions{Name: topic, Partitions: partitions}) + } + return tps +} diff --git a/config.go b/config.go index 084b835..205d8a2 100644 --- a/config.go +++ b/config.go @@ -23,7 +23,12 @@ type Config struct { // Group is the namespace for group management properties Group struct { - // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) + // The assignor implementation to use for the allocation of partitions to consumers. + // Defaults to RangeAssignor. + PartitionAssignor Assignor + + // The strategy to use for the allocation of partitions to consumers. Defaults to + // StrategyRange. If PartitionAssignor is set, it takes precedence over this option. PartitionStrategy Strategy // By default, messages and errors from the subscribed topics and partitions are all multiplexed and diff --git a/consumer.go b/consumer.go index e7a67da..0ff438e 100644 --- a/consumer.go +++ b/consumer.go @@ -700,8 +700,18 @@ func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { GenerationId: generationID, } + assignor := c.client.config.Group.PartitionAssignor + if assignor == nil { + switch c.client.config.Group.PartitionStrategy { + case StrategyRoundRobin: + assignor = RoundRobinAssignor + default: + assignor = RangeAssignor + } + } + if strategy != nil { - for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { + for memberID, topics := range strategy.Perform(assignor) { if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ Topics: topics, }); err != nil {