Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Pluggable partition assignment strategies #218

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
219 changes: 162 additions & 57 deletions balancer.go
@@ -1,6 +1,7 @@
package cluster

import (
"container/ring"
"math"
"sort"

Expand Down Expand Up @@ -70,101 +71,205 @@ func (n *Notification) success(current map[string][]int32) *Notification {

// --------------------------------------------------------------------

type topicInfo struct {
// Assignor is the signature of a pluggable partition assignment strategy.
// It is called with the topic subscriptions of a group (subs) and the
// topics with their partition IDs (topics), and returns the partitions
// assigned to each member.
type Assignor func(subs *Subscriptions, topics []*TopicPartitions) Assignments

// TopicPartitions identifies a topic and its partition IDs.
type TopicPartitions struct {
// Name is the topic name.
Name string
// Partitions lists the partition IDs of the topic.
Partitions []int32
// NOTE(review): MemberIDs is neither read nor written by any code visible
// in this section; it looks like a leftover from the removed topicInfo
// type — confirm before relying on it.
MemberIDs []string
}

func (info topicInfo) Perform(s Strategy) map[string][]int32 {
if s == StrategyRoundRobin {
return info.RoundRobin()
}
return info.Ranges()
// Subscriptions contains information about all members in a consumer
// group, and which topics they have subscribed to.
type Subscriptions struct {
// memberIDs holds the ID of every member, added by AddSubscriber the
// first time that member is seen.
memberIDs []string
// subscribers maps a topic name to the IDs of the members subscribed
// to that topic.
subscribers map[string][]string
}

func (info topicInfo) Ranges() map[string][]int32 {
sort.Strings(info.MemberIDs)
// NewSubscriptions creates a Subscriptions value with no members and no
// topic subscriptions. Both internal collections are allocated up front
// (non-nil), so the result can be used immediately.
func NewSubscriptions() *Subscriptions {
	subs := new(Subscriptions)
	subs.memberIDs = make([]string, 0)
	subs.subscribers = make(map[string][]string)
	return subs
}

mlen := len(info.MemberIDs)
plen := len(info.Partitions)
res := make(map[string][]int32, mlen)
// Members returns the IDs of all members in the group.
//
// The result is a copy of the internal list, so callers may sort or
// otherwise modify it without changing the subscriptions themselves.
// It is never nil; an empty group yields an empty slice.
func (m *Subscriptions) Members() []string {
	ids := make([]string, len(m.memberIDs))
	copy(ids, m.memberIDs)
	return ids
}

for pos, memberID := range info.MemberIDs {
n, i := float64(plen)/float64(mlen), float64(pos)
min := int(math.Floor(i*n + 0.5))
max := int(math.Floor((i+1)*n + 0.5))
sub := info.Partitions[min:max]
if len(sub) > 0 {
res[memberID] = sub
// AddSubscriber registers memberID as subscribed to topic and returns the
// receiver so that calls can be chained.
//
// The member is appended to the group's member list the first time it is
// seen. Registering the same member/topic pair again is a no-op, so a member
// never appears twice in a topic's subscriber list. The method also works on
// a zero-value Subscriptions.
func (m *Subscriptions) AddSubscriber(memberID, topic string) *Subscriptions {
	known := false
	for _, id := range m.memberIDs {
		if id == memberID {
			known = true
			break
		}
	}
	if !known {
		m.memberIDs = append(m.memberIDs, memberID)
	}

	// A zero-value Subscriptions has a nil map; allocate it on first use
	// instead of panicking on the write below.
	if m.subscribers == nil {
		m.subscribers = make(map[string][]string)
	}

	for _, id := range m.subscribers[topic] {
		if id == memberID {
			// Already subscribed: a second entry would make the member
			// count twice among the topic's subscribers.
			return m
		}
	}
	m.subscribers[topic] = append(m.subscribers[topic], memberID)
	return m
}

// SubscribedMembers returns the IDs of all members subscribed to topic, or
// nil when the topic has no subscribers.
//
// The result is a copy of the internal list, so callers may sort or
// otherwise modify it without changing the subscriptions themselves.
func (m *Subscriptions) SubscribedMembers(topic string) []string {
	src := m.subscribers[topic]
	if src == nil {
		return nil
	}
	ids := make([]string, len(src))
	copy(ids, src)
	return ids
}

func (info topicInfo) RoundRobin() map[string][]int32 {
sort.Strings(info.MemberIDs)
// IsSubscribed returns true if a member is subscribed to a topic.
func (m *Subscriptions) IsSubscribed(memberID, topic string) bool {
subs, ok := m.subscribers[topic]
if !ok {
return false
}

mlen := len(info.MemberIDs)
res := make(map[string][]int32, mlen)
for i, pnum := range info.Partitions {
memberID := info.MemberIDs[i%mlen]
res[memberID] = append(res[memberID], pnum)
for i := range subs {
if subs[i] == memberID {
return true
}
}
return res

return false
}

// --------------------------------------------------------------------
// Assignments maps a member ID to the partitions assigned to that member,
// keyed by topic name.
type Assignments map[string]map[string][]int32

// NewAssignments creates an Assignments map with no entries.
func NewAssignments() Assignments {
	return make(Assignments)
}

// Assign appends partition to the partitions memberID holds for topic,
// creating the member's per-topic map on first use.
func (a Assignments) Assign(memberID, topic string, partition int32) {
	if _, known := a[memberID]; !known {
		a[memberID] = make(map[string][]int32)
	}
	byTopic := a[memberID]
	byTopic[topic] = append(byTopic[topic], partition)
}

// balancer gathers the inputs needed to compute partition assignments for
// a consumer group.
type balancer struct {
	// client is used to look up the partition IDs of a topic.
	client sarama.Client
	// subs records which members are subscribed to which topics.
	subs *Subscriptions
	// topics holds each registered topic once, with its partition IDs.
	topics []*TopicPartitions
}

func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
balancer := newBalancer(client)
balancer := &balancer{
client: client,
subs: NewSubscriptions(),
topics: []*TopicPartitions{},
}

for memberID, meta := range members {
for _, topic := range meta.Topics {
if err := balancer.Topic(topic, memberID); err != nil {
balancer.subs.AddSubscriber(memberID, topic)
if err := balancer.AddTopic(topic); err != nil {
return nil, err
}
}
}

return balancer, nil
}

func newBalancer(client sarama.Client) *balancer {
return &balancer{
client: client,
topics: make(map[string]topicInfo),
// AddTopic registers the topic called name, looking up its partition IDs
// through the client. A topic that is already registered is left untouched
// and no lookup is made. A lookup error is returned unchanged and nothing
// is registered.
func (r *balancer) AddTopic(name string) error {
	for _, known := range r.topics {
		if known.Name == name {
			return nil
		}
	}

	partitions, err := r.client.Partitions(name)
	if err != nil {
		return err
	}

	entry := &TopicPartitions{Name: name, Partitions: partitions}
	r.topics = append(r.topics, entry)
	return nil
}

func (r *balancer) Topic(name string, memberID string) error {
topic, ok := r.topics[name]
if !ok {
nums, err := r.client.Partitions(name)
if err != nil {
return err
}
topic = topicInfo{
Partitions: nums,
MemberIDs: make([]string, 0, 1),
// Perform runs the assignment strategy fn against the balancer's recorded
// subscriptions and topics and returns whatever assignments fn produces.
func (r *balancer) Perform(fn Assignor) Assignments {
	result := fn(r.subs, r.topics)
	return result
}

// RangeAssignor assigns partitions topic by topic, giving each subscribed
// member one contiguous range of the topic's partitions.
//
// Topics are processed in name order and a topic's subscribers in
// lexicographic order. With n = partitions/subscribers, the member at
// position pos receives the partitions at indexes
// [round(pos*n), round((pos+1)*n)). When the division is not even the extra
// partitions therefore go to whichever positions the rounding favours, which
// is not necessarily the lexicographically earliest members. Topics without
// subscribers are skipped.
//
// Neither the topics slice nor the subscriber lists obtained from subs are
// reordered: both are copied before sorting.
func RangeAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments {
	assignments := NewAssignments()

	ordered := make([]*TopicPartitions, len(topics))
	copy(ordered, topics)
	sort.Slice(ordered, func(i, j int) bool { return ordered[i].Name < ordered[j].Name })

	for _, tp := range ordered {
		subscribed := subs.SubscribedMembers(tp.Name)
		if len(subscribed) == 0 {
			continue
		}
		members := make([]string, len(subscribed))
		copy(members, subscribed)
		sort.Strings(members)

		// share is the (possibly fractional) number of partitions per member.
		share := float64(len(tp.Partitions)) / float64(len(members))
		for pos, memberID := range members {
			lo := int(math.Floor(float64(pos)*share + 0.5))
			hi := int(math.Floor(float64(pos+1)*share + 0.5))
			for _, partition := range tp.Partitions[lo:hi] {
				assignments.Assign(memberID, tp.Name, partition)
			}
		}
	}

	return assignments
}

func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
res := make(map[string]map[string][]int32, 1)
for topic, info := range r.topics {
for memberID, partitions := range info.Perform(s) {
if _, ok := res[memberID]; !ok {
res[memberID] = make(map[string][]int32, 1)
// RoundRobinAssignor walks the group's members in lexicographic order,
// wrapping around at the end, and hands out partitions one at a time.
//
// Topics are processed in name order, and the walk carries on from one topic
// to the next rather than restarting at the first member. A member that is
// not subscribed to the current topic is skipped, so the partition goes to
// the next subscribed member instead. Topics without subscribers are skipped
// entirely.
//
// Neither the topics slice nor the member list obtained from subs is
// reordered: both are copied before sorting.
func RoundRobinAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments {
	assignments := NewAssignments()

	all := subs.Members()
	if len(all) == 0 {
		// No members means nobody to assign to (and ring.New(0) is nil).
		return assignments
	}
	memberIDs := make([]string, len(all))
	copy(memberIDs, all)
	sort.Strings(memberIDs)

	// Fill the ring; after len(memberIDs) steps r is back at the first member.
	r := ring.New(len(memberIDs))
	for _, id := range memberIDs {
		r.Value = id
		r = r.Next()
	}

	ordered := make([]*TopicPartitions, len(topics))
	copy(ordered, topics)
	sort.Slice(ordered, func(i, j int) bool { return ordered[i].Name < ordered[j].Name })

	for _, tp := range ordered {
		if len(subs.SubscribedMembers(tp.Name)) == 0 {
			continue
		}

		for _, partition := range tp.Partitions {
			// Advance to the next member subscribed to this topic. This
			// terminates as long as every subscriber is also in the member
			// list, which AddSubscriber ensures.
			for !subs.IsSubscribed(r.Value.(string), tp.Name) {
				r = r.Next()
			}

			assignments.Assign(r.Value.(string), tp.Name, partition)
			r = r.Next()
		}
	}

	return assignments
}