Skip to content

Commit

Permalink
fix: data race in balance strategy (#2453)
Browse files Browse the repository at this point in the history
  • Loading branch information
napallday committed Mar 27, 2023
1 parent 2f8dcd0 commit 9127f1c
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 35 deletions.
62 changes: 40 additions & 22 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,42 @@ type BalanceStrategy interface {

// --------------------------------------------------------------------

// BalanceStrategyRange is the default and assigns partitions as ranges to consumer group members.
// NewBalanceStrategyRange returns a range balance strategy,
// which is the default and assigns partitions as ranges to consumer group members.
// This follows the same logic as
// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html
//
// Example with two topics T1 and T2 with six partitions each (0..5) and two members (M1, M2):
//
// M1: {T1: [0, 1, 2], T2: [0, 1, 2]}
// M2: {T2: [3, 4, 5], T2: [3, 4, 5]}
var BalanceStrategyRange = &balanceStrategy{
name: RangeBalanceStrategyName,
coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
partitionsPerConsumer := len(partitions) / len(memberIDs)
consumersWithExtraPartition := len(partitions) % len(memberIDs)

sort.Strings(memberIDs)

for i, memberID := range memberIDs {
min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i)))
extra := 0
if i < consumersWithExtraPartition {
extra = 1
func NewBalanceStrategyRange() BalanceStrategy {
return &balanceStrategy{
name: RangeBalanceStrategyName,
coreFn: func(plan BalanceStrategyPlan, memberIDs []string, topic string, partitions []int32) {
partitionsPerConsumer := len(partitions) / len(memberIDs)
consumersWithExtraPartition := len(partitions) % len(memberIDs)

sort.Strings(memberIDs)

for i, memberID := range memberIDs {
min := i*partitionsPerConsumer + int(math.Min(float64(consumersWithExtraPartition), float64(i)))
extra := 0
if i < consumersWithExtraPartition {
extra = 1
}
max := min + partitionsPerConsumer + extra
plan.Add(memberID, topic, partitions[min:max]...)
}
max := min + partitionsPerConsumer + extra
plan.Add(memberID, topic, partitions[min:max]...)
}
},
},
}
}

// BalanceStrategySticky assigns partitions to members with an attempt to preserve earlier assignments
// Deprecated: use NewBalanceStrategyRange to avoid data race issue
var BalanceStrategyRange = NewBalanceStrategyRange()

// NewBalanceStrategySticky returns a sticky balance strategy,
// which assigns partitions to members with an attempt to preserve earlier assignments
// while maintain a balanced partition distribution.
// Example with topic T with six partitions (0..5) and two members (M1, M2):
//
Expand All @@ -97,7 +104,12 @@ var BalanceStrategyRange = &balanceStrategy{
// M1: {T: [0, 2]}
// M2: {T: [1, 3]}
// M3: {T: [4, 5]}
var BalanceStrategySticky = &stickyBalanceStrategy{}
func NewBalanceStrategySticky() BalanceStrategy {
return &stickyBalanceStrategy{}
}

// Deprecated: use NewBalanceStrategySticky to avoid data race issue
var BalanceStrategySticky = NewBalanceStrategySticky()

// --------------------------------------------------------------------

Expand Down Expand Up @@ -331,11 +343,17 @@ func (s *stickyBalanceStrategy) balance(currentAssignment map[string][]topicPart
}
}

// BalanceStrategyRoundRobin assigns partitions to members in alternating order.
// NewBalanceStrategyRoundRobin returns a round-robin balance strategy,
// which assigns partitions to members in alternating order.
// For example, there are two topics (t0, t1) and two consumer (m0, m1), and each topic has three partitions (p0, p1, p2):
// M0: [t0p0, t0p2, t1p1]
// M1: [t0p1, t1p0, t1p2]
var BalanceStrategyRoundRobin = new(roundRobinBalancer)
func NewBalanceStrategyRoundRobin() BalanceStrategy {
return new(roundRobinBalancer)
}

// Deprecated: use NewBalanceStrategyRoundRobin to avoid data race issue
var BalanceStrategyRoundRobin = NewBalanceStrategyRoundRobin()

type roundRobinBalancer struct{}

Expand Down
25 changes: 21 additions & 4 deletions balance_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestBalanceStrategyRange(t *testing.T) {
},
}

strategy := BalanceStrategyRange
strategy := NewBalanceStrategyRange()
if strategy.Name() != "range" {
t.Errorf("Unexpected stategy name\nexpected: range\nactual: %v", strategy.Name())
}
Expand All @@ -96,7 +96,7 @@ func TestBalanceStrategyRange(t *testing.T) {
}

func TestBalanceStrategyRangeAssignmentData(t *testing.T) {
strategy := BalanceStrategyRange
strategy := NewBalanceStrategyRange()

members := make(map[string]ConsumerGroupMemberMetadata, 2)
members["consumer1"] = ConsumerGroupMemberMetadata{
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestBalanceStrategyRoundRobin(t *testing.T) {
},
}

strategy := BalanceStrategyRoundRobin
strategy := NewBalanceStrategyRoundRobin()
if strategy.Name() != "roundrobin" {
t.Errorf("Unexpected strategy name\nexpected: roundrobin\nactual: %v", strategy.Name())
}
Expand Down Expand Up @@ -284,7 +284,7 @@ func Test_deserializeTopicPartitionAssignment(t *testing.T) {
}

func TestBalanceStrategyRoundRobinAssignmentData(t *testing.T) {
strategy := BalanceStrategyRoundRobin
strategy := NewBalanceStrategyRoundRobin()

members := make(map[string]ConsumerGroupMemberMetadata, 2)
members["consumer1"] = ConsumerGroupMemberMetadata{
Expand Down Expand Up @@ -2094,6 +2094,23 @@ func Test_stickyBalanceStrategy_Plan_AssignmentData(t *testing.T) {
}
}

func Test_stickyBalanceStrategy_Plan_data_race(t *testing.T) {
for i := 0; i < 1000; i++ {
go func(bs BalanceStrategy) {
members := map[string]ConsumerGroupMemberMetadata{
"m1": {
Version: 3,
Topics: []string{"topic"},
},
}
topics := map[string][]int32{
"topic": {0, 1, 2},
}
_, _ = bs.Plan(members, topics)
}(NewBalanceStrategySticky())
}
}

func BenchmarkStickAssignmentWithLargeNumberOfConsumersAndTopics(b *testing.B) {
s := &stickyBalanceStrategy{}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
Expand Down
6 changes: 3 additions & 3 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,15 @@ type Config struct {
Interval time.Duration
}
Rebalance struct {
// Strategy for allocating topic partitions to members (default BalanceStrategyRange)
// Strategy for allocating topic partitions to members.
// Deprecated: Strategy exists for historical compatibility
// and should not be used. Please use GroupStrategies.
Strategy BalanceStrategy

// GroupStrategies is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
// default: [BalanceStrategyRange]
// default: [ NewBalanceStrategyRange() ]
GroupStrategies []BalanceStrategy

// The maximum allowed time for each worker to join the group once a rebalance has begun.
Expand Down Expand Up @@ -539,7 +539,7 @@ func NewConfig() *Config {

c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange}
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{NewBalanceStrategyRange()}
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
Expand Down
2 changes: 1 addition & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ func TestGroupInstanceIdAndVersionValidation(t *testing.T) {

func TestConsumerGroupStrategyCompatibility(t *testing.T) {
config := NewTestConfig()
config.Consumer.Group.Rebalance.Strategy = BalanceStrategySticky
config.Consumer.Group.Rebalance.Strategy = NewBalanceStrategySticky()
if err := config.Validate(); err != nil {
t.Error("Expected passing config validation, got ", err)
}
Expand Down
6 changes: 3 additions & 3 deletions examples/consumergroup/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func main() {

switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategySticky()}
case "roundrobin":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRoundRobin()}
case "range":
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.NewBalanceStrategyRange()}
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/exactly_once/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func main() {

config.Consumer.IsolationLevel = sarama.ReadCommitted
config.Consumer.Offsets.AutoCommit.Enable = false
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRoundRobin()

if oldest {
config.Consumer.Offsets.Initial = sarama.OffsetOldest
Expand Down
2 changes: 1 addition & 1 deletion functional_consumer_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ func (m *testFuncConsumerGroupMember) loop(topics []string) {

func newTestStatefulStrategy(t *testing.T) *testStatefulStrategy {
return &testStatefulStrategy{
BalanceStrategy: BalanceStrategyRange,
BalanceStrategy: NewBalanceStrategyRange(),
t: t,
}
}
Expand Down

0 comments on commit 9127f1c

Please sign in to comment.