Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Round-robin partition assignments across multiple topics #216

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
170 changes: 104 additions & 66 deletions balancer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cluster

import (
"container/ring"
"math"
"sort"

Expand Down Expand Up @@ -70,89 +71,53 @@ func (n *Notification) success(current map[string][]int32) *Notification {

// --------------------------------------------------------------------

type balancer struct {
client sarama.Client
memberIDs []string
topics map[string]*topicInfo
}

type topicInfo struct {
Partitions []int32
MemberIDs []string
}

func (info topicInfo) Perform(s Strategy) map[string][]int32 {
if s == StrategyRoundRobin {
return info.RoundRobin()
}
return info.Ranges()
}

func (info topicInfo) Ranges() map[string][]int32 {
sort.Strings(info.MemberIDs)

mlen := len(info.MemberIDs)
plen := len(info.Partitions)
res := make(map[string][]int32, mlen)

for pos, memberID := range info.MemberIDs {
n, i := float64(plen)/float64(mlen), float64(pos)
min := int(math.Floor(i*n + 0.5))
max := int(math.Floor((i+1)*n + 0.5))
sub := info.Partitions[min:max]
if len(sub) > 0 {
res[memberID] = sub
}
}
return res
}

func (info topicInfo) RoundRobin() map[string][]int32 {
sort.Strings(info.MemberIDs)

mlen := len(info.MemberIDs)
res := make(map[string][]int32, mlen)
for i, pnum := range info.Partitions {
memberID := info.MemberIDs[i%mlen]
res[memberID] = append(res[memberID], pnum)
}
return res
}

// --------------------------------------------------------------------

type balancer struct {
client sarama.Client
topics map[string]topicInfo
}

func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
balancer := newBalancer(client)
balancer := &balancer{
client: client,
memberIDs: make([]string, 0, len(members)),
topics: make(map[string]*topicInfo),
}
for memberID, meta := range members {
balancer.memberIDs = append(balancer.memberIDs, memberID)
for _, topic := range meta.Topics {
if err := balancer.Topic(topic, memberID); err != nil {
if err := balancer.Topic(memberID, topic); err != nil {
return nil, err
}
}
}
return balancer, nil
}

func newBalancer(client sarama.Client) *balancer {
return &balancer{
client: client,
topics: make(map[string]topicInfo),
}
sort.Strings(balancer.memberIDs)
return balancer, nil
}

func (r *balancer) Topic(name string, memberID string) error {
topic, ok := r.topics[name]
func (r *balancer) Topic(memberID string, name string) error {
info, ok := r.topics[name]
if !ok {
nums, err := r.client.Partitions(name)
if err != nil {
return err
}
topic = topicInfo{

r.topics[name] = &topicInfo{
MemberIDs: []string{memberID},
Partitions: nums,
MemberIDs: make([]string, 0, 1),
}

return nil
}
topic.MemberIDs = append(topic.MemberIDs, memberID)
r.topics[name] = topic

info.MemberIDs = append(info.MemberIDs, memberID)
return nil
}

Expand All @@ -161,14 +126,87 @@ func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
return nil
}

res := make(map[string]map[string][]int32, 1)
for topic, info := range r.topics {
for memberID, partitions := range info.Perform(s) {
if _, ok := res[memberID]; !ok {
res[memberID] = make(map[string][]int32, 1)
switch s {
case StrategyRoundRobin:
return assignRoundRobin(r.memberIDs, r.topics)
default:
return assignRange(r.memberIDs, r.topics)
}
}

func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 {
tlen := len(topics)
res := make(map[string]map[string][]int32)

for topic, info := range topics {
mlen := len(info.MemberIDs)
plen := len(info.Partitions)

sort.Strings(info.MemberIDs)
for pos, memberID := range info.MemberIDs {
n, i := float64(plen)/float64(mlen), float64(pos)
min := int(math.Floor(i*n + 0.5))
max := int(math.Floor((i+1)*n + 0.5))
sub := info.Partitions[min:max]
if len(sub) <= 0 {
continue
}

assigned, ok := res[memberID]
if !ok {
assigned = make(map[string][]int32, tlen)
res[memberID] = assigned
}
res[memberID][topic] = partitions
assigned[topic] = sub
}
}

return res
}

func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 {
sort.Strings(memberIDs)

r := ring.New(len(memberIDs))
for i := 0; i < r.Len(); i++ {
r.Value = memberIDs[i]
r = r.Next()
}

isSubscribed := func(memberID string, topic string) bool {
info, ok := topics[topic]
if !ok {
return false
}

for _, subscriber := range info.MemberIDs {
if memberID == subscriber {
return true
}
}

return false
}

tlen := len(topics)
res := make(map[string]map[string][]int32, r.Len())

for topic, info := range topics {
for i := range info.Partitions {
for ; !isSubscribed(r.Value.(string), topic); r = r.Next() {
continue
}

memberID := r.Value.(string)
assigned, ok := res[memberID]
if !ok {
assigned = make(map[string][]int32, tlen)
res[memberID] = assigned
}
assigned[topic] = append(assigned[topic], info.Partitions[i])
r = r.Next()
}
}

return res
}