diff --git a/balancer.go b/balancer.go index 9fade63..154cde0 100644 --- a/balancer.go +++ b/balancer.go @@ -71,100 +71,185 @@ func (n *Notification) success(current map[string][]int32) *Notification { // -------------------------------------------------------------------- -type balancer struct { - client sarama.Client - memberIDs []string - topics map[string]*topicInfo -} +// Assignor is a function which returns specific partition assignments +// given the set of topic subscriptions of a given group. +type Assignor func(subs *Subscriptions, topics []*TopicPartitions) Assignments -type topicInfo struct { +// TopicPartitions identifies a topic and its partition IDs. +type TopicPartitions struct { + Name string Partitions []int32 - MemberIDs []string +} + +// Subscriptions contains information about all members in a consumer +// group, and which topics they have subscribed to. +type Subscriptions struct { + memberIDs []string + subscribers map[string][]string +} + +// NewSubscriptions returns an empty set of subscriptions. +func NewSubscriptions() *Subscriptions { + return &Subscriptions{ + memberIDs: []string{}, + subscribers: map[string][]string{}, + } +} + +// Members returns the list of all member IDs in the group. +func (m *Subscriptions) Members() []string { + return m.memberIDs +} + +// AddSubscriber registers a member as subscribed to a topic. +// Returns self. +func (m *Subscriptions) AddSubscriber(memberID, topic string) *Subscriptions { + seen := false + for i := range m.memberIDs { + if m.memberIDs[i] == memberID { + seen = true + } + } + + if !seen { + m.memberIDs = append(m.memberIDs, memberID) + } + + m.subscribers[topic] = append(m.subscribers[topic], memberID) + return m +} + +// SubscribedMembers returns the full list of members subscribed +// to a topic. +func (m *Subscriptions) SubscribedMembers(topic string) []string { + return m.subscribers[topic] +} + +// IsSubscribed returns true if a member is subscribed to a topic. 
+func (m *Subscriptions) IsSubscribed(memberID, topic string) bool { + subs, ok := m.subscribers[topic] + if !ok { + return false + } + + for i := range subs { + if subs[i] == memberID { + return true + } + } + + return false +} + +// Assignments is a mapping of member IDs to the topic partitions that they +// have been assigned. +type Assignments map[string]map[string][]int32 + +// NewAssignments returns an empty set of assignments. +func NewAssignments() Assignments { + return map[string]map[string][]int32{} +} + +// Assign adds a partition to the list of a member's assignments. +func (a Assignments) Assign(memberID, topic string, partition int32) { + topics, ok := a[memberID] + if !ok { + topics = map[string][]int32{} + a[memberID] = topics + } + + topics[topic] = append(topics[topic], partition) +} + +type balancer struct { + client sarama.Client + subs *Subscriptions + topics []*TopicPartitions } func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { balancer := &balancer{ - client: client, - memberIDs: make([]string, 0, len(members)), - topics: make(map[string]*topicInfo), + client: client, + subs: NewSubscriptions(), + topics: []*TopicPartitions{}, } + for memberID, meta := range members { - balancer.memberIDs = append(balancer.memberIDs, memberID) for _, topic := range meta.Topics { - if err := balancer.Topic(memberID, topic); err != nil { + balancer.subs.AddSubscriber(memberID, topic) + if err := balancer.AddTopic(topic); err != nil { return nil, err } } } - sort.Strings(balancer.memberIDs) return balancer, nil } -func (r *balancer) Topic(memberID string, name string) error { - info, ok := r.topics[name] - if !ok { - nums, err := r.client.Partitions(name) - if err != nil { - return err - } - - r.topics[name] = &topicInfo{ - MemberIDs: []string{memberID}, - Partitions: nums, +func (r *balancer) AddTopic(name string) error { + for i := range r.topics { + if r.topics[i].Name == name { + 
return nil } + } - return nil + nums, err := r.client.Partitions(name) + if err != nil { + return err } - info.MemberIDs = append(info.MemberIDs, memberID) + r.topics = append(r.topics, &TopicPartitions{ + Name: name, + Partitions: nums, + }) return nil } -func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { +func (r *balancer) Perform(fn Assignor) Assignments { if r == nil { return nil } - switch s { - case StrategyRoundRobin: - return assignRoundRobin(r.memberIDs, r.topics) - default: - return assignRange(r.memberIDs, r.topics) - } + return fn(r.subs, r.topics) } -func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 { - tlen := len(topics) - res := make(map[string]map[string][]int32) - - for topic, info := range topics { - mlen := len(info.MemberIDs) - plen := len(info.Partitions) - - sort.Strings(info.MemberIDs) - for pos, memberID := range info.MemberIDs { +// RangeAssignor assigns partitions to subscribed group members by +// dividing the number of partitions, per-topic, by the number of +// consumers to determine the number of partitions per consumer that +// should be assigned. If the value does not evenly divide, range bounds are +// rounded to nearest, so extras are not always given to the earliest members. 
+func RangeAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments { + assignments := NewAssignments() + + sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name }) + for _, tp := range topics { + members := subs.SubscribedMembers(tp.Name) + mlen := len(members) + plen := len(tp.Partitions) + + sort.Strings(members) + for pos, memberID := range members { n, i := float64(plen)/float64(mlen), float64(pos) min := int(math.Floor(i*n + 0.5)) max := int(math.Floor((i+1)*n + 0.5)) - sub := info.Partitions[min:max] - if len(sub) <= 0 { - continue + sub := tp.Partitions[min:max] + for i := range sub { + assignments.Assign(memberID, tp.Name, sub[i]) } - - assigned, ok := res[memberID] - if !ok { - assigned = make(map[string][]int32, tlen) - res[memberID] = assigned - } - assigned[topic] = sub } } - return res + return assignments } -func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 { +// RoundRobinAssignor assigns partitions by iterating through the +// list of group members and assigning one to each consumer until +// all partitions have been assigned. If a group member is not +// subscribed to a topic, the next subscribed member is assigned +// instead. 
+func RoundRobinAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments { + assignments := NewAssignments() + memberIDs := subs.Members() sort.Strings(memberIDs) r := ring.New(len(memberIDs)) @@ -173,40 +258,22 @@ func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[stri r = r.Next() } - isSubscribed := func(memberID string, topic string) bool { - info, ok := topics[topic] - if !ok { - return false - } - - for _, subscriber := range info.MemberIDs { - if memberID == subscriber { - return true - } + sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name }) + for _, tp := range topics { + if len(subs.SubscribedMembers(tp.Name)) == 0 { + continue } - return false - } - - tlen := len(topics) - res := make(map[string]map[string][]int32, r.Len()) - - for topic, info := range topics { - for i := range info.Partitions { - for ; !isSubscribed(r.Value.(string), topic); r = r.Next() { + partitions := tp.Partitions + for i := range partitions { + for ; !subs.IsSubscribed(r.Value.(string), tp.Name); r = r.Next() { continue } - memberID := r.Value.(string) - assigned, ok := res[memberID] - if !ok { - assigned = make(map[string][]int32, tlen) - res[memberID] = assigned - } - assigned[topic] = append(assigned[topic], info.Partitions[i]) + assignments.Assign(r.Value.(string), tp.Name, partitions[i]) r = r.Next() } } - return res + return assignments } diff --git a/balancer_test.go b/balancer_test.go index 4a88be8..a650927 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -55,237 +55,205 @@ var _ = Describe("balancer", func() { Expect(err).NotTo(HaveOccurred()) }) - It("should parse from meta data", func() { - Expect(subject.memberIDs).To(Equal([]string{"a", "b"})) + It("should track topic partitions", func() { Expect(subject.topics).To(HaveLen(3)) }) -}) + It("should track group member subscriptions", func() { + subs := subject.subs + Expect(subs.Members()).To(ConsistOf([]string{"a", "b"})) + 
Expect(subs.SubscribedMembers("one")).To(ConsistOf([]string{"a", "b"})) + Expect(subs.SubscribedMembers("two")).To(Equal([]string{"a"})) + Expect(subs.SubscribedMembers("three")).To(Equal([]string{"b"})) + }) -var _ = Describe("partition assignment", func() { +}) - DescribeTable("Ranges", - func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { - assignments := assignRange(memberIDs, topics) - Expect(assignments).To(Equal(expected)) +var _ = Describe("Assignors", func() { + DescribeTable("Range", + func(subs submap, topics topicparts, expected Assignments) { + result := RangeAssignor(subs.Subscriptions(), topics.TopicPartitions()) + Expect(result).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, no topics", submap{"m0": {"t0"}}, topicparts{}, Assignments{}), + Entry("one consumer, no such topic", submap{"m0": {"t1"}}, topicparts{"t0": {0}}, Assignments{}), + + Entry("one consumer, one topic", submap{"m0": {"t0"}}, topicparts{"t0": {0, 1, 2}}, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, two topics, one subscribed", submap{"m0": {"t0"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, 
map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M3": {"t1": {1}}, + Entry("one consumer, multiple topics", submap{"m0": {"t0", "t1"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0, 1, 2}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M2": {"t1": {0}}, + Entry("two consumers, one topic, one partition", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0}, + }, Assignments{ + "m0": {"t0": {0}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2, 3}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1, 2}}, - "M3": {"t1": {3}}, + Entry("two consumers, one topic, two partitions", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 2, 4, 6, 8}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0, 2}}, - "M2": {"t1": {4}}, - "M3": {"t1": {6, 8}}, + Entry("multiple consumers, mixed topics", submap{ + "m0": {"t0"}, + "m1": {"t0", "t1"}, + "m2": {"t0"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}, "t1": {0, 1}}, + "m2": {"t0": {2}}, }), - Entry("multiple topics", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t3": 
&topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}, "t2": {0}}, - "M2": {"t1": {1}, "t3": {0}}, - "M3": {"t1": {2}, "t2": {1}}, + Entry("two consumers, two topics, six partitions", submap{ + "m0": {"t0", "t1"}, + "m1": {"t0", "t1"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1}, "t1": {0, 1}}, + "m1": {"t0": {2}, "t1": {2}}, }), - Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M2", "M3"}, - }, - "t3": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}, "t3": {0}}, - "M2": {"t1": {1}, "t2": {0}}, - "M3": {"t1": {2}, "t2": {1}}, + Entry("heavily uneven partition counts", submap{ + "m0": {"t0", "t1", "t2"}, + "m1": {"t0", "t1", "t2"}, + }, topicparts{ + "t0": {0, 1, 2, 3, 4}, + "t1": {0, 1}, + "t2": {0}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0}, "t2": {0}}, + "m1": {"t0": {3, 4}, "t1": {1}}, }), ) DescribeTable("RoundRobin", - func(memberIDs []string, topics map[string]*topicInfo, expected map[string]map[string][]int32) { - assignments := assignRoundRobin(memberIDs, topics) - Expect(assignments).To(Equal(expected)) + func(subs submap, topics topicparts, expected Assignments) { + result := RoundRobinAssignor(subs.Subscriptions(), topics.TopicPartitions()) + Expect(result).To(Equal(expected)) }, - Entry("three members, three partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, no topics", submap{"m0": {"t0"}}, 
topicparts{}, Assignments{}), + Entry("one consumer, no such topic", submap{"m0": {"t1"}}, topicparts{"t0": {0}}, Assignments{}), + + Entry("one consumer, one topic", submap{"m0": {"t0"}}, topicparts{"t0": {0, 1, 2}}, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("member ID order", []string{"M3", "M1", "M2"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("one consumer, two topics, one subscribed", submap{"m0": {"t0"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}}, }), - Entry("more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, - "M2": {"t1": {1}}, + Entry("one consumer, multiple topics", submap{"m0": {"t0", "t1"}}, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 1, 2}, "t1": {0, 1, 2}}, }), - Entry("far more members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0}}, + Entry("two consumers, one topic, two partitions", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + "t0": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}}, }), - Entry("fewer members than partitions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2, 3}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0, 3}}, - "M2": {"t1": {1}}, - "M3": {"t1": {2}}, + Entry("two consumers, one topic, one partition", submap{ + "m0": {"t0"}, + "m1": {"t0"}, + }, topicparts{ + 
"t0": {0}, + }, Assignments{ + "m0": {"t0": {0}}, }), - Entry("uneven members/partitions ratio", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 2, 4, 6, 8}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - }, map[string]map[string][]int32{ - "M1": {"t1": {0, 6}}, - "M2": {"t1": {2, 8}}, - "M3": {"t1": {4}}, + Entry("multiple consumers, mixed topics", submap{ + "m0": {"t0"}, + "m1": {"t0", "t1"}, + "m2": {"t0"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1}, + }, Assignments{ + "m0": {"t0": {0}}, + "m1": {"t0": {1}, "t1": {0, 1}}, + "m2": {"t0": {2}}, }), - Entry("multiple topics", []string{"M1", "M2"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M1", "M2"}, - }, - }, map[string]map[string][]int32{ - "M1": { - "t1": {0, 2}, - "t2": {1}, - }, - "M2": { - "t1": {1}, - "t2": {0, 2}, - }, + Entry("two consumers, two topics, six partitions", submap{ + "m0": {"t0", "t1"}, + "m1": {"t0", "t1"}, + }, topicparts{ + "t0": {0, 1, 2}, + "t1": {0, 1, 2}, + }, Assignments{ + "m0": {"t0": {0, 2}, "t1": {1}}, + "m1": {"t0": {1}, "t1": {0, 2}}, }), - Entry("different subscriptions", []string{"M1", "M2", "M3"}, map[string]*topicInfo{ - "t1": &topicInfo{ - Partitions: []int32{0}, - MemberIDs: []string{"M1", "M2", "M3"}, - }, - "t2": &topicInfo{ - Partitions: []int32{0, 1}, - MemberIDs: []string{"M2", "M3"}, - }, - "t3": &topicInfo{ - Partitions: []int32{0, 1, 2}, - MemberIDs: []string{"M3"}, - }, - }, map[string]map[string][]int32{ - "M1": { - "t1": {0}, - }, - "M2": { - "t2": {0}, - }, - "M3": { - "t2": {1}, - "t3": {0, 1, 2}, - }, + Entry("heavily uneven partition counts", submap{ + "m0": {"t0", "t1", "t2"}, + "m1": {"t0", "t1", "t2"}, + }, topicparts{ + "t0": {0, 1, 2, 3, 4}, + "t1": {0, 1}, + "t2": {0}, + }, Assignments{ + "m0": {"t0": {0, 2, 4}, "t1": {1}}, + "m1": {"t0": {1, 3}, 
"t1": {0}, "t2": {0}}, }), ) - }) + +type submap map[string][]string +type topicparts map[string][]int32 + +func (s submap) Subscriptions() *Subscriptions { + subs := NewSubscriptions() + for memberID, topics := range s { + for _, topic := range topics { + subs.AddSubscriber(memberID, topic) + } + } + return subs +} + +func (t topicparts) TopicPartitions() []*TopicPartitions { + tps := []*TopicPartitions{} + for topic, partitions := range t { + tps = append(tps, &TopicPartitions{Name: topic, Partitions: partitions}) + } + return tps +} diff --git a/config.go b/config.go index 084b835..205d8a2 100644 --- a/config.go +++ b/config.go @@ -23,7 +23,12 @@ type Config struct { // Group is the namespace for group management properties Group struct { - // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) + // The assignor implementation to use for the allocation of partitions to consumers. + // Defaults to RangeAssignor. + PartitionAssignor Assignor + + // The strategy to use for the allocation of partitions to consumers. Defaults to + // StrategyRange. If PartitionAssignor is set, it takes precedence over this option. 
PartitionStrategy Strategy // By default, messages and errors from the subscribed topics and partitions are all multiplexed and diff --git a/consumer.go b/consumer.go index 13500cc..8b1adf3 100644 --- a/consumer.go +++ b/consumer.go @@ -706,7 +706,17 @@ func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { GenerationId: generationID, } - for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { + assignor := c.client.config.Group.PartitionAssignor + if assignor == nil { + switch c.client.config.Group.PartitionStrategy { + case StrategyRoundRobin: + assignor = RoundRobinAssignor + default: + assignor = RangeAssignor + } + } + + for memberID, topics := range strategy.Perform(assignor) { if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ Version: 1, Topics: topics,