Skip to content

Commit

Permalink
Merge pull request #2339 from Jacob-bzx/Jacob-bzx/support-multiple-ba…
Browse files Browse the repository at this point in the history
…lance-strategies

feat(consumer): support multiple balance strategies
  • Loading branch information
dnwe committed Sep 27, 2022
2 parents d0a00ae + d66826a commit 5e2c2ef
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 24 deletions.
28 changes: 25 additions & 3 deletions config.go
Expand Up @@ -273,7 +273,16 @@ type Config struct {
}
Rebalance struct {
// Strategy for allocating topic partitions to members (default BalanceStrategyRange)
// Deprecated: Strategy exists for historical compatibility
// and should not be used. Please use GroupStrategies.
Strategy BalanceStrategy

// GroupStrategies is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
// default: [BalanceStrategyRange]
GroupStrategies []BalanceStrategy

// The maximum allowed time for each worker to join the group once a rebalance has begun.
// This is basically a limit on the amount of time needed for all tasks to flush any pending
// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
Expand Down Expand Up @@ -504,7 +513,7 @@ func NewConfig() *Config {

c.Consumer.Group.Session.Timeout = 10 * time.Second
c.Consumer.Group.Heartbeat.Interval = 3 * time.Second
c.Consumer.Group.Rebalance.Strategy = BalanceStrategyRange
c.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{BalanceStrategyRange}
c.Consumer.Group.Rebalance.Timeout = 60 * time.Second
c.Consumer.Group.Rebalance.Retry.Max = 4
c.Consumer.Group.Rebalance.Retry.Backoff = 2 * time.Second
Expand Down Expand Up @@ -745,6 +754,10 @@ func (c *Config) Validate() error {
Logger.Println("Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility" +
" and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored")
}
if c.Consumer.Group.Rebalance.Strategy != nil {
Logger.Println("Deprecation warning: Consumer.Group.Rebalance.Strategy exists for historical compatibility" +
" and should not be used. Please use Consumer.Group.Rebalance.GroupStrategies")
}

// validate IsolationLevel
if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
Expand All @@ -759,15 +772,24 @@ func (c *Config) Validate() error {
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be >= 1ms")
case c.Consumer.Group.Heartbeat.Interval >= c.Consumer.Group.Session.Timeout:
return ConfigurationError("Consumer.Group.Heartbeat.Interval must be < Consumer.Group.Session.Timeout")
case c.Consumer.Group.Rebalance.Strategy == nil:
return ConfigurationError("Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Strategy == nil && len(c.Consumer.Group.Rebalance.GroupStrategies) == 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies or Consumer.Group.Rebalance.Strategy must not be empty")
case c.Consumer.Group.Rebalance.Strategy != nil && len(c.Consumer.Group.Rebalance.GroupStrategies) != 0:
return ConfigurationError("Consumer.Group.Rebalance.GroupStrategies and Consumer.Group.Rebalance.Strategy cannot be set at the same time")
case c.Consumer.Group.Rebalance.Timeout <= time.Millisecond:
return ConfigurationError("Consumer.Group.Rebalance.Timeout must be >= 1ms")
case c.Consumer.Group.Rebalance.Retry.Max < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Max must be >= 0")
case c.Consumer.Group.Rebalance.Retry.Backoff < 0:
return ConfigurationError("Consumer.Group.Rebalance.Retry.Backoff must be >= 0")
}

for _, strategy := range c.Consumer.Group.Rebalance.GroupStrategies {
if strategy == nil {
return ConfigurationError("elements in Consumer.Group.Rebalance.Strategies must not be empty")
}
}

if c.Consumer.Group.InstanceId != "" {
if !c.Version.IsAtLeast(V2_3_0_0) {
return ConfigurationError("Consumer.Group.InstanceId need Version >= 2.3")
Expand Down
60 changes: 45 additions & 15 deletions consumer_group.go
Expand Up @@ -324,6 +324,17 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, join.Err
}

var strategy BalanceStrategy
var ok bool
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy == nil {
strategy, ok = c.findStrategy(join.GroupProtocol, c.config.Consumer.Group.Rebalance.GroupStrategies)
if !ok {
// this case shouldn't happen in practice, since the leader will choose the protocol
// that all the members support
return nil, fmt.Errorf("unable to find selected strategy: %s", join.GroupProtocol)
}
}

// Prepare distribution plan if we joined as the leader
var plan BalanceStrategyPlan
var members map[string]ConsumerGroupMemberMetadata
Expand All @@ -333,14 +344,14 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
return nil, err
}

plan, err = c.balance(members)
plan, err = c.balance(strategy, members)
if err != nil {
return nil, err
}
}

// Sync consumer group
groupRequest, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId)
syncGroupResponse, err := c.syncGroupRequest(coordinator, members, plan, join.GenerationId, strategy)
if consumerGroupSyncTotal != nil {
consumerGroupSyncTotal.Inc(1)
}
Expand All @@ -351,13 +362,13 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}
return nil, err
}
if !errors.Is(groupRequest.Err, ErrNoError) {
if !errors.Is(syncGroupResponse.Err, ErrNoError) {
if consumerGroupSyncFailed != nil {
consumerGroupSyncFailed.Inc(1)
}
}

switch groupRequest.Err {
switch syncGroupResponse.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration:
// reset member ID and retry immediately
Expand All @@ -366,22 +377,22 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
case ErrNotCoordinatorForConsumer, ErrRebalanceInProgress, ErrOffsetsLoadInProgress:
// retry after backoff
if retries <= 0 {
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
}
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
default:
return nil, groupRequest.Err
return nil, syncGroupResponse.Err
}

// Retrieve and sort claims
var claims map[string][]int32
if len(groupRequest.MemberAssignment) > 0 {
members, err := groupRequest.GetMemberAssignment()
if len(syncGroupResponse.MemberAssignment) > 0 {
members, err := syncGroupResponse.GetMemberAssignment()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -424,26 +435,46 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
Topics: topics,
UserData: c.userData,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
var strategy BalanceStrategy
if strategy = c.config.Consumer.Group.Rebalance.Strategy; strategy != nil {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
}
} else {
for _, strategy = range c.config.Consumer.Group.Rebalance.GroupStrategies {
if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
return nil, err
}
}
}

return coordinator.JoinGroup(req)
}

// findStrategy returns the BalanceStrategy with the specified protocolName
// from the slice provided.
func (c *consumerGroup) findStrategy(name string, groupStrategies []BalanceStrategy) (BalanceStrategy, bool) {
for _, strategy := range groupStrategies {
if strategy.Name() == name {
return strategy, true
}
}
return nil, false
}

func (c *consumerGroup) syncGroupRequest(
coordinator *Broker,
members map[string]ConsumerGroupMemberMetadata,
plan BalanceStrategyPlan,
generationID int32,
strategy BalanceStrategy,
) (*SyncGroupResponse, error) {
req := &SyncGroupRequest{
GroupId: c.groupID,
MemberId: c.memberID,
GenerationId: generationID,
}
strategy := c.config.Consumer.Group.Rebalance.Strategy

if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 3
}
Expand Down Expand Up @@ -486,7 +517,7 @@ func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, g
return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
func (c *consumerGroup) balance(strategy BalanceStrategy, members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
topics := make(map[string][]int32)
for _, meta := range members {
for _, topic := range meta.Topics {
Expand All @@ -502,7 +533,6 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
topics[topic] = partitions
}

strategy := c.config.Consumer.Group.Rebalance.Strategy
return strategy.Plan(members, topics)
}

Expand Down
2 changes: 1 addition & 1 deletion consumer_group_test.go
Expand Up @@ -49,7 +49,7 @@ func TestConsumerGroupNewSessionDuringOffsetLoad(t *testing.T) {
"HeartbeatRequest": NewMockHeartbeatResponse(t),
"JoinGroupRequest": NewMockSequence(
NewMockJoinGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
NewMockJoinGroupResponse(t),
NewMockJoinGroupResponse(t).SetGroupProtocol(RangeBalanceStrategyName),
),
"SyncGroupRequest": NewMockSequence(
NewMockSyncGroupResponse(t).SetError(ErrOffsetsLoadInProgress),
Expand Down
6 changes: 3 additions & 3 deletions examples/consumergroup/main.go
Expand Up @@ -70,11 +70,11 @@ func main() {

switch assignor {
case "sticky":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky}
case "roundrobin":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin}
case "range":
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange}
default:
log.Panicf("Unrecognized consumer group partition assignor: %s", assignor)
}
Expand Down
4 changes: 2 additions & 2 deletions functional_consumer_group_test.go
Expand Up @@ -59,7 +59,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {

m1s := newTestStatefulStrategy(t)
config := defaultConfig("M1")
config.Consumer.Group.Rebalance.Strategy = m1s
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m1s}
config.Consumer.Group.Member.UserData = []byte(config.ClientID)

// start M1
Expand All @@ -72,7 +72,7 @@ func TestFuncConsumerGroupPartitioningStateful(t *testing.T) {

m2s := newTestStatefulStrategy(t)
config = defaultConfig("M2")
config.Consumer.Group.Rebalance.Strategy = m2s
config.Consumer.Group.Rebalance.GroupStrategies = []BalanceStrategy{m2s}
config.Consumer.Group.Member.UserData = []byte(config.ClientID)

// start M2
Expand Down

0 comments on commit 5e2c2ef

Please sign in to comment.